Commit ebbfffaf authored by Yong Tang's avatar Yong Tang Committed by GitHub

Update k8s.io/[api|apimachinery|client-go] to v0.18.0 (#3796)

* Update k8s.io/[api|apimachinery|client-go] to v0.18.0

This PR updates k8s.io/[api|apimachinery|client-go] to v0.18.0

This PR closes 3791
This PR closes 3792
This PR closes 3793
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Fix test failures
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Fix failed tests
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Fix test failure
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>
parent 8bbfa192
...@@ -40,8 +40,8 @@ require ( ...@@ -40,8 +40,8 @@ require (
google.golang.org/genproto v0.0.0-20200306153348-d950eab6f860 // indirect google.golang.org/genproto v0.0.0-20200306153348-d950eab6f860 // indirect
google.golang.org/grpc v1.28.0 google.golang.org/grpc v1.28.0
gopkg.in/DataDog/dd-trace-go.v1 v1.22.0 gopkg.in/DataDog/dd-trace-go.v1 v1.22.0
k8s.io/api v0.17.4 k8s.io/api v0.18.0
k8s.io/apimachinery v0.17.4 k8s.io/apimachinery v0.18.0
k8s.io/client-go v0.17.4 k8s.io/client-go v0.18.0
k8s.io/klog v1.0.0 k8s.io/klog v1.0.0
) )
This diff is collapsed.
...@@ -159,7 +159,7 @@ func (external) SvcIndexReverse(string) []*object.Service { return nil } ...@@ -159,7 +159,7 @@ func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) Modified() int64 { return 0 } func (external) Modified() int64 { return 0 }
func (external) EpIndex(s string) []*object.Endpoints { return nil } func (external) EpIndex(s string) []*object.Endpoints { return nil }
func (external) EndpointsList() []*object.Endpoints { return nil } func (external) EndpointsList() []*object.Endpoints { return nil }
func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil } func (external) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return nil, nil }
func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] } func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] }
func (external) PodIndex(string) []*object.Pod { return nil } func (external) PodIndex(string) []*object.Pod { return nil }
......
package kubernetes package kubernetes
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
...@@ -34,7 +35,7 @@ type dnsController interface { ...@@ -34,7 +35,7 @@ type dnsController interface {
EpIndex(string) []*object.Endpoints EpIndex(string) []*object.Endpoints
EpIndexReverse(string) []*object.Endpoints EpIndexReverse(string) []*object.Endpoints
GetNodeByName(string) (*api.Node, error) GetNodeByName(context.Context, string) (*api.Node, error)
GetNamespaceByName(string) (*api.Namespace, error) GetNamespaceByName(string) (*api.Namespace, error)
Run() Run()
...@@ -94,7 +95,7 @@ type dnsControlOpts struct { ...@@ -94,7 +95,7 @@ type dnsControlOpts struct {
} }
// newDNSController creates a controller for CoreDNS. // newDNSController creates a controller for CoreDNS.
func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl { func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl {
dns := dnsControl{ dns := dnsControl{
client: kubeClient, client: kubeClient,
selector: opts.selector, selector: opts.selector,
...@@ -106,8 +107,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -106,8 +107,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
dns.svcLister, dns.svcController = object.NewIndexerInformer( dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector), ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector), WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
}, },
&api.Service{}, &api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
...@@ -118,8 +119,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -118,8 +119,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
if opts.initPodCache { if opts.initPodCache {
dns.podLister, dns.podController = object.NewIndexerInformer( dns.podLister, dns.podController = object.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector), ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector), WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
}, },
&api.Pod{}, &api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
...@@ -131,8 +132,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -131,8 +132,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
if opts.initEndpointsCache { if opts.initEndpointsCache {
dns.epLister, dns.epController = object.NewIndexerInformer( dns.epLister, dns.epController = object.NewIndexerInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector), ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector), WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
}, },
&api.Endpoints{}, &api.Endpoints{},
cache.ResourceEventHandlerFuncs{}, cache.ResourceEventHandlerFuncs{},
...@@ -183,8 +184,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns ...@@ -183,8 +184,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
dns.nsLister, dns.nsController = cache.NewInformer( dns.nsLister, dns.nsController = cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: namespaceListFunc(dns.client, dns.namespaceSelector), ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector),
WatchFunc: namespaceWatchFunc(dns.client, dns.namespaceSelector), WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),
}, },
&api.Namespace{}, &api.Namespace{},
defaultResyncPeriod, defaultResyncPeriod,
...@@ -237,17 +238,17 @@ func epIPIndexFunc(obj interface{}) ([]string, error) { ...@@ -237,17 +238,17 @@ func epIPIndexFunc(obj interface{}) ([]string, error) {
return ep.IndexIP, nil return ep.IndexIP, nil
} }
func serviceListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil { if s != nil {
opts.LabelSelector = s.String() opts.LabelSelector = s.String()
} }
listV1, err := c.CoreV1().Services(ns).List(opts) listV1, err := c.CoreV1().Services(ns).List(ctx, opts)
return listV1, err return listV1, err
} }
} }
func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil { if s != nil {
opts.LabelSelector = s.String() opts.LabelSelector = s.String()
...@@ -256,27 +257,27 @@ func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta ...@@ -256,27 +257,27 @@ func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta
opts.FieldSelector = opts.FieldSelector + "," opts.FieldSelector = opts.FieldSelector + ","
} }
opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
listV1, err := c.CoreV1().Pods(ns).List(opts) listV1, err := c.CoreV1().Pods(ns).List(ctx, opts)
return listV1, err return listV1, err
} }
} }
func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil { if s != nil {
opts.LabelSelector = s.String() opts.LabelSelector = s.String()
} }
listV1, err := c.CoreV1().Endpoints(ns).List(opts) listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts)
return listV1, err return listV1, err
} }
} }
func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil { if s != nil {
opts.LabelSelector = s.String() opts.LabelSelector = s.String()
} }
listV1, err := c.CoreV1().Namespaces().List(opts) listV1, err := c.CoreV1().Namespaces().List(ctx, opts)
return listV1, err return listV1, err
} }
} }
...@@ -428,8 +429,8 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { ...@@ -428,8 +429,8 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
// GetNodeByName return the node by name. If nothing is found an error is // GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a roundtrip to the k8s API server, so use // returned. This query causes a roundtrip to the k8s API server, so use
// sparingly. Currently this is only used for Federation. // sparingly. Currently this is only used for Federation.
func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) { func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
v1node, err := dns.client.CoreV1().Nodes().Get(name, meta.GetOptions{}) v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{})
return v1node, err return v1node, err
} }
......
...@@ -29,7 +29,8 @@ func BenchmarkController(b *testing.B) { ...@@ -29,7 +29,8 @@ func BenchmarkController(b *testing.B) {
dco := dnsControlOpts{ dco := dnsControlOpts{
zones: []string{"cluster.local."}, zones: []string{"cluster.local."},
} }
controller := newdnsController(client, dco) ctx := context.Background()
controller := newdnsController(ctx, client, dco)
cidr := "10.0.0.0/19" cidr := "10.0.0.0/19"
// Add resources // Add resources
...@@ -39,7 +40,6 @@ func BenchmarkController(b *testing.B) { ...@@ -39,7 +40,6 @@ func BenchmarkController(b *testing.B) {
m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA) m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA)
k := New([]string{"cluster.local."}) k := New([]string{"cluster.local."})
k.APIConn = controller k.APIConn = controller
ctx := context.Background()
rw := &test.ResponseWriter{} rw := &test.ResponseWriter{}
b.ResetTimer() b.ResetTimer()
...@@ -70,6 +70,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) { ...@@ -70,6 +70,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) {
Namespace: "testns", Namespace: "testns",
}, },
} }
ctx := context.TODO()
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
ep.Subsets[0].Addresses = []api.EndpointAddress{ ep.Subsets[0].Addresses = []api.EndpointAddress{
{ {
...@@ -78,7 +79,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) { ...@@ -78,7 +79,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) {
}, },
} }
ep.ObjectMeta.Name = "svc" + strconv.Itoa(count) ep.ObjectMeta.Name = "svc" + strconv.Itoa(count)
client.CoreV1().Endpoints("testns").Create(ep) client.CoreV1().Endpoints("testns").Create(ctx, ep, meta.CreateOptions{})
count++ count++
} }
} }
...@@ -121,7 +122,8 @@ func generateSvcs(cidr string, svcType string, client kubernetes.Interface) { ...@@ -121,7 +122,8 @@ func generateSvcs(cidr string, svcType string, client kubernetes.Interface) {
} }
func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) { func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) {
client.CoreV1().Services("testns").Create(&api.Service{ ctx := context.TODO()
client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "svc" + strconv.Itoa(suffix), Name: "svc" + strconv.Itoa(suffix),
Namespace: "testns", Namespace: "testns",
...@@ -134,11 +136,12 @@ func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) { ...@@ -134,11 +136,12 @@ func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) {
Port: 80, Port: 80,
}}, }},
}, },
}) }, meta.CreateOptions{})
} }
func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) { func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) {
client.CoreV1().Services("testns").Create(&api.Service{ ctx := context.TODO()
client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "hdls" + strconv.Itoa(suffix), Name: "hdls" + strconv.Itoa(suffix),
Namespace: "testns", Namespace: "testns",
...@@ -146,11 +149,12 @@ func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) { ...@@ -146,11 +149,12 @@ func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) {
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
ClusterIP: api.ClusterIPNone, ClusterIP: api.ClusterIPNone,
}, },
}) }, meta.CreateOptions{})
} }
func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) { func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) {
client.CoreV1().Services("testns").Create(&api.Service{ ctx := context.TODO()
client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "external" + strconv.Itoa(suffix), Name: "external" + strconv.Itoa(suffix),
Namespace: "testns", Namespace: "testns",
...@@ -164,5 +168,5 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) { ...@@ -164,5 +168,5 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) {
}}, }},
Type: api.ServiceTypeExternalName, Type: api.ServiceTypeExternalName,
}, },
}) }, meta.CreateOptions{})
} }
package kubernetes package kubernetes
import ( import (
"context"
"testing" "testing"
"github.com/coredns/coredns/plugin/etcd/msg" "github.com/coredns/coredns/plugin/etcd/msg"
...@@ -86,7 +87,7 @@ func (external) SvcIndexReverse(string) []*object.Service { return nil } ...@@ -86,7 +87,7 @@ func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) Modified() int64 { return 0 } func (external) Modified() int64 { return 0 }
func (external) EpIndex(s string) []*object.Endpoints { return nil } func (external) EpIndex(s string) []*object.Endpoints { return nil }
func (external) EndpointsList() []*object.Endpoints { return nil } func (external) EndpointsList() []*object.Endpoints { return nil }
func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil } func (external) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return nil, nil }
func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] } func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] }
func (external) PodIndex(string) []*object.Pod { return nil } func (external) PodIndex(string) []*object.Pod { return nil }
......
...@@ -704,7 +704,7 @@ func (APIConnServeTest) EndpointsList() []*object.Endpoints { ...@@ -704,7 +704,7 @@ func (APIConnServeTest) EndpointsList() []*object.Endpoints {
return eps return eps
} }
func (APIConnServeTest) GetNodeByName(name string) (*api.Node, error) { func (APIConnServeTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
return &api.Node{ return &api.Node{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "test.node.foo.bar", Name: "test.node.foo.bar",
......
...@@ -212,7 +212,7 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) { ...@@ -212,7 +212,7 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
} }
// InitKubeCache initializes a new Kubernetes cache. // InitKubeCache initializes a new Kubernetes cache.
func (k *Kubernetes) InitKubeCache() (err error) { func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
config, err := k.getClientConfig() config, err := k.getClientConfig()
if err != nil { if err != nil {
return err return err
...@@ -245,7 +245,7 @@ func (k *Kubernetes) InitKubeCache() (err error) { ...@@ -245,7 +245,7 @@ func (k *Kubernetes) InitKubeCache() (err error) {
k.opts.zones = k.Zones k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode k.opts.endpointNameMode = k.endpointNameMode
k.APIConn = newdnsController(kubeClient, k.opts) k.APIConn = newdnsController(ctx, kubeClient, k.opts)
return err return err
} }
......
...@@ -238,7 +238,7 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints { ...@@ -238,7 +238,7 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
return eps return eps
} }
func (APIConnServiceTest) GetNodeByName(name string) (*api.Node, error) { func (APIConnServiceTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
return &api.Node{ return &api.Node{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "test.node.foo.bar", Name: "test.node.foo.bar",
......
package kubernetes package kubernetes
import ( import (
"context"
"strings" "strings"
"testing" "testing"
"time" "time"
...@@ -22,7 +23,8 @@ const ( ...@@ -22,7 +23,8 @@ const (
func TestDnsProgrammingLatency(t *testing.T) { func TestDnsProgrammingLatency(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
now := time.Now() now := time.Now()
controller := newdnsController(client, dnsControlOpts{ ctx := context.TODO()
controller := newdnsController(ctx, client, dnsControlOpts{
initEndpointsCache: true, initEndpointsCache: true,
// This is needed as otherwise the fake k8s client doesn't work properly. // This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true, skipAPIObjectsCleanup: true,
...@@ -104,24 +106,27 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap ...@@ -104,24 +106,27 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap
} }
func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
_, err := client.CoreV1().Endpoints(namespace).Create(buildEndpoints(name, triggerTime, subsets)) ctx := context.TODO()
_, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
_, err := client.CoreV1().Endpoints(namespace).Update(buildEndpoints(name, triggerTime, subsets)) ctx := context.TODO()
_, err := client.CoreV1().Endpoints(namespace).Update(ctx, buildEndpoints(name, triggerTime, subsets), meta.UpdateOptions{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) { func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) {
if _, err := client.CoreV1().Services(namespace).Create(&api.Service{ ctx := context.TODO()
if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name},
Spec: api.ServiceSpec{ClusterIP: clusterIp}, Spec: api.ServiceSpec{ClusterIP: clusterIp},
}); err != nil { }, meta.CreateOptions{}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
......
package kubernetes package kubernetes
import ( import (
"context"
"net" "net"
"testing" "testing"
...@@ -102,7 +103,7 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints { ...@@ -102,7 +103,7 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints {
return eps return eps
} }
func (APIConnTest) GetNodeByName(name string) (*api.Node, error) { return &api.Node{}, nil } func (APIConnTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{}, nil }
func (APIConnTest) GetNamespaceByName(name string) (*api.Namespace, error) { func (APIConnTest) GetNamespaceByName(name string) (*api.Namespace, error) {
return &api.Namespace{}, nil return &api.Namespace{}, nil
} }
......
...@@ -103,7 +103,7 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints { ...@@ -103,7 +103,7 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
return nil return nil
} }
func (APIConnReverseTest) GetNodeByName(name string) (*api.Node, error) { func (APIConnReverseTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
return &api.Node{ return &api.Node{
ObjectMeta: meta.ObjectMeta{ ObjectMeta: meta.ObjectMeta{
Name: "test.node.foo.bar", Name: "test.node.foo.bar",
......
package kubernetes package kubernetes
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"os" "os"
...@@ -38,7 +39,7 @@ func setup(c *caddy.Controller) error { ...@@ -38,7 +39,7 @@ func setup(c *caddy.Controller) error {
return plugin.Error("kubernetes", err) return plugin.Error("kubernetes", err)
} }
err = k.InitKubeCache() err = k.InitKubeCache(context.Background())
if err != nil { if err != nil {
return plugin.Error("kubernetes", err) return plugin.Error("kubernetes", err)
} }
......
package kubernetes package kubernetes
import ( import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func serviceWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil { if s != nil {
options.LabelSelector = s.String() options.LabelSelector = s.String()
} }
w, err := c.CoreV1().Services(ns).Watch(options) w, err := c.CoreV1().Services(ns).Watch(ctx, options)
return w, err return w, err
} }
} }
func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil { if s != nil {
options.LabelSelector = s.String() options.LabelSelector = s.String()
...@@ -26,27 +28,27 @@ func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(opt ...@@ -26,27 +28,27 @@ func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(opt
options.FieldSelector = options.FieldSelector + "," options.FieldSelector = options.FieldSelector + ","
} }
options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
w, err := c.CoreV1().Pods(ns).Watch(options) w, err := c.CoreV1().Pods(ns).Watch(ctx, options)
return w, err return w, err
} }
} }
func endpointsWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil { if s != nil {
options.LabelSelector = s.String() options.LabelSelector = s.String()
} }
w, err := c.CoreV1().Endpoints(ns).Watch(options) w, err := c.CoreV1().Endpoints(ns).Watch(ctx, options)
return w, err return w, err
} }
} }
func namespaceWatchFunc(c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil { if s != nil {
options.LabelSelector = s.String() options.LabelSelector = s.String()
} }
w, err := c.CoreV1().Namespaces().Watch(options) w, err := c.CoreV1().Namespaces().Watch(ctx, options)
return w, err return w, err
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment