Commit 26d8680a authored by Yong Tang's avatar Yong Tang Committed by GitHub

Support multiple k8s api servers specification and load balance among api servers (#820)

* Support multiple k8s api servers specification and load balance among api servers

This fix adds supports for multiple k8s api servers specification,
load balance among api servers.

When two or more api servers are specified in kubernetes block (endpoint ...),
a proxy is created locally (with randomly generately port). The coredns
will points to the generated proxy so that load balancing could be achieved.
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Setup initial healthcheck at the beginning
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Update README.md for kubernetes middleware and remove whitespaces.
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Use middleware/pkg/healthcheck in middleware/kubernetes

for api proxy
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>
parent acfa6501
......@@ -27,6 +27,9 @@ kubernetes ZONE [ZONE...] [
* `resyncperiod` specifies the Kubernetes data API **DURATION** period.
* `endpoint` specifies the **URL** for a remove k8s API endpoint.
If omitted, it will connect to k8s in-cluster using the cluster service account.
Multiple k8s API endpoints could be specified, separated by `,`s, e.g.
`endpoint http://k8s-endpoint1:8080,http://k8s-endpoint2:8080`. CoreDNS
will automatically perform a healthcheck and proxy to the healthy k8s API endpoint.
* `tls` **CERT** **KEY** **CACERT** are the TLS cert, key and the CA cert file names for remote k8s connection.
This option is ignored if connecting in-cluster (i.e. endpoint is not specified).
* `namespaces` **NAMESPACE [NAMESPACE...]**, exposed only the k8s namespaces listed.
......
package kubernetes
import (
"fmt"
"io"
"log"
"net"
"net/http"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
)
type proxyHandler struct {
healthcheck.HealthCheck
}
type apiProxy struct {
http.Server
listener net.Listener
handler proxyHandler
}
func (p *proxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
upstream := p.Select()
network := "tcp"
if upstream.Network != "" {
network = upstream.Network
}
address := upstream.Name
d, err := net.Dial(network, address)
if err != nil {
log.Printf("[ERROR] Unable to establish connection to upstream %s://%s: %s", network, address, err)
http.Error(w, fmt.Sprintf("Unable to establish connection to upstream %s://%s: %s", network, address, err), 500)
return
}
hj, ok := w.(http.Hijacker)
if !ok {
log.Printf("[ERROR] Unable to establish connection: no hijacker")
http.Error(w, "Unable to establish connection: no hijacker", 500)
return
}
nc, _, err := hj.Hijack()
if err != nil {
log.Printf("[ERROR] Unable to hijack connection: %s", err)
http.Error(w, fmt.Sprintf("Unable to hijack connection: %s", err), 500)
return
}
defer nc.Close()
defer d.Close()
err = r.Write(d)
if err != nil {
log.Printf("[ERROR] Unable to copy connection to upstream %s://%s: %s", network, address, err)
http.Error(w, fmt.Sprintf("Unable to copy connection to upstream %s://%s: %s", network, address, err), 500)
return
}
errChan := make(chan error, 2)
cp := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errChan <- err
}
go cp(d, nc)
go cp(nc, d)
<-errChan
}
func (p *apiProxy) Run() {
p.handler.Start()
p.Serve(p.listener)
}
func (p *apiProxy) Stop() {
p.handler.Stop()
p.Close()
}
......@@ -7,11 +7,13 @@ import (
"log"
"net"
"strings"
"sync/atomic"
"time"
"github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/etcd/msg"
"github.com/coredns/coredns/middleware/pkg/dnsutil"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
dnsstrings "github.com/coredns/coredns/middleware/pkg/strings"
"github.com/coredns/coredns/middleware/proxy"
"github.com/coredns/coredns/request"
......@@ -32,7 +34,8 @@ type Kubernetes struct {
Zones []string
primaryZone int
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
APIEndpoint string
APIServerList []string
APIProxy *apiProxy
APICertAuth string
APIClientCert string
APIClientKey string
......@@ -173,8 +176,62 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
overrides := &clientcmd.ConfigOverrides{}
clusterinfo := clientcmdapi.Cluster{}
authinfo := clientcmdapi.AuthInfo{}
if len(k.APIEndpoint) > 0 {
clusterinfo.Server = k.APIEndpoint
if len(k.APIServerList) > 0 {
endpoint := k.APIServerList[0]
if len(k.APIServerList) > 1 {
// Use a random port for api proxy, will get the value later through listener.Addr()
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err)
}
k.APIProxy = &apiProxy{
listener: listener,
handler: proxyHandler{
HealthCheck: healthcheck.HealthCheck{
FailTimeout: 3 * time.Second,
MaxFails: 1,
Future: 10 * time.Second,
Path: "/",
Interval: 5 * time.Second,
},
},
}
k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList))
for i, entry := range k.APIServerList {
uh := &healthcheck.UpstreamHost{
Name: strings.TrimPrefix(entry, "http://"),
CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc {
return func(uh *healthcheck.UpstreamHost) bool {
down := false
uh.CheckMu.Lock()
until := uh.OkUntil
uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
}
fails := atomic.LoadInt32(&uh.Fails)
if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
down = true
}
return down
}
}(&k.APIProxy.handler),
}
k.APIProxy.handler.Hosts[i] = uh
}
k.APIProxy.Handler = &k.APIProxy.handler
// Find the random port used for api proxy
endpoint = fmt.Sprintf("http://%s", listener.Addr())
}
clusterinfo.Server = endpoint
} else {
cc, err := rest.InClusterConfig()
if err != nil {
......
......@@ -39,10 +39,16 @@ func setup(c *caddy.Controller) error {
// Register KubeCache start and stop functions with Caddy
c.OnStartup(func() error {
go kubernetes.APIConn.Run()
if kubernetes.APIProxy != nil {
go kubernetes.APIProxy.Run()
}
return nil
})
c.OnShutdown(func() error {
if kubernetes.APIProxy != nil {
kubernetes.APIProxy.Stop()
}
return kubernetes.APIConn.Stop()
})
......@@ -140,7 +146,9 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
case "endpoint":
args := c.RemainingArgs()
if len(args) > 0 {
k8s.APIEndpoint = args[0]
for _, endpoint := range strings.Split(args[0], ",") {
k8s.APIServerList = append(k8s.APIServerList, strings.TrimSpace(endpoint))
}
continue
}
return nil, c.ArgErr()
......
......@@ -19,6 +19,7 @@ type UpstreamHostDownFunc func(*UpstreamHost) bool
type UpstreamHost struct {
Conns int64 // must be first field to be 64-bit aligned on 32-bit systems
Name string // IP address (and port) of this upstream host
Network string // Network (tcp, unix, etc) of the host, default "" is "tcp"
Fails int32
FailTimeout time.Duration
OkUntil time.Time
......
......@@ -450,7 +450,7 @@ func doIntegrationTests(t *testing.T, corefile string, testCases []test.Case) {
// Work-around for timing condition that results in no-data being returned in
// test environment.
time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second)
for _, tc := range testCases {
......@@ -513,6 +513,27 @@ func TestKubernetesIntegration(t *testing.T) {
doIntegrationTests(t, corefile, dnsTestCases)
}
func TestKubernetesIntegrationAPIProxy(t *testing.T) {
removeUpstreamConfig, upstreamServer, udp := createUpstreamServer(t)
defer upstreamServer.Stop()
defer removeUpstreamConfig()
corefile :=
`.:0 {
kubernetes cluster.local 0.0.10.in-addr.arpa {
endpoint http://nonexistance:8080,http://invalidip:8080,http://localhost:8080
namespaces test-1
pods disabled
upstream ` + udp + `
}
erratic . {
drop 0
}
`
doIntegrationTests(t, corefile, dnsTestCases)
}
func TestKubernetesIntegrationPodsInsecure(t *testing.T) {
corefile :=
`.:0 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment