Commit bb05a665 authored by ghostflame's avatar ghostflame Committed by Miek Gieben

middleware/proxy: async health checks (#749)

* Switches out Unhealthy bool for OkUntil timestamp

* Make sure servers are healthy forever if there are no health checks

* Moves health check off into a go routine to avoid blocking conditions

* Improved logging info

* Fixes initial date

* Fixes health checking; alters tests to adapt to async health checking

* Moves future variable into static upstream and populates it in more places

* Restores silencing of stdout during testing

* Restores silencing of stdout during testing

* keeps check url string once built

* Removes debug message

* uses zero value to signal no checking; reduces in-mutex code to a fetch
parent edf71fb1
......@@ -41,9 +41,9 @@ proxy FROM TO... {
* `max_fails` is the number of failures within fail_timeout that are needed before considering
a backend to be down. If 0, the backend will never be marked as down. Default is 1.
* `health_check` will check path (on port) on each backend. If a backend returns a status code of
200-399, then that backend is healthy. If it doesn't, the backend is marked as unhealthy for
duration and no requests are routed to it. If this option is not provided then health checks are
disabled. The default duration is 30 seconds ("30s").
200-399, then that backend is marked healthy for double the healthcheck duration. If it doesn't,
it is marked as unhealthy and no requests are routed to it. If this option is not provided then
health checks are disabled. The default duration is 30 seconds ("30s").
* **IGNORED_NAMES** in `except` is a space-separated list of domains to exclude from proxying.
Requests that match none of these names will be passed through.
* `spray` when all backends are unhealthy, randomly pick one to send the traffic to. (This is
......
......@@ -206,6 +206,7 @@ func newUpstream(hosts []string, old *staticUpstream) Upstream {
Spray: nil,
FailTimeout: 10 * time.Second,
MaxFails: 3,
Future: 60 * time.Second,
ex: old.ex,
WithoutPathPrefix: old.WithoutPathPrefix,
IgnoredSubDomains: old.IgnoredSubDomains,
......@@ -218,23 +219,30 @@ func newUpstream(hosts []string, old *staticUpstream) Upstream {
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool {
if uh.Unhealthy {
return true
down := false
uh.checkMu.Lock()
until := uh.OkUntil
uh.checkMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
}
fails := atomic.LoadInt32(&uh.Fails)
if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
return true
down = true
}
return false
return down
}
}(upstream),
WithoutPathPrefix: upstream.WithoutPathPrefix,
}
upstream.Hosts[i] = uh
}
return upstream
......
......@@ -27,6 +27,7 @@ func TestStartupShutdown(t *testing.T) {
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
g := newGrpcClient(nil, upstream)
......
......@@ -30,6 +30,7 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
Spray: nil,
FailTimeout: 10 * time.Second,
MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
Future: 60 * time.Second,
ex: newDNSExWithOption(opts),
}
......@@ -40,21 +41,29 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
Fails: 0,
FailTimeout: upstream.FailTimeout,
Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool {
if uh.Unhealthy {
return true
down := false
uh.checkMu.Lock()
until := uh.OkUntil
uh.checkMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
}
fails := atomic.LoadInt32(&uh.Fails)
if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
return true
down = true
}
return false
return down
}
}(upstream),
WithoutPathPrefix: upstream.WithoutPathPrefix,
}
upstream.Hosts[i] = uh
}
p.Upstreams = &[]Upstream{upstream}
......
......@@ -5,6 +5,7 @@ import (
"net/http/httptest"
"os"
"testing"
"time"
)
var workableServer *httptest.Server
......@@ -54,7 +55,7 @@ func TestRoundRobinPolicy(t *testing.T) {
t.Error("Expected second round robin host to be third host in the pool.")
}
// mark host as down
pool[0].Unhealthy = true
pool[0].OkUntil = time.Unix(0, 0)
h = rrPolicy.Select(pool)
if h != pool[1] {
t.Error("Expected third round robin host to be first host in the pool.")
......
......@@ -59,9 +59,11 @@ type UpstreamHost struct {
Name string // IP address (and port) of this upstream host
Fails int32
FailTimeout time.Duration
Unhealthy bool
OkUntil time.Time
CheckDown UpstreamHostDownFunc
CheckUrl string
WithoutPathPrefix string
Checking bool
checkMu sync.Mutex
}
......@@ -72,7 +74,17 @@ func (uh *UpstreamHost) Down() bool {
if uh.CheckDown == nil {
// Default settings
fails := atomic.LoadInt32(&uh.Fails)
return uh.Unhealthy || fails > 0
after := false
uh.checkMu.Lock()
until := uh.OkUntil
uh.checkMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
after = true
}
return after || fails > 0
}
return uh.CheckDown(uh)
}
......
......@@ -74,8 +74,10 @@ func TestStop(t *testing.T) {
t.Error("Expected healthchecks to hit test server. Got no healthchecks.")
}
// health checks are in a go routine now, so one may well occur after we shutdown,
// but we only ever expect one more
counterValueAfterWaiting := atomic.LoadInt64(&counter)
if counterValueAfterWaiting != counterValueAfterShutdown {
if counterValueAfterWaiting > (counterValueAfterShutdown + 1) {
t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown)
}
......
......@@ -36,6 +36,7 @@ type staticUpstream struct {
FailTimeout time.Duration
MaxFails int32
Future time.Duration
HealthCheck struct {
Path string
Port string
......@@ -59,6 +60,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
Spray: nil,
FailTimeout: 10 * time.Second,
MaxFails: 1,
Future: 60 * time.Second,
ex: newDNSEx(),
}
......@@ -89,21 +91,25 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool {
down := false
uh.checkMu.Lock()
defer uh.checkMu.Unlock()
if uh.Unhealthy {
return true
until := uh.OkUntil
uh.checkMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
}
fails := atomic.LoadInt32(&uh.Fails)
if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
return true
down = true
}
return false
return down
}
}(upstream),
WithoutPathPrefix: upstream.WithoutPathPrefix,
......@@ -186,6 +192,12 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
return err
}
u.HealthCheck.Interval = dur
u.Future = 2 * dur
// set a minimum of 3 seconds
if u.Future < (3 * time.Second) {
u.Future = 3 * time.Second
}
}
case "without":
if !c.NextArg() {
......@@ -247,46 +259,93 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
return nil
}
func (u *staticUpstream) healthCheck() {
for _, host := range u.Hosts {
var hostName, checkPort string
// This was moved into a thread so that each host could throw a health
// check at the same time. The reason for this is that if we are checking
// 3 hosts, and the first one is gone, and we spend minutes timing out to
// fail it, we would not have been doing any other health checks in that
// time. So we now have a per-host lock and a threaded health check.
//
// We use the Checking bool to avoid concurrent checks against the same
// host; if one is taking a long time, the next one will find a check in
// progress and simply return before trying.
//
// We are carefully avoiding having the mutex locked while we check,
// otherwise checks will back up, potentially a lot of them if a host is
// absent for a long time. This arrangement makes checks quickly see if
// they are the only one running and abort otherwise.
func healthCheckUrl(nextTs time.Time, host *UpstreamHost) {
// lock for our bool check. We don't just defer the unlock because
// we don't want the lock held while http.Get runs
host.checkMu.Lock()
// are we mid check? Don't run another one
if host.Checking {
host.checkMu.Unlock()
return
}
// The DNS server might be an HTTP server. If so, extract its name.
if url, err := url.Parse(host.Name); err == nil {
hostName = url.Host
} else {
hostName = host.Name
}
host.Checking = true
host.checkMu.Unlock()
// Extract the port number from the parsed server name.
checkHostName, checkPort, err := net.SplitHostPort(hostName)
if err != nil {
checkHostName = hostName
}
//log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local())
// fetch that url. This has been moved into a go func because
// when the remote host is not merely not serving, but actually
// absent, then tcp syn timeouts can be very long, and so one
// fetch could last several check intervals
if r, err := http.Get(host.CheckUrl); err == nil {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
if u.HealthCheck.Port != "" {
checkPort = u.HealthCheck.Port
if r.StatusCode < 200 || r.StatusCode >= 400 {
log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
host.Name, r.StatusCode)
nextTs = time.Unix(0, 0)
}
} else {
log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
nextTs = time.Unix(0, 0)
}
hostURL := "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path
host.checkMu.Lock()
host.Checking = false
host.OkUntil = nextTs
host.checkMu.Unlock()
}
func (u *staticUpstream) healthCheck() {
for _, host := range u.Hosts {
host.checkMu.Lock()
defer host.checkMu.Unlock()
if host.CheckUrl == "" {
var hostName, checkPort string
if r, err := http.Get(hostURL); err == nil {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
if r.StatusCode < 200 || r.StatusCode >= 400 {
log.Printf("[WARNING] Health check URL %s returned HTTP code %d\n",
hostURL, r.StatusCode)
host.Unhealthy = true
// The DNS server might be an HTTP server. If so, extract its name.
ret, err := url.Parse(host.Name)
if err == nil && len(ret.Host) > 0 {
hostName = ret.Host
} else {
host.Unhealthy = false
hostName = host.Name
}
// Extract the port number from the parsed server name.
checkHostName, checkPort, err := net.SplitHostPort(hostName)
if err != nil {
checkHostName = hostName
}
if u.HealthCheck.Port != "" {
checkPort = u.HealthCheck.Port
}
} else {
log.Printf("[WARNING] Health check probe failed: %v\n", err)
host.Unhealthy = true
host.CheckUrl = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path
}
// calculate this before the get
nextTs := time.Now().Add(u.Future)
// locks/bools should prevent requests backing up
go healthCheckUrl(nextTs, host)
}
}
......
......@@ -23,9 +23,14 @@ func TestHealthCheck(t *testing.T) {
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
upstream.healthCheck()
// sleep a bit, it's async now
time.Sleep(time.Duration(2 * time.Second))
if upstream.Hosts[0].Down() {
t.Error("Expected first host in testpool to not fail healthcheck.")
}
......@@ -40,15 +45,16 @@ func TestSelect(t *testing.T) {
Hosts: testPool()[:3],
Policy: &Random{},
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
upstream.Hosts[0].Unhealthy = true
upstream.Hosts[1].Unhealthy = true
upstream.Hosts[2].Unhealthy = true
upstream.Hosts[0].OkUntil = time.Unix(0, 0)
upstream.Hosts[1].OkUntil = time.Unix(0, 0)
upstream.Hosts[2].OkUntil = time.Unix(0, 0)
if h := upstream.Select(); h != nil {
t.Error("Expected select to return nil as all host are down")
}
upstream.Hosts[2].Unhealthy = false
upstream.Hosts[2].OkUntil = time.Time{}
if h := upstream.Select(); h == nil {
t.Error("Expected select to not return nil")
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment