Commit 29cb00aa authored by Miek Gieben's avatar Miek Gieben Committed by GitHub

Remove grpc watch functionality (#2549)

This was added, but didn't see any use. For a large, complex chunk of
code we should have some users of it.

Remove all watch functionally from plugins, servers and packages.

Fixes: #2548
Signed-off-by: default avatarMiek Gieben <miek@miek.nl>
parent f6981938
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"github.com/coredns/coredns/pb" "github.com/coredns/coredns/pb"
"github.com/coredns/coredns/plugin/pkg/transport" "github.com/coredns/coredns/plugin/pkg/transport"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -24,7 +23,6 @@ type ServergRPC struct { ...@@ -24,7 +23,6 @@ type ServergRPC struct {
grpcServer *grpc.Server grpcServer *grpc.Server
listenAddr net.Addr listenAddr net.Addr
tlsConfig *tls.Config tlsConfig *tls.Config
watch watch.Watcher
} }
// NewServergRPC returns a new CoreDNS GRPC server and compiles all plugin in to it. // NewServergRPC returns a new CoreDNS GRPC server and compiles all plugin in to it.
...@@ -41,7 +39,7 @@ func NewServergRPC(addr string, group []*Config) (*ServergRPC, error) { ...@@ -41,7 +39,7 @@ func NewServergRPC(addr string, group []*Config) (*ServergRPC, error) {
tlsConfig = conf.TLSConfig tlsConfig = conf.TLSConfig
} }
return &ServergRPC{Server: s, tlsConfig: tlsConfig, watch: watch.NewWatcher(watchables(s.zones))}, nil return &ServergRPC{Server: s, tlsConfig: tlsConfig}, nil
} }
// Serve implements caddy.TCPServer interface. // Serve implements caddy.TCPServer interface.
...@@ -103,9 +101,6 @@ func (s *ServergRPC) OnStartupComplete() { ...@@ -103,9 +101,6 @@ func (s *ServergRPC) OnStartupComplete() {
func (s *ServergRPC) Stop() (err error) { func (s *ServergRPC) Stop() (err error) {
s.m.Lock() s.m.Lock()
defer s.m.Unlock() defer s.m.Unlock()
if s.watch != nil {
s.watch.Stop()
}
if s.grpcServer != nil { if s.grpcServer != nil {
s.grpcServer.GracefulStop() s.grpcServer.GracefulStop()
} }
...@@ -144,12 +139,6 @@ func (s *ServergRPC) Query(ctx context.Context, in *pb.DnsPacket) (*pb.DnsPacket ...@@ -144,12 +139,6 @@ func (s *ServergRPC) Query(ctx context.Context, in *pb.DnsPacket) (*pb.DnsPacket
return &pb.DnsPacket{Msg: packed}, nil return &pb.DnsPacket{Msg: packed}, nil
} }
// Watch is the entrypoint called by the gRPC layer when the user asks
// to watch a query.
func (s *ServergRPC) Watch(stream pb.DnsService_WatchServer) error {
return s.watch.Watch(stream)
}
// Shutdown stops the server (non gracefully). // Shutdown stops the server (non gracefully).
func (s *ServergRPC) Shutdown() error { func (s *ServergRPC) Shutdown() error {
if s.grpcServer != nil { if s.grpcServer != nil {
......
package dnsserver
import (
"github.com/coredns/coredns/plugin/pkg/watch"
)
func watchables(zones map[string]*Config) []watch.Watchable {
var w []watch.Watchable
for _, config := range zones {
plugins := config.Handlers()
for _, p := range plugins {
if x, ok := p.(watch.Watchable); ok {
w = append(w, x)
}
}
}
return w
}
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
all: dns.pb.go all: dns.pb.go
dns.pb.go: dns.proto dns.pb.go: dns.proto
protoc --go_out=plugins=grpc:. dns.proto && \ protoc --go_out=plugins=grpc:. dns.proto
sed -e s?golang.org/x/net/context?context? < dns.pb.go > dns.pb.go.tmp && \
mv dns.pb.go.tmp dns.pb.go .PHONY: clean
clean:
rm dns.pb.go
This diff is collapsed.
...@@ -9,41 +9,4 @@ message DnsPacket { ...@@ -9,41 +9,4 @@ message DnsPacket {
service DnsService { service DnsService {
rpc Query (DnsPacket) returns (DnsPacket); rpc Query (DnsPacket) returns (DnsPacket);
rpc Watch (stream WatchRequest) returns (stream WatchResponse);
}
message WatchRequest {
// request_union is a request to either create a new watcher or cancel an existing watcher.
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
}
}
message WatchCreateRequest {
DnsPacket query = 1;
}
message WatchCancelRequest {
// watch_id is the watcher id to cancel
int64 watch_id = 1;
}
message WatchResponse {
// watch_id is the ID of the watcher that corresponds to the response.
int64 watch_id = 1;
// created is set to true if the response is for a create watch request.
// The client should record the watch_id and expect to receive DNS replies
// from the same stream.
// All replies sent to the created watcher will attach with the same watch_id.
bool created = 2;
// canceled is set to true if the response is for a cancel watch request.
// No further events will be sent to the canceled watcher.
bool canceled = 3;
string qname = 4;
string err = 5;
} }
...@@ -3,7 +3,6 @@ package federation ...@@ -3,7 +3,6 @@ package federation
import ( import (
"github.com/coredns/coredns/plugin/kubernetes" "github.com/coredns/coredns/plugin/kubernetes"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
...@@ -19,9 +18,6 @@ func (APIConnFederationTest) Stop() error { return ...@@ -19,9 +18,6 @@ func (APIConnFederationTest) Stop() error { return
func (APIConnFederationTest) SvcIndexReverse(string) []*object.Service { return nil } func (APIConnFederationTest) SvcIndexReverse(string) []*object.Service { return nil }
func (APIConnFederationTest) EpIndexReverse(string) []*object.Endpoints { return nil } func (APIConnFederationTest) EpIndexReverse(string) []*object.Endpoints { return nil }
func (APIConnFederationTest) Modified() int64 { return 0 } func (APIConnFederationTest) Modified() int64 { return 0 }
func (APIConnFederationTest) SetWatchChan(watch.Chan) {}
func (APIConnFederationTest) Watch(string) error { return nil }
func (APIConnFederationTest) StopWatching(string) {}
func (APIConnFederationTest) PodIndex(string) []*object.Pod { func (APIConnFederationTest) PodIndex(string) []*object.Pod {
return []*object.Pod{ return []*object.Pod{
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"github.com/coredns/coredns/plugin/kubernetes" "github.com/coredns/coredns/plugin/kubernetes"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
...@@ -158,9 +157,6 @@ func (external) Stop() error { return nil } ...@@ -158,9 +157,6 @@ func (external) Stop() error { return nil }
func (external) EpIndexReverse(string) []*object.Endpoints { return nil } func (external) EpIndexReverse(string) []*object.Endpoints { return nil }
func (external) SvcIndexReverse(string) []*object.Service { return nil } func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) Modified() int64 { return 0 } func (external) Modified() int64 { return 0 }
func (external) SetWatchChan(watch.Chan) {}
func (external) Watch(string) error { return nil }
func (external) StopWatching(string) {}
func (external) EpIndex(s string) []*object.Endpoints { return nil } func (external) EpIndex(s string) []*object.Endpoints { return nil }
func (external) EndpointsList() []*object.Endpoints { return nil } func (external) EndpointsList() []*object.Endpoints { return nil }
func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil } func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil }
......
...@@ -112,11 +112,6 @@ kubernetes [ZONES...] { ...@@ -112,11 +112,6 @@ kubernetes [ZONES...] {
This plugin implements dynamic health checking. Currently this is limited to reporting healthy when This plugin implements dynamic health checking. Currently this is limited to reporting healthy when
the API has synced. the API has synced.
## Watch
This plugin implements watch. A client that connects to CoreDNS using `coredns/client` can be notified
of changes to A, AAAA, and SRV records for Kubernetes services and endpoints.
## Examples ## Examples
Handle all queries in the `cluster.local` zone. Connect to Kubernetes in-cluster. Also handle all Handle all queries in the `cluster.local` zone. Connect to Kubernetes in-cluster. Also handle all
......
...@@ -8,13 +8,11 @@ import ( ...@@ -8,13 +8,11 @@ import (
"time" "time"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
dnswatch "github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
...@@ -45,11 +43,6 @@ type dnsController interface { ...@@ -45,11 +43,6 @@ type dnsController interface {
// Modified returns the timestamp of the most recent changes // Modified returns the timestamp of the most recent changes
Modified() int64 Modified() int64
// Watch-related items
SetWatchChan(dnswatch.Chan)
Watch(string) error
StopWatching(string)
} }
type dnsControl struct { type dnsControl struct {
...@@ -79,9 +72,6 @@ type dnsControl struct { ...@@ -79,9 +72,6 @@ type dnsControl struct {
shutdown bool shutdown bool
stopCh chan struct{} stopCh chan struct{}
// watch-related items channel
watchChan dnswatch.Chan
watched map[string]struct{}
zones []string zones []string
endpointNameMode bool endpointNameMode bool
} }
...@@ -105,7 +95,6 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -105,7 +95,6 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
client: kubeClient, client: kubeClient,
selector: opts.selector, selector: opts.selector,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
watched: make(map[string]struct{}),
zones: opts.zones, zones: opts.zones,
endpointNameMode: opts.endpointNameMode, endpointNameMode: opts.endpointNameMode,
} }
...@@ -117,7 +106,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -117,7 +106,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
}, },
&api.Service{}, &api.Service{},
opts.resyncPeriod, opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
object.ToService, object.ToService,
) )
...@@ -130,7 +119,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -130,7 +119,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
}, },
&api.Pod{}, &api.Pod{},
opts.resyncPeriod, opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{},
cache.Indexers{podIPIndex: podIPIndexFunc}, cache.Indexers{podIPIndex: podIPIndexFunc},
object.ToPod, object.ToPod,
) )
...@@ -144,7 +133,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -144,7 +133,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
}, },
&api.Endpoints{}, &api.Endpoints{},
opts.resyncPeriod, opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.ToEndpoints) object.ToEndpoints)
} }
...@@ -223,26 +212,6 @@ func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta ...@@ -223,26 +212,6 @@ func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta
} }
} }
func serviceWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Services(ns).Watch(options)
return w, err
}
}
func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Pods(ns).Watch(options)
return w, err
}
}
func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil { if s != nil {
...@@ -253,16 +222,6 @@ func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) fun ...@@ -253,16 +222,6 @@ func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) fun
} }
} }
func endpointsWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Endpoints(ns).Watch(options)
return w, err
}
}
func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil { if s != nil {
...@@ -273,27 +232,6 @@ func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.List ...@@ -273,27 +232,6 @@ func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.List
} }
} }
func namespaceWatchFunc(c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
w, err := c.CoreV1().Namespaces().Watch(options)
return w, err
}
}
func (dns *dnsControl) SetWatchChan(c dnswatch.Chan) { dns.watchChan = c }
func (dns *dnsControl) StopWatching(qname string) { delete(dns.watched, qname) }
func (dns *dnsControl) Watch(qname string) error {
if dns.watchChan == nil {
return fmt.Errorf("cannot start watch because the channel has not been set")
}
dns.watched[qname] = struct{}{}
return nil
}
// Stop stops the controller. // Stop stops the controller.
func (dns *dnsControl) Stop() error { func (dns *dnsControl) Stop() error {
dns.stopLock.Lock() dns.stopLock.Lock()
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"github.com/coredns/coredns/plugin/etcd/msg" "github.com/coredns/coredns/plugin/etcd/msg"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
...@@ -85,9 +84,6 @@ func (external) Stop() error { return nil } ...@@ -85,9 +84,6 @@ func (external) Stop() error { return nil }
func (external) EpIndexReverse(string) []*object.Endpoints { return nil } func (external) EpIndexReverse(string) []*object.Endpoints { return nil }
func (external) SvcIndexReverse(string) []*object.Service { return nil } func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) Modified() int64 { return 0 } func (external) Modified() int64 { return 0 }
func (external) SetWatchChan(watch.Chan) {}
func (external) Watch(string) error { return nil }
func (external) StopWatching(string) {}
func (external) EpIndex(s string) []*object.Endpoints { return nil } func (external) EpIndex(s string) []*object.Endpoints { return nil }
func (external) EndpointsList() []*object.Endpoints { return nil } func (external) EndpointsList() []*object.Endpoints { return nil }
func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil } func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil }
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -441,9 +440,6 @@ func (APIConnServeTest) Stop() error { return nil ...@@ -441,9 +440,6 @@ func (APIConnServeTest) Stop() error { return nil
func (APIConnServeTest) EpIndexReverse(string) []*object.Endpoints { return nil } func (APIConnServeTest) EpIndexReverse(string) []*object.Endpoints { return nil }
func (APIConnServeTest) SvcIndexReverse(string) []*object.Service { return nil } func (APIConnServeTest) SvcIndexReverse(string) []*object.Service { return nil }
func (APIConnServeTest) Modified() int64 { return time.Now().Unix() } func (APIConnServeTest) Modified() int64 { return time.Now().Unix() }
func (APIConnServeTest) SetWatchChan(watch.Chan) {}
func (APIConnServeTest) Watch(string) error { return nil }
func (APIConnServeTest) StopWatching(string) {}
func (APIConnServeTest) PodIndex(string) []*object.Pod { func (APIConnServeTest) PodIndex(string) []*object.Pod {
a := []*object.Pod{ a := []*object.Pod{
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -66,9 +65,6 @@ func (APIConnServiceTest) PodIndex(string) []*object.Pod { return ni ...@@ -66,9 +65,6 @@ func (APIConnServiceTest) PodIndex(string) []*object.Pod { return ni
func (APIConnServiceTest) SvcIndexReverse(string) []*object.Service { return nil } func (APIConnServiceTest) SvcIndexReverse(string) []*object.Service { return nil }
func (APIConnServiceTest) EpIndexReverse(string) []*object.Endpoints { return nil } func (APIConnServiceTest) EpIndexReverse(string) []*object.Endpoints { return nil }
func (APIConnServiceTest) Modified() int64 { return 0 } func (APIConnServiceTest) Modified() int64 { return 0 }
func (APIConnServiceTest) SetWatchChan(watch.Chan) {}
func (APIConnServiceTest) Watch(string) error { return nil }
func (APIConnServiceTest) StopWatching(string) {}
func (APIConnServiceTest) SvcIndex(string) []*object.Service { func (APIConnServiceTest) SvcIndex(string) []*object.Service {
svcs := []*object.Service{ svcs := []*object.Service{
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"testing" "testing"
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
) )
...@@ -20,9 +19,6 @@ func (APIConnTest) SvcIndexReverse(string) []*object.Service { return nil } ...@@ -20,9 +19,6 @@ func (APIConnTest) SvcIndexReverse(string) []*object.Service { return nil }
func (APIConnTest) EpIndex(string) []*object.Endpoints { return nil } func (APIConnTest) EpIndex(string) []*object.Endpoints { return nil }
func (APIConnTest) EndpointsList() []*object.Endpoints { return nil } func (APIConnTest) EndpointsList() []*object.Endpoints { return nil }
func (APIConnTest) Modified() int64 { return 0 } func (APIConnTest) Modified() int64 { return 0 }
func (APIConnTest) SetWatchChan(watch.Chan) {}
func (APIConnTest) Watch(string) error { return nil }
func (APIConnTest) StopWatching(string) {}
func (APIConnTest) ServiceList() []*object.Service { func (APIConnTest) ServiceList() []*object.Service {
svcs := []*object.Service{ svcs := []*object.Service{
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
"github.com/miekg/dns" "github.com/miekg/dns"
...@@ -24,9 +23,6 @@ func (APIConnReverseTest) EpIndex(string) []*object.Endpoints { return nil } ...@@ -24,9 +23,6 @@ func (APIConnReverseTest) EpIndex(string) []*object.Endpoints { return nil }
func (APIConnReverseTest) EndpointsList() []*object.Endpoints { return nil } func (APIConnReverseTest) EndpointsList() []*object.Endpoints { return nil }
func (APIConnReverseTest) ServiceList() []*object.Service { return nil } func (APIConnReverseTest) ServiceList() []*object.Service { return nil }
func (APIConnReverseTest) Modified() int64 { return 0 } func (APIConnReverseTest) Modified() int64 { return 0 }
func (APIConnReverseTest) SetWatchChan(watch.Chan) {}
func (APIConnReverseTest) Watch(string) error { return nil }
func (APIConnReverseTest) StopWatching(string) {}
func (APIConnReverseTest) SvcIndex(svc string) []*object.Service { func (APIConnReverseTest) SvcIndex(svc string) []*object.Service {
if svc != "svc1.testns" { if svc != "svc1.testns" {
......
package kubernetes package kubernetes
import ( import (
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
) )
// SetWatchChan implements watch.Watchable func serviceWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
func (k *Kubernetes) SetWatchChan(c watch.Chan) { return func(options meta.ListOptions) (watch.Interface, error) {
k.APIConn.SetWatchChan(c) if s != nil {
} options.LabelSelector = s.String()
// Watch is called when a watch is started for a name.
func (k *Kubernetes) Watch(qname string) error {
return k.APIConn.Watch(qname)
}
// StopWatching is called when no more watches remain for a name
func (k *Kubernetes) StopWatching(qname string) {
k.APIConn.StopWatching(qname)
}
var _ watch.Watchable = &Kubernetes{}
func (dns *dnsControl) sendServiceUpdates(s *object.Service) {
for i := range dns.zones {
name := serviceFQDN(s, dns.zones[i])
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
}
func (dns *dnsControl) sendPodUpdates(p *object.Pod) {
for i := range dns.zones {
name := podFQDN(p, dns.zones[i])
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
}
func (dns *dnsControl) sendEndpointsUpdates(ep *object.Endpoints) {
for _, zone := range dns.zones {
for _, name := range endpointFQDN(ep, zone, dns.endpointNameMode) {
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
name := serviceFQDN(ep, zone)
if _, ok := dns.watched[name]; ok {
dns.watchChan <- name
}
}
}
// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b.
// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed.
// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to
// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed.
// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client.
// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a).
func endpointsSubsetDiffs(a, b *object.Endpoints) *object.Endpoints {
c := b.CopyWithoutSubsets()
// In the following loop, the first iteration computes (in a but not in b).
// The second iteration then adds (in b but not in a)
// The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b.
for _, abba := range [][]*object.Endpoints{{a, b}, {b, a}} {
a := abba[0]
b := abba[1]
left:
for _, as := range a.Subsets {
for _, bs := range b.Subsets {
if subsetsEquivalent(as, bs) {
continue left
}
}
c.Subsets = append(c.Subsets, as)
} }
w, err := c.CoreV1().Services(ns).Watch(options)
return w, err
} }
return c
} }
// sendUpdates sends a notification to the server if a watch is enabled for the qname. func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) { return func(options meta.ListOptions) (watch.Interface, error) {
// If both objects have the same resource version, they are identical. if s != nil {
if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) { options.LabelSelector = s.String()
return
}
obj := newObj
if obj == nil {
obj = oldObj
}
switch ob := obj.(type) {
case *object.Service:
dns.updateModifed()
if len(dns.watched) == 0 {
return
}
dns.sendServiceUpdates(ob)
case *object.Endpoints:
if newObj == nil || oldObj == nil {
dns.updateModifed()
if len(dns.watched) == 0 {
return
}
dns.sendEndpointsUpdates(ob)
return
}
p := oldObj.(*object.Endpoints)
// endpoint updates can come frequently, make sure it's a change we care about
if endpointsEquivalent(p, ob) {
return
} }
dns.updateModifed() w, err := c.CoreV1().Pods(ns).Watch(options)
if len(dns.watched) == 0 { return w, err
return
}
dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob))
case *object.Pod:
dns.updateModifed()
if len(dns.watched) == 0 {
return
}
dns.sendPodUpdates(ob)
default:
log.Warningf("Updates for %T not supported.", ob)
} }
} }
func (dns *dnsControl) Add(obj interface{}) { dns.sendUpdates(nil, obj) } func endpointsWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
func (dns *dnsControl) Delete(obj interface{}) { dns.sendUpdates(obj, nil) } return func(options meta.ListOptions) (watch.Interface, error) {
func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.sendUpdates(oldObj, newObj) } if s != nil {
options.LabelSelector = s.String()
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
func subsetsEquivalent(sa, sb object.EndpointSubset) bool {
if len(sa.Addresses) != len(sb.Addresses) {
return false
}
if len(sa.Ports) != len(sb.Ports) {
return false
}
// in Addresses and Ports, we should be able to rely on
// these being sorted and able to be compared
// they are supposed to be in a canonical format
for addr, aaddr := range sa.Addresses {
baddr := sb.Addresses[addr]
if aaddr.IP != baddr.IP {
return false
}
if aaddr.Hostname != baddr.Hostname {
return false
}
}
for port, aport := range sa.Ports {
bport := sb.Ports[port]
if aport.Name != bport.Name {
return false
}
if aport.Port != bport.Port {
return false
}
if aport.Protocol != bport.Protocol {
return false
} }
w, err := c.CoreV1().Endpoints(ns).Watch(options)
return w, err
} }
return true
} }
// endpointsEquivalent checks if the update to an endpoint is something func namespaceWatchFunc(c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
// that matters to us or if they are effectively equivalent. return func(options meta.ListOptions) (watch.Interface, error) {
func endpointsEquivalent(a, b *object.Endpoints) bool { if s != nil {
options.LabelSelector = s.String()
if len(a.Subsets) != len(b.Subsets) {
return false
}
// we should be able to rely on
// these being sorted and able to be compared
// they are supposed to be in a canonical format
for i, sa := range a.Subsets {
sb := b.Subsets[i]
if !subsetsEquivalent(sa, sb) {
return false
} }
w, err := c.CoreV1().Namespaces().Watch(options)
return w, err
} }
return true
} }
package kubernetes
import (
"strconv"
"strings"
"testing"
"github.com/coredns/coredns/plugin/kubernetes/object"
)
func endpointSubsets(addrs ...string) (eps []object.EndpointSubset) {
for _, ap := range addrs {
apa := strings.Split(ap, ":")
address := apa[0]
port, _ := strconv.Atoi(apa[1])
eps = append(eps, object.EndpointSubset{Addresses: []object.EndpointAddress{{IP: address}}, Ports: []object.EndpointPort{{Port: int32(port)}}})
}
return eps
}
func TestEndpointsSubsetDiffs(t *testing.T) {
var tests = []struct {
a, b, expected object.Endpoints
}{
{ // From a->b: Nothing changes
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
object.Endpoints{},
},
{ // From a->b: Everything goes away
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
object.Endpoints{},
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
},
{ // From a->b: Everything is new
object.Endpoints{},
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
},
{ // From a->b: One goes away, one is new
object.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080")},
object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80")},
object.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080", "10.0.0.1:80")},
},
}
for i, te := range tests {
got := endpointsSubsetDiffs(&te.a, &te.b)
if !endpointsEquivalent(got, &te.expected) {
t.Errorf("Expected '%v' for test %v, got '%v'.", te.expected, i, got)
}
}
}
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/test"
...@@ -142,88 +141,3 @@ func difference(testRRs []dns.RR, gotRRs []dns.RR) []dns.RR { ...@@ -142,88 +141,3 @@ func difference(testRRs []dns.RR, gotRRs []dns.RR) []dns.RR {
} }
return foundRRs return foundRRs
} }
func TestEndpointsEquivalent(t *testing.T) {
epA := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
}},
}
epB := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
}},
}
epC := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
}},
}
epD := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
},
{
Addresses: []object.EndpointAddress{{IP: "1.2.2.2", Hostname: "foofoo"}},
}},
}
epE := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}, {IP: "1.1.1.1"}},
}},
}
epF := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foofoo"}},
}},
}
epG := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
Ports: []object.EndpointPort{{Name: "http", Port: 80, Protocol: "TCP"}},
}},
}
epH := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
Ports: []object.EndpointPort{{Name: "newportname", Port: 80, Protocol: "TCP"}},
}},
}
epI := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
Ports: []object.EndpointPort{{Name: "http", Port: 8080, Protocol: "TCP"}},
}},
}
epJ := object.Endpoints{
Subsets: []object.EndpointSubset{{
Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
Ports: []object.EndpointPort{{Name: "http", Port: 80, Protocol: "UDP"}},
}},
}
tests := []struct {
equiv bool
a *object.Endpoints
b *object.Endpoints
}{
{true, &epA, &epB},
{false, &epA, &epC},
{false, &epA, &epD},
{false, &epA, &epE},
{false, &epA, &epF},
{false, &epF, &epG},
{false, &epG, &epH},
{false, &epG, &epI},
{false, &epG, &epJ},
}
for i, tc := range tests {
if tc.equiv && !endpointsEquivalent(tc.a, tc.b) {
t.Errorf("Test %d: expected endpoints to be equivalent and they are not.", i)
}
if !tc.equiv && endpointsEquivalent(tc.a, tc.b) {
t.Errorf("Test %d: expected endpoints to be seen as different but they were not.", i)
}
}
}
package watch
// Chan is used to inform the server of a change. Whenever
// a watched FQDN has a change in data, that FQDN should be
// sent down this channel.
type Chan chan string
// Watchable is the interface watchable plugins should implement
type Watchable interface {
// Name returns the plugin name.
Name() string
// SetWatchChan is called when the watch channel is created.
SetWatchChan(Chan)
// Watch is called whenever a watch is created for a FQDN. Plugins
// should send the FQDN down the watch channel when its data may have
// changed. This is an exact match only.
Watch(qname string) error
// StopWatching is called whenever all watches are canceled for a FQDN.
StopWatching(qname string)
}
package watch
import (
"fmt"
"io"
"sync"
"github.com/miekg/dns"
"github.com/coredns/coredns/pb"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
)
// Watcher handles watch creation, cancellation, and processing.
type Watcher interface {
// Watch monitors a client stream and creates and cancels watches.
Watch(pb.DnsService_WatchServer) error
// Stop cancels open watches and stops the watch processing go routine.
Stop()
}
// Manager contains all the data needed to manage watches
type Manager struct {
changes Chan
stopper chan bool
counter int64
watches map[string]watchlist
plugins []Watchable
mutex sync.Mutex
}
type watchlist map[int64]pb.DnsService_WatchServer
// NewWatcher creates a Watcher, which is used to manage watched names.
func NewWatcher(plugins []Watchable) *Manager {
w := &Manager{changes: make(Chan), stopper: make(chan bool), watches: make(map[string]watchlist), plugins: plugins}
for _, p := range plugins {
p.SetWatchChan(w.changes)
}
go w.process()
return w
}
func (w *Manager) nextID() int64 {
w.mutex.Lock()
w.counter++
id := w.counter
w.mutex.Unlock()
return id
}
// Watch monitors a client stream and creates and cancels watches.
func (w *Manager) Watch(stream pb.DnsService_WatchServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
create := in.GetCreateRequest()
if create != nil {
msg := new(dns.Msg)
err := msg.Unpack(create.Query.Msg)
if err != nil {
log.Warningf("Could not decode watch request: %s\n", err)
stream.Send(&pb.WatchResponse{Err: "could not decode request"})
continue
}
id := w.nextID()
if err := stream.Send(&pb.WatchResponse{WatchId: id, Created: true}); err != nil {
// if we fail to notify client of watch creation, don't create the watch
continue
}
// Normalize qname
qname := (&request.Request{Req: msg}).Name()
w.mutex.Lock()
if _, ok := w.watches[qname]; !ok {
w.watches[qname] = make(watchlist)
}
w.watches[qname][id] = stream
w.mutex.Unlock()
for _, p := range w.plugins {
err := p.Watch(qname)
if err != nil {
log.Warningf("Failed to start watch for %s in plugin %s: %s\n", qname, p.Name(), err)
stream.Send(&pb.WatchResponse{Err: fmt.Sprintf("failed to start watch for %s in plugin %s", qname, p.Name())})
}
}
continue
}
cancel := in.GetCancelRequest()
if cancel != nil {
w.mutex.Lock()
for qname, wl := range w.watches {
ws, ok := wl[cancel.WatchId]
if !ok {
continue
}
// only allow cancels from the client that started it
// TODO: test what happens if a stream tries to cancel a watchID that it doesn't own
if ws != stream {
continue
}
delete(wl, cancel.WatchId)
// if there are no more watches for this qname, we should tell the plugins
if len(wl) == 0 {
for _, p := range w.plugins {
p.StopWatching(qname)
}
delete(w.watches, qname)
}
// let the client know we canceled the watch
stream.Send(&pb.WatchResponse{WatchId: cancel.WatchId, Canceled: true})
}
w.mutex.Unlock()
continue
}
}
}
func (w *Manager) process() {
for {
select {
case <-w.stopper:
return
case changed := <-w.changes:
w.mutex.Lock()
for qname, wl := range w.watches {
if plugin.Zones([]string{changed}).Matches(qname) == "" {
continue
}
for id, stream := range wl {
wr := pb.WatchResponse{WatchId: id, Qname: qname}
err := stream.Send(&wr)
if err != nil {
log.Warningf("Error sending change for %s to watch %d: %s. Removing watch.\n", qname, id, err)
delete(w.watches[qname], id)
}
}
}
w.mutex.Unlock()
}
}
}
// Stop cancels open watches and stops the watch processing go routine.
func (w *Manager) Stop() {
w.stopper <- true
w.mutex.Lock()
for wn, wl := range w.watches {
for id, stream := range wl {
wr := pb.WatchResponse{WatchId: id, Canceled: true}
err := stream.Send(&wr)
if err != nil {
log.Warningf("Error notifying client of cancellation: %s\n", err)
}
}
delete(w.watches, wn)
}
w.mutex.Unlock()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment