Commit 99287d09 authored by John Belamaric's avatar John Belamaric Committed by GitHub

Watch feature (#1527)

* Add part 1 watch functionality. (squashed)

* add funcs for service/endpoint fqdns

* add endpoints watch

* document exposed funcs

* only send subset deltas

* locking for watch map

* tests and docs

* add pod watch

* remove debugs prints

* feedback part 1

* add error reporting to proto

* inform clients of server stop+errors

* add grpc options param

* use proper context

* Review feedback:
 * Removed client (will move to another repo)
 * Use new log functions
 * Change watchChan to be for string not []string
 * Rework how k8s plugin stores watch tracking info to simplify
 * Normalize the qname on watch request

* Add blank line back

* Revert another spurious change

* Fix tests

* Add stop channel.
Fix tests.
Better docs for plugin interface.

* fmt.Printf -> log.Warningf

* Move from dnsserver to plugin/pkg/watch

* gofmt

* remove dead client watches

* sate linter

* linter omg
parent b7480d5d
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"net" "net"
"github.com/coredns/coredns/pb" "github.com/coredns/coredns/pb"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -22,6 +23,7 @@ type ServergRPC struct { ...@@ -22,6 +23,7 @@ type ServergRPC struct {
grpcServer *grpc.Server grpcServer *grpc.Server
listenAddr net.Addr listenAddr net.Addr
tlsConfig *tls.Config tlsConfig *tls.Config
watch watch.Watcher
} }
// NewServergRPC returns a new CoreDNS GRPC server and compiles all plugin in to it. // NewServergRPC returns a new CoreDNS GRPC server and compiles all plugin in to it.
...@@ -38,7 +40,7 @@ func NewServergRPC(addr string, group []*Config) (*ServergRPC, error) { ...@@ -38,7 +40,7 @@ func NewServergRPC(addr string, group []*Config) (*ServergRPC, error) {
tlsConfig = conf.TLSConfig tlsConfig = conf.TLSConfig
} }
return &ServergRPC{Server: s, tlsConfig: tlsConfig}, nil return &ServergRPC{Server: s, tlsConfig: tlsConfig, watch: watch.NewWatcher(watchables(s.zones))}, nil
} }
// Serve implements caddy.TCPServer interface. // Serve implements caddy.TCPServer interface.
...@@ -100,6 +102,9 @@ func (s *ServergRPC) OnStartupComplete() { ...@@ -100,6 +102,9 @@ func (s *ServergRPC) OnStartupComplete() {
func (s *ServergRPC) Stop() (err error) { func (s *ServergRPC) Stop() (err error) {
s.m.Lock() s.m.Lock()
defer s.m.Unlock() defer s.m.Unlock()
if s.watch != nil {
s.watch.Stop()
}
if s.grpcServer != nil { if s.grpcServer != nil {
s.grpcServer.GracefulStop() s.grpcServer.GracefulStop()
} }
...@@ -138,6 +143,12 @@ func (s *ServergRPC) Query(ctx context.Context, in *pb.DnsPacket) (*pb.DnsPacket ...@@ -138,6 +143,12 @@ func (s *ServergRPC) Query(ctx context.Context, in *pb.DnsPacket) (*pb.DnsPacket
return &pb.DnsPacket{Msg: packed}, nil return &pb.DnsPacket{Msg: packed}, nil
} }
// Watch is the entrypoint called by the gRPC layer when the user asks
// to watch a query.
func (s *ServergRPC) Watch(stream pb.DnsService_WatchServer) error {
return s.watch.Watch(stream)
}
// Shutdown stops the server (non gracefully). // Shutdown stops the server (non gracefully).
func (s *ServergRPC) Shutdown() error { func (s *ServergRPC) Shutdown() error {
if s.grpcServer != nil { if s.grpcServer != nil {
......
package dnsserver
import (
"github.com/coredns/coredns/plugin/pkg/watch"
)
func watchables(zones map[string]*Config) []watch.Watchable {
var w []watch.Watchable
for _, config := range zones {
plugins := config.Handlers()
for _, p := range plugins {
if x, ok := p.(watch.Watchable); ok {
w = append(w, x)
}
}
}
return w
}
This diff is collapsed.
...@@ -9,4 +9,41 @@ message DnsPacket { ...@@ -9,4 +9,41 @@ message DnsPacket {
service DnsService { service DnsService {
rpc Query (DnsPacket) returns (DnsPacket); rpc Query (DnsPacket) returns (DnsPacket);
rpc Watch (stream WatchRequest) returns (stream WatchResponse);
}
message WatchRequest {
// request_union is a request to either create a new watcher or cancel an existing watcher.
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
}
}
message WatchCreateRequest {
DnsPacket query = 1;
}
message WatchCancelRequest {
// watch_id is the watcher id to cancel
int64 watch_id = 1;
}
message WatchResponse {
// watch_id is the ID of the watcher that corresponds to the response.
int64 watch_id = 1;
// created is set to true if the response is for a create watch request.
// The client should record the watch_id and expect to receive DNS replies
// from the same stream.
// All replies sent to the created watcher will attach with the same watch_id.
bool created = 2;
// canceled is set to true if the response is for a cancel watch request.
// No further events will be sent to the canceled watcher.
bool canceled = 3;
string qname = 4;
string err = 5;
} }
...@@ -2,6 +2,7 @@ package federation ...@@ -2,6 +2,7 @@ package federation
import ( import (
"github.com/coredns/coredns/plugin/kubernetes" "github.com/coredns/coredns/plugin/kubernetes"
"github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
...@@ -15,6 +16,9 @@ func (APIConnFederationTest) Stop() error { return ni ...@@ -15,6 +16,9 @@ func (APIConnFederationTest) Stop() error { return ni
func (APIConnFederationTest) SvcIndexReverse(string) []*api.Service { return nil } func (APIConnFederationTest) SvcIndexReverse(string) []*api.Service { return nil }
func (APIConnFederationTest) EpIndexReverse(string) []*api.Endpoints { return nil } func (APIConnFederationTest) EpIndexReverse(string) []*api.Endpoints { return nil }
func (APIConnFederationTest) Modified() int64 { return 0 } func (APIConnFederationTest) Modified() int64 { return 0 }
func (APIConnFederationTest) SetWatchChan(watch.Chan) {}
func (APIConnFederationTest) Watch(string) error { return nil }
func (APIConnFederationTest) StopWatching(string) {}
func (APIConnFederationTest) PodIndex(string) []*api.Pod { func (APIConnFederationTest) PodIndex(string) []*api.Pod {
a := []*api.Pod{{ a := []*api.Pod{{
......
...@@ -110,6 +110,11 @@ kubernetes [ZONES...] { ...@@ -110,6 +110,11 @@ kubernetes [ZONES...] {
This plugin implements dynamic health checking. Currently this is limited to reporting healthy when This plugin implements dynamic health checking. Currently this is limited to reporting healthy when
the API has synced. the API has synced.
## Watch
This plugin implements watch. A client that connects to CoreDNS using `coredns/client` can be notified
of changes to A, AAAA, and SRV records for Kubernetes services and endpoints.
## Examples ## Examples
Handle all queries in the `cluster.local` zone. Connect to Kubernetes in-cluster. Also handle all Handle all queries in the `cluster.local` zone. Connect to Kubernetes in-cluster. Also handle all
......
...@@ -7,6 +7,8 @@ import ( ...@@ -7,6 +7,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
dnswatch "github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
...@@ -45,6 +47,11 @@ type dnsController interface { ...@@ -45,6 +47,11 @@ type dnsController interface {
// Modified returns the timestamp of the most recent changes // Modified returns the timestamp of the most recent changes
Modified() int64 Modified() int64
// Watch-related items
SetWatchChan(dnswatch.Chan)
Watch(string) error
StopWatching(string)
} }
type dnsControl struct { type dnsControl struct {
...@@ -73,6 +80,12 @@ type dnsControl struct { ...@@ -73,6 +80,12 @@ type dnsControl struct {
stopLock sync.Mutex stopLock sync.Mutex
shutdown bool shutdown bool
stopCh chan struct{} stopCh chan struct{}
// watch-related items channel
watchChan dnswatch.Chan
watched map[string]bool
zones []string
endpointNameMode bool
} }
type dnsControlOpts struct { type dnsControlOpts struct {
...@@ -83,14 +96,20 @@ type dnsControlOpts struct { ...@@ -83,14 +96,20 @@ type dnsControlOpts struct {
// Label handling. // Label handling.
labelSelector *meta.LabelSelector labelSelector *meta.LabelSelector
selector labels.Selector selector labels.Selector
zones []string
endpointNameMode bool
} }
// newDNSController creates a controller for CoreDNS. // newDNSController creates a controller for CoreDNS.
func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dnsControl { func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dnsControl {
dns := dnsControl{ dns := dnsControl{
client: kubeClient, client: kubeClient,
selector: opts.selector, selector: opts.selector,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
watched: make(map[string]bool),
zones: opts.zones,
endpointNameMode: opts.endpointNameMode,
} }
dns.svcLister, dns.svcController = cache.NewIndexerInformer( dns.svcLister, dns.svcController = cache.NewIndexerInformer(
...@@ -292,6 +311,22 @@ func namespaceWatchFunc(c *kubernetes.Clientset, s labels.Selector) func(options ...@@ -292,6 +311,22 @@ func namespaceWatchFunc(c *kubernetes.Clientset, s labels.Selector) func(options
} }
} }
func (dns *dnsControl) SetWatchChan(c dnswatch.Chan) {
dns.watchChan = c
}
func (dns *dnsControl) Watch(qname string) error {
if dns.watchChan == nil {
return fmt.Errorf("cannot start watch because the channel has not been set")
}
dns.watched[qname] = true
return nil
}
func (dns *dnsControl) StopWatching(qname string) {
delete(dns.watched, qname)
}
// Stop stops the controller. // Stop stops the controller.
func (dns *dnsControl) Stop() error { func (dns *dnsControl) Stop() error {
dns.stopLock.Lock() dns.stopLock.Lock()
...@@ -492,63 +527,164 @@ func (dns *dnsControl) updateModifed() { ...@@ -492,63 +527,164 @@ func (dns *dnsControl) updateModifed() {
atomic.StoreInt64(&dns.modified, unix) atomic.StoreInt64(&dns.modified, unix)
} }
func (dns *dnsControl) Add(obj interface{}) { dns.updateModifed() } func (dns *dnsControl) sendServiceUpdates(s *api.Service) {
func (dns *dnsControl) Delete(obj interface{}) { dns.updateModifed() } for i := range dns.zones {
name := serviceFQDN(s, dns.zones[i])
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
}
func (dns *dnsControl) sendPodUpdates(p *api.Pod) {
for i := range dns.zones {
name := podFQDN(p, dns.zones[i])
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
}
func (dns *dnsControl) sendEndpointsUpdates(ep *api.Endpoints) {
for _, zone := range dns.zones {
names := append(endpointFQDN(ep, zone, dns.endpointNameMode), serviceFQDN(ep, zone))
for _, name := range names {
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
}
}
// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b.
// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed.
// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to
// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed.
// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client.
// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a).
func endpointsSubsetDiffs(a, b *api.Endpoints) *api.Endpoints {
c := b.DeepCopy()
c.Subsets = []api.EndpointSubset{}
// In the following loop, the first iteration computes (in a but not in b).
// The second iteration then adds (in b but not in a)
// The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b.
for _, abba := range [][]*api.Endpoints{{a, b}, {b, a}} {
a := abba[0]
b := abba[1]
left:
for _, as := range a.Subsets {
for _, bs := range b.Subsets {
if subsetsEquivalent(as, bs) {
continue left
}
}
c.Subsets = append(c.Subsets, as)
}
}
return c
}
func (dns *dnsControl) Update(objOld, newObj interface{}) { // sendUpdates sends a notification to the server if a watch
// endpoint updates can come frequently, make sure // is enabled for the qname
// it's a change we care about func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) {
if o, ok := objOld.(*api.Endpoints); ok { // If both objects have the same resource version, they are identical.
n := newObj.(*api.Endpoints) if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
if endpointsEquivalent(o, n) { return
}
obj := newObj
if obj == nil {
obj = oldObj
}
switch ob := obj.(type) {
case *api.Service:
dns.updateModifed()
dns.sendServiceUpdates(ob)
case *api.Endpoints:
if newObj == nil || oldObj == nil {
dns.updateModifed()
dns.sendEndpointsUpdates(ob)
return
}
p := oldObj.(*api.Endpoints)
// endpoint updates can come frequently, make sure it's a change we care about
if endpointsEquivalent(p, ob) {
return return
} }
dns.updateModifed()
dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob))
case *api.Pod:
dns.updateModifed()
dns.sendPodUpdates(ob)
default:
log.Warningf("Updates for %T not supported.", ob)
} }
dns.updateModifed()
} }
// endpointsEquivalent checks if the update to an endpoint is something func (dns *dnsControl) Add(obj interface{}) {
// that matters to us: ready addresses, host names, ports (including names for SRV) dns.sendUpdates(nil, obj)
func endpointsEquivalent(a, b *api.Endpoints) bool { }
// supposedly we should be able to rely on func (dns *dnsControl) Delete(obj interface{}) {
// these being sorted and able to be compared dns.sendUpdates(obj, nil)
// they are supposed to be in a canonical format }
func (dns *dnsControl) Update(oldObj, newObj interface{}) {
dns.sendUpdates(oldObj, newObj)
}
if len(a.Subsets) != len(b.Subsets) { // subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
func subsetsEquivalent(sa, sb api.EndpointSubset) bool {
if len(sa.Addresses) != len(sb.Addresses) {
return false
}
if len(sa.Ports) != len(sb.Ports) {
return false return false
} }
for i, sa := range a.Subsets { // in Addresses and Ports, we should be able to rely on
// check the Addresses and Ports. Ignore unready addresses. // these being sorted and able to be compared
sb := b.Subsets[i] // they are supposed to be in a canonical format
if len(sa.Addresses) != len(sb.Addresses) { for addr, aaddr := range sa.Addresses {
baddr := sb.Addresses[addr]
if aaddr.IP != baddr.IP {
return false return false
} }
if len(sa.Ports) != len(sb.Ports) { if aaddr.Hostname != baddr.Hostname {
return false return false
} }
}
for addr, aaddr := range sa.Addresses { for port, aport := range sa.Ports {
baddr := sb.Addresses[addr] bport := sb.Ports[port]
if aaddr.IP != baddr.IP { if aport.Name != bport.Name {
return false return false
} }
if aaddr.Hostname != baddr.Hostname { if aport.Port != bport.Port {
return false return false
} }
if aport.Protocol != bport.Protocol {
return false
} }
}
return true
}
for port, aport := range sa.Ports { // endpointsEquivalent checks if the update to an endpoint is something
bport := sb.Ports[port] // that matters to us or if they are effectively equivalent.
if aport.Name != bport.Name { func endpointsEquivalent(a, b *api.Endpoints) bool {
return false
} if len(a.Subsets) != len(b.Subsets) {
if aport.Port != bport.Port { return false
return false }
}
if aport.Protocol != bport.Protocol { // we should be able to rely on
return false // these being sorted and able to be compared
} // they are supposed to be in a canonical format
for i, sa := range a.Subsets {
sb := b.Subsets[i]
if !subsetsEquivalent(sa, sb) {
return false
} }
} }
return true return true
......
package kubernetes
import (
"strconv"
"strings"
"testing"
api "k8s.io/api/core/v1"
)
func endpointSubsets(addrs ...string) (eps []api.EndpointSubset) {
for _, ap := range addrs {
apa := strings.Split(ap, ":")
address := apa[0]
port, _ := strconv.Atoi(apa[1])
eps = append(eps, api.EndpointSubset{Addresses: []api.EndpointAddress{{IP: address}}, Ports: []api.EndpointPort{{Port: int32(port)}}})
}
return eps
}
func TestEndpointsSubsetDiffs(t *testing.T) {
var tests = []struct {
a, b, expected api.Endpoints
}{
{ // From a->b: Nothing changes
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
api.Endpoints{},
},
{ // From a->b: Everything goes away
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
api.Endpoints{},
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
},
{ // From a->b: Everything is new
api.Endpoints{},
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
},
{ // From a->b: One goes away, one is new
api.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080")},
api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80")},
api.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080", "10.0.0.1:80")},
},
}
for i, te := range tests {
got := endpointsSubsetDiffs(&te.a, &te.b)
if !endpointsEquivalent(got, &te.expected) {
t.Errorf("Expected '%v' for test %v, got '%v'.", te.expected, i, got)
}
}
}
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -332,6 +333,9 @@ func (APIConnServeTest) Stop() error { return nil } ...@@ -332,6 +333,9 @@ func (APIConnServeTest) Stop() error { return nil }
func (APIConnServeTest) EpIndexReverse(string) []*api.Endpoints { return nil } func (APIConnServeTest) EpIndexReverse(string) []*api.Endpoints { return nil }
func (APIConnServeTest) SvcIndexReverse(string) []*api.Service { return nil } func (APIConnServeTest) SvcIndexReverse(string) []*api.Service { return nil }
func (APIConnServeTest) Modified() int64 { return time.Now().Unix() } func (APIConnServeTest) Modified() int64 { return time.Now().Unix() }
func (APIConnServeTest) SetWatchChan(watch.Chan) {}
func (APIConnServeTest) Watch(string) error { return nil }
func (APIConnServeTest) StopWatching(string) {}
func (APIConnServeTest) PodIndex(string) []*api.Pod { func (APIConnServeTest) PodIndex(string) []*api.Pod {
a := []*api.Pod{{ a := []*api.Pod{{
......
...@@ -260,6 +260,8 @@ func (k *Kubernetes) InitKubeCache() (err error) { ...@@ -260,6 +260,8 @@ func (k *Kubernetes) InitKubeCache() (err error) {
k.opts.initPodCache = k.podMode == podModeVerified k.opts.initPodCache = k.podMode == podModeVerified
k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode
k.APIConn = newdnsController(kubeClient, k.opts) k.APIConn = newdnsController(kubeClient, k.opts)
return err return err
...@@ -292,6 +294,29 @@ func (k *Kubernetes) Records(state request.Request, exact bool) ([]msg.Service, ...@@ -292,6 +294,29 @@ func (k *Kubernetes) Records(state request.Request, exact bool) ([]msg.Service,
return services, err return services, err
} }
// serviceFQDN returns the k8s cluster dns spec service FQDN for the service (or endpoint) object.
func serviceFQDN(obj meta.Object, zone string) string {
return dnsutil.Join(append([]string{}, obj.GetName(), obj.GetNamespace(), Svc, zone))
}
// podFQDN returns the k8s cluster dns spec FQDN for the pod.
func podFQDN(p *api.Pod, zone string) string {
name := strings.Replace(p.Status.PodIP, ".", "-", -1)
name = strings.Replace(name, ":", "-", -1)
return dnsutil.Join(append([]string{}, name, p.GetNamespace(), Pod, zone))
}
// endpointFQDN returns a list of k8s cluster dns spec service FQDNs for each subset in the endpoint.
func endpointFQDN(ep *api.Endpoints, zone string, endpointNameMode bool) []string {
var names []string
for _, ss := range ep.Subsets {
for _, addr := range ss.Addresses {
names = append(names, dnsutil.Join(append([]string{}, endpointHostname(addr, endpointNameMode), serviceFQDN(ep, zone))))
}
}
return names
}
func endpointHostname(addr api.EndpointAddress, endpointNameMode bool) string { func endpointHostname(addr api.EndpointAddress, endpointNameMode bool) string {
if addr.Hostname != "" { if addr.Hostname != "" {
return strings.ToLower(addr.Hostname) return strings.ToLower(addr.Hostname)
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"testing" "testing"
"github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -64,6 +65,9 @@ func (APIConnServiceTest) PodIndex(string) []*api.Pod { return nil } ...@@ -64,6 +65,9 @@ func (APIConnServiceTest) PodIndex(string) []*api.Pod { return nil }
func (APIConnServiceTest) SvcIndexReverse(string) []*api.Service { return nil } func (APIConnServiceTest) SvcIndexReverse(string) []*api.Service { return nil }
func (APIConnServiceTest) EpIndexReverse(string) []*api.Endpoints { return nil } func (APIConnServiceTest) EpIndexReverse(string) []*api.Endpoints { return nil }
func (APIConnServiceTest) Modified() int64 { return 0 } func (APIConnServiceTest) Modified() int64 { return 0 }
func (APIConnServiceTest) SetWatchChan(watch.Chan) {}
func (APIConnServiceTest) Watch(string) error { return nil }
func (APIConnServiceTest) StopWatching(string) {}
func (APIConnServiceTest) SvcIndex(string) []*api.Service { func (APIConnServiceTest) SvcIndex(string) []*api.Service {
svcs := []*api.Service{ svcs := []*api.Service{
...@@ -390,3 +394,85 @@ func TestServices(t *testing.T) { ...@@ -390,3 +394,85 @@ func TestServices(t *testing.T) {
} }
} }
} }
func TestServiceFQDN(t *testing.T) {
fqdn := serviceFQDN(
&api.Service{
ObjectMeta: meta.ObjectMeta{
Name: "svc1",
Namespace: "testns",
},
}, "cluster.local")
expected := "svc1.testns.svc.cluster.local."
if fqdn != expected {
t.Errorf("Expected '%v', got '%v'.", expected, fqdn)
}
}
func TestPodFQDN(t *testing.T) {
fqdn := podFQDN(
&api.Pod{
ObjectMeta: meta.ObjectMeta{
Name: "pod1",
Namespace: "testns",
},
Status: api.PodStatus{
PodIP: "10.10.0.10",
},
}, "cluster.local")
expected := "10-10-0-10.testns.pod.cluster.local."
if fqdn != expected {
t.Errorf("Expected '%v', got '%v'.", expected, fqdn)
}
fqdn = podFQDN(
&api.Pod{
ObjectMeta: meta.ObjectMeta{
Name: "pod1",
Namespace: "testns",
},
Status: api.PodStatus{
PodIP: "aaaa:bbbb:cccc::zzzz",
},
}, "cluster.local")
expected = "aaaa-bbbb-cccc--zzzz.testns.pod.cluster.local."
if fqdn != expected {
t.Errorf("Expected '%v', got '%v'.", expected, fqdn)
}
}
func TestEndpointFQDN(t *testing.T) {
fqdns := endpointFQDN(
&api.Endpoints{
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{
{
IP: "172.0.0.1",
Hostname: "ep1a",
},
{
IP: "172.0.0.2",
},
},
},
},
ObjectMeta: meta.ObjectMeta{
Name: "svc1",
Namespace: "testns",
},
}, "cluster.local", false)
expected := []string{
"ep1a.svc1.testns.svc.cluster.local.",
"172-0-0-2.svc1.testns.svc.cluster.local.",
}
for i := range fqdns {
if fqdns[i] != expected[i] {
t.Errorf("Expected '%v', got '%v'.", expected[i], fqdns[i])
}
}
}
...@@ -3,6 +3,8 @@ package kubernetes ...@@ -3,6 +3,8 @@ package kubernetes
import ( import (
"testing" "testing"
"github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
...@@ -18,6 +20,9 @@ func (APIConnTest) SvcIndexReverse(string) []*api.Service { return nil } ...@@ -18,6 +20,9 @@ func (APIConnTest) SvcIndexReverse(string) []*api.Service { return nil }
func (APIConnTest) EpIndex(string) []*api.Endpoints { return nil } func (APIConnTest) EpIndex(string) []*api.Endpoints { return nil }
func (APIConnTest) EndpointsList() []*api.Endpoints { return nil } func (APIConnTest) EndpointsList() []*api.Endpoints { return nil }
func (APIConnTest) Modified() int64 { return 0 } func (APIConnTest) Modified() int64 { return 0 }
func (APIConnTest) SetWatchChan(watch.Chan) {}
func (APIConnTest) Watch(string) error { return nil }
func (APIConnTest) StopWatching(string) {}
func (APIConnTest) ServiceList() []*api.Service { func (APIConnTest) ServiceList() []*api.Service {
svcs := []*api.Service{ svcs := []*api.Service{
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"testing" "testing"
"github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -22,6 +23,9 @@ func (APIConnReverseTest) EpIndex(string) []*api.Endpoints { return nil } ...@@ -22,6 +23,9 @@ func (APIConnReverseTest) EpIndex(string) []*api.Endpoints { return nil }
func (APIConnReverseTest) EndpointsList() []*api.Endpoints { return nil } func (APIConnReverseTest) EndpointsList() []*api.Endpoints { return nil }
func (APIConnReverseTest) ServiceList() []*api.Service { return nil } func (APIConnReverseTest) ServiceList() []*api.Service { return nil }
func (APIConnReverseTest) Modified() int64 { return 0 } func (APIConnReverseTest) Modified() int64 { return 0 }
func (APIConnReverseTest) SetWatchChan(watch.Chan) {}
func (APIConnReverseTest) Watch(string) error { return nil }
func (APIConnReverseTest) StopWatching(string) {}
func (APIConnReverseTest) SvcIndex(svc string) []*api.Service { func (APIConnReverseTest) SvcIndex(svc string) []*api.Service {
if svc != "svc1.testns" { if svc != "svc1.testns" {
......
package kubernetes
import (
"github.com/coredns/coredns/plugin/pkg/watch"
)
// SetWatchChan implements watch.Watchable
func (k *Kubernetes) SetWatchChan(c watch.Chan) {
k.APIConn.SetWatchChan(c)
}
// Watch is called when a watch is started for a name.
func (k *Kubernetes) Watch(qname string) error {
return k.APIConn.Watch(qname)
}
// StopWatching is called when no more watches remain for a name
func (k *Kubernetes) StopWatching(qname string) {
k.APIConn.StopWatching(qname)
}
package kubernetes
import (
"testing"
"github.com/coredns/coredns/plugin/pkg/watch"
)
func TestIsWatchable(t *testing.T) {
k := &Kubernetes{}
var i interface{} = k
if _, ok := i.(watch.Watchable); !ok {
t.Error("Kubernetes should implement watch.Watchable and does not")
}
}
package watch
// Chan is used to inform the server of a change. Whenever
// a watched FQDN has a change in data, that FQDN should be
// sent down this channel.
type Chan chan string
// Watchable is the interface watchable plugins should implement
type Watchable interface {
// Name returns the plugin name.
Name() string
// SetWatchChan is called when the watch channel is created.
SetWatchChan(Chan)
// Watch is called whenever a watch is created for a FQDN. Plugins
// should send the FQDN down the watch channel when its data may have
// changed. This is an exact match only.
Watch(qname string) error
// StopWatching is called whenever all watches are canceled for a FQDN.
StopWatching(qname string)
}
package watch
import (
"fmt"
"io"
"sync"
"github.com/miekg/dns"
"github.com/coredns/coredns/pb"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
)
// Watcher handles watch creation, cancellation, and processing.
type Watcher interface {
// Watch monitors a client stream and creates and cancels watches.
Watch(pb.DnsService_WatchServer) error
// Stop cancels open watches and stops the watch processing go routine.
Stop()
}
// Manager contains all the data needed to manage watches
type Manager struct {
changes Chan
stopper chan bool
counter int64
watches map[string]watchlist
plugins []Watchable
mutex sync.Mutex
}
type watchlist map[int64]pb.DnsService_WatchServer
// NewWatcher creates a Watcher, which is used to manage watched names.
func NewWatcher(plugins []Watchable) *Manager {
w := &Manager{changes: make(Chan), stopper: make(chan bool), watches: make(map[string]watchlist), plugins: plugins}
for _, p := range plugins {
p.SetWatchChan(w.changes)
}
go w.process()
return w
}
func (w *Manager) nextID() int64 {
w.mutex.Lock()
w.counter++
id := w.counter
w.mutex.Unlock()
return id
}
// Watch monitors a client stream and creates and cancels watches.
func (w *Manager) Watch(stream pb.DnsService_WatchServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
create := in.GetCreateRequest()
if create != nil {
msg := new(dns.Msg)
err := msg.Unpack(create.Query.Msg)
if err != nil {
log.Warningf("Could not decode watch request: %s\n", err)
stream.Send(&pb.WatchResponse{Err: "could not decode request"})
continue
}
id := w.nextID()
if err := stream.Send(&pb.WatchResponse{WatchId: id, Created: true}); err != nil {
// if we fail to notify client of watch creation, don't create the watch
continue
}
// Normalize qname
qname := (&request.Request{Req: msg}).Name()
w.mutex.Lock()
if _, ok := w.watches[qname]; !ok {
w.watches[qname] = make(watchlist)
}
w.watches[qname][id] = stream
w.mutex.Unlock()
for _, p := range w.plugins {
err := p.Watch(qname)
if err != nil {
log.Warningf("Failed to start watch for %s in plugin %s: %s\n", qname, p.Name(), err)
stream.Send(&pb.WatchResponse{Err: fmt.Sprintf("failed to start watch for %s in plugin %s", qname, p.Name())})
}
}
continue
}
cancel := in.GetCancelRequest()
if cancel != nil {
w.mutex.Lock()
for qname, wl := range w.watches {
ws, ok := wl[cancel.WatchId]
if !ok {
continue
}
// only allow cancels from the client that started it
// TODO: test what happens if a stream tries to cancel a watchID that it doesn't own
if ws != stream {
continue
}
delete(wl, cancel.WatchId)
// if there are no more watches for this qname, we should tell the plugins
if len(wl) == 0 {
for _, p := range w.plugins {
p.StopWatching(qname)
}
delete(w.watches, qname)
}
// let the client know we canceled the watch
stream.Send(&pb.WatchResponse{WatchId: cancel.WatchId, Canceled: true})
}
w.mutex.Unlock()
continue
}
}
}
func (w *Manager) process() {
for {
select {
case <-w.stopper:
return
case changed := <-w.changes:
w.mutex.Lock()
for qname, wl := range w.watches {
if plugin.Zones([]string{changed}).Matches(qname) == "" {
continue
}
for id, stream := range wl {
wr := pb.WatchResponse{WatchId: id, Qname: qname}
err := stream.Send(&wr)
if err != nil {
log.Warningf("Error sending change for %s to watch %d: %s. Removing watch.\n", qname, id, err)
delete(w.watches[qname], id)
}
}
}
w.mutex.Unlock()
}
}
}
// Stop cancels open watches and stops the watch processing go routine.
func (w *Manager) Stop() {
w.stopper <- true
w.mutex.Lock()
for wn, wl := range w.watches {
for id, stream := range wl {
wr := pb.WatchResponse{WatchId: id, Canceled: true}
err := stream.Send(&wr)
if err != nil {
log.Warningf("Error notifiying client of cancellation: %s\n", err)
}
}
delete(w.watches, wn)
}
w.mutex.Unlock()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment