Commit 1652395e authored by Miek Gieben's avatar Miek Gieben

Check everything in

Signed-off-by: default avatarMiek Gieben <miek@miek.nl>
parent 5f2d5788
......@@ -6,14 +6,13 @@ Repos used:
: implements control plane, has testing stuff in pkg/test/main (iirc).
<https://github.com/grpc/grpc-go/tree/master/xds/internal/client>
: implements client for xDS - can probably list all code out from there.
: implements client for xDS - much of this code has been reused here.
To see if things are working start the testing control plane from go-control-plane:
https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md
https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md
* https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md
* https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md
* This was really helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol
Cluster: A cluster is a group of logically similar endpoints that Envoy connects to. In v2, RDS
routes points to clusters, CDS provides cluster configuration and Envoy discovers the cluster
......@@ -22,19 +21,25 @@ members via EDS.
# Testing
~~~ sh
$ cd ~/src/github.com/envoyproxy/go-control-plane
% make integration.xds
% cd ~/src/github.com/envoyproxy/go-control-plane/pkg/test/main
% go build
% ./main --xds=ads --runtimes=2 -debug
~~~
This runs a binary from pkg/test/main. Now we're testing aDS.
This runs a binary from pkg/test/main. Now we're testing aDS. Everything is using gRPC with TLS,
`grpc.WithInsecure()`. The binary runs on port 18000 on localhost; all these things are currently
hardcoded in the *traffic* plugin. This will be factored out into config as some point.
The script stops, unless you have Envoy installed (which I haven't), but you can run it manually:
Then for CoreDNS, check out the `traffic` branch, create a Corefile:
~~~ sh
./bin/test --xds=ads --runtimes=2 -debug # for ads
~~~ Corefile
example.org {
traffic
debug
}
~~~
This fails with `timeout waiting for the first request`, means you're consumer wasn't quick enough
in asking for xDS assignments.
Start CoreDNS, and see logging/debugging flow by; the test binary should also spew out a bunch of
things. CoreDNS willl build up a list of cluster and endpoints. Next you can query it.
Use insecure.
TODO
......@@ -23,6 +23,7 @@ package xds
import (
"context"
"sync"
"time"
clog "github.com/coredns/coredns/plugin/pkg/log"
......@@ -33,7 +34,7 @@ import (
"google.golang.org/grpc"
)
var log = clog.NewWithPlugin("traffic")
var log = clog.NewWithPlugin("traffic xds:")
const (
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
......@@ -48,6 +49,7 @@ type Client struct {
assignments assignment
node *corepb.Node
cancel context.CancelFunc
stop chan struct{}
}
type assignment struct {
......@@ -141,7 +143,8 @@ func (c *Client) Receive(stream adsStream) error {
for {
resp, err := stream.Recv()
if err != nil {
return err
log.Warningf("Trouble receiving from the gRPC connection: %s", err)
time.Sleep(1 * time.Second) // better.
}
switch resp.GetTypeUrl() {
......@@ -157,13 +160,13 @@ func (c *Client) Receive(stream adsStream) error {
}
c.assignments.SetClusterLoadAssignment(cluster.GetName(), nil)
}
println("HERER", len(resp.GetResources()))
println("CDS", len(resp.GetResources()), "processed")
log.Debug("Cluster discovery processed with %d resources", len(resp.GetResources()))
// ack the CDS proto, with we we've got. (empty version would be NACK)
if err := c.ClusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.Clusters()); err != nil {
log.Warningf("Failed to acknowledge cluster discovery: %s", err)
}
// need to figure out how to handle the version exactly.
// need to figure out how to handle the versions and nounces exactly.
// now kick off discovery for endpoints
if err := c.EndpointDiscovery(stream, "", "", c.assignments.Clusters()); err != nil {
......@@ -171,7 +174,23 @@ func (c *Client) Receive(stream adsStream) error {
}
case edsURL:
println("EDS")
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil {
log.Debugf("Failed to unmarshal endpoint discovery: %s", err)
continue
}
cla, ok := any.Message.(*xdspb.ClusterLoadAssignment)
if !ok {
log.Debugf("Unexpected resource type: %T in endpoint discovery", any.Message)
continue
}
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
// ack the bloody thing
}
println("EDS", len(resp.GetResources()), "processed")
log.Debug("Endpoint discovery processed with %d resources", len(resp.GetResources()))
default:
log.Warningf("Unknown response URL for discovery: %q", resp.GetTypeUrl())
continue
......
This code is copied from
[https://github.com/grpc/grpc-go/tree/master/xds](https://github.com/grpc/grpc-go/tree/master/xds).
Grpc-go is also a consumer of the Envoy xDS data and acts upon it.
The *traffic* plugin only cares about clusters and endpoints, the following bits are deleted:
* lDS; listener discovery is not used here.
* rDS: routes have no use for DNS responses.
Load reporting is also not implemented, although this can be done on the DNS level.
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package bootstrap provides the functionality to initialize certain aspects
// of an xDS client by reading a bootstrap file.
package bootstrap
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"github.com/coredns/coredns/plugin/pkg/log"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
"google.golang.org/grpc/grpclog"
)
const (
// Environment variable which holds the name of the xDS bootstrap file.
fileEnv = "GRPC_XDS_BOOTSTRAP"
// Type name for Google default credentials.
googleDefaultCreds = "google_default"
)
var gRPCVersion = fmt.Sprintf("gRPC-Go %s", grpc.Version)
// For overriding in unit tests.
var fileReadFunc = ioutil.ReadFile
// Config provides the xDS client with several key bits of information that it
// requires in its interaction with an xDS server. The Config is initialized
// from the bootstrap file.
type Config struct {
// BalancerName is the name of the xDS server to connect to.
//
// The bootstrap file contains a list of servers (with name+creds), but we
// pick the first one.
BalancerName string
// Creds contains the credentials to be used while talking to the xDS
// server, as a grpc.DialOption.
Creds grpc.DialOption
// NodeProto contains the node proto to be used in xDS requests.
NodeProto *corepb.Node
}
type channelCreds struct {
Type string `json:"type"`
Config json.RawMessage `json:"config"`
}
type xdsServer struct {
ServerURI string `json:"server_uri"`
ChannelCreds []channelCreds `json:"channel_creds"`
}
// NewConfig returns a new instance of Config initialized by reading the
// bootstrap file found at ${GRPC_XDS_BOOTSTRAP}.
//
// The format of the bootstrap file will be as follows:
// {
// "xds_server": {
// "server_uri": <string containing URI of xds server>,
// "channel_creds": [
// {
// "type": <string containing channel cred type>,
// "config": <JSON object containing config for the type>
// }
// ]
// },
// "node": <JSON form of corepb.Node proto>
// }
//
// Currently, we support exactly one type of credential, which is
// "google_default", where we use the host's default certs for transport
// credentials and a Google oauth token for call credentials.
//
// This function tries to process as much of the bootstrap file as possible (in
// the presence of the errors) and may return a Config object with certain
// fields left unspecified, in which case the caller should use some sane
// defaults.
func NewConfig() (*Config, error) {
config := &Config{}
fName, ok := os.LookupEnv(fileEnv)
if !ok {
return config, fmt.Errorf("xds: %s environment variable not set", fileEnv)
}
grpclog.Infof("xds: Reading bootstrap file from %s", fName)
data, err := fileReadFunc(fName)
if err != nil {
return config, fmt.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err)
}
var jsonData map[string]json.RawMessage
if err := json.Unmarshal(data, &jsonData); err != nil {
return config, fmt.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err)
}
m := jsonpb.Unmarshaler{AllowUnknownFields: true}
for k, v := range jsonData {
switch k {
case "node":
n := &corepb.Node{}
if err := m.Unmarshal(bytes.NewReader(v), n); err != nil {
log.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
break
}
config.NodeProto = n
case "xds_servers":
var servers []*xdsServer
if err := json.Unmarshal(v, &servers); err != nil {
log.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
break
}
if len(servers) < 1 {
log.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to")
break
}
xs := servers[0]
config.BalancerName = xs.ServerURI
for _, cc := range xs.ChannelCreds {
if cc.Type == googleDefaultCreds {
config.Creds = grpc.WithCredentialsBundle(google.NewComputeEngineCredentials())
// We stop at the first credential type that we support.
break
}
}
default:
// Do not fail the xDS bootstrap when an unknown field is seen.
log.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v))
}
}
// If we don't find a nodeProto in the bootstrap file, we just create an
// empty one here. That way, callers of this function can always expect
// that the NodeProto field is non-nil.
if config.NodeProto == nil {
config.NodeProto = &corepb.Node{}
}
config.NodeProto.BuildVersion = gRPCVersion
return config, nil
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package bootstrap
import (
"os"
"testing"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)
var (
nodeProto = &corepb.Node{
Id: "ENVOY_NODE_ID",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"TRAFFICDIRECTOR_GRPC_HOSTNAME": {
Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"},
},
},
},
BuildVersion: gRPCVersion,
}
nilCredsConfig = &Config{
BalancerName: "trafficdirector.googleapis.com:443",
Creds: nil,
NodeProto: nodeProto,
}
nonNilCredsConfig = &Config{
BalancerName: "trafficdirector.googleapis.com:443",
Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()),
NodeProto: nodeProto,
}
)
// TestNewConfig exercises the functionality in NewConfig with different
// bootstrap file contents. It overrides the fileReadFunc by returning
// bootstrap file contents defined in this test, instead of reading from a
// file.
func TestNewConfig(t *testing.T) {
bootstrapFileMap := map[string]string{
"empty": "",
"badJSON": `["test": 123]`,
"emptyNodeProto": `
{
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443"
}]
}`,
"emptyXdsServer": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
}
}`,
"unknownTopLevelFieldInFile": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" }
]
}],
"unknownField": "foobar"
}`,
"unknownFieldInNodeProto": `
{
"node": {
"id": "ENVOY_NODE_ID",
"unknownField": "foobar",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
}
}`,
"unknownFieldInXdsServer": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" }
],
"unknownField": "foobar"
}]
}`,
"emptyChannelCreds": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443"
}]
}`,
"nonGoogleDefaultCreds": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" }
]
}]
}`,
"multipleChannelCreds": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" },
{ "type": "google_default" }
]
}]
}`,
"goodBootstrap": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "google_default" }
]
}]
}`,
"multipleXDSServers": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [
{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [{ "type": "google_default" }]
},
{
"server_uri": "backup.never.use.com:1234",
"channel_creds": [{ "type": "not-google-default" }]
}
]
}`,
}
oldFileReadFunc := fileReadFunc
fileReadFunc = func(name string) ([]byte, error) {
if b, ok := bootstrapFileMap[name]; ok {
return []byte(b), nil
}
return nil, os.ErrNotExist
}
defer func() {
fileReadFunc = oldFileReadFunc
os.Unsetenv(fileEnv)
}()
tests := []struct {
name string
wantConfig *Config
}{
{"nonExistentBootstrapFile", &Config{}},
{"empty", &Config{}},
{"badJSON", &Config{}},
{"emptyNodeProto", &Config{
BalancerName: "trafficdirector.googleapis.com:443",
NodeProto: &corepb.Node{BuildVersion: gRPCVersion},
}},
{"emptyXdsServer", &Config{NodeProto: nodeProto}},
{"unknownTopLevelFieldInFile", nilCredsConfig},
{"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}},
{"unknownFieldInXdsServer", nilCredsConfig},
{"emptyChannelCreds", nilCredsConfig},
{"nonGoogleDefaultCreds", nilCredsConfig},
{"multipleChannelCreds", nonNilCredsConfig},
{"goodBootstrap", nonNilCredsConfig},
{"multipleXDSServers", nonNilCredsConfig},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if err := os.Setenv(fileEnv, test.name); err != nil {
t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err)
}
config := NewConfig()
if config.BalancerName != test.wantConfig.BalancerName {
t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName)
}
if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) {
t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto)
}
if (config.Creds != nil) != (test.wantConfig.Creds != nil) {
t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds)
}
})
}
}
func TestNewConfigEnvNotSet(t *testing.T) {
os.Unsetenv(fileEnv)
wantConfig := Config{}
if config := NewConfig(); *config != wantConfig {
t.Errorf("NewConfig() returned : %#v, wanted an empty Config object", config)
}
}
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package buffer provides an implementation of an unbounded buffer.
package buffer
import "sync"
// Unbounded is an implementation of an unbounded buffer which does not use
// extra goroutines. This is typically used for passing updates from one entity
// to another within gRPC.
//
// All methods on this type are thread-safe and don't block on anything except
// the underlying mutex used for synchronization.
//
// Unbounded supports values of any type to be stored in it by using a channel
// of `interface{}`. This means that a call to Put() incurs an extra memory
// allocation, and also that users need a type assertion while reading. For
// performance critical code paths, using Unbounded is strongly discouraged and
// defining a new type specific implementation of this buffer is preferred. See
// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
mu sync.Mutex
backlog []interface{}
}
// NewUnbounded returns a new instance of Unbounded.
func NewUnbounded() *Unbounded {
return &Unbounded{c: make(chan interface{}, 1)}
}
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t interface{}) {
b.mu.Lock()
if len(b.backlog) == 0 {
select {
case b.c <- t:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, t)
b.mu.Unlock()
}
// Load sends the earliest buffered data, if any, onto the read channel
// returned by Get(). Users are expected to call this every time they read a
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
}
b.mu.Unlock()
}
// Get returns a read channel on which values added to the buffer, via Put(),
// are sent on.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
func (b *Unbounded) Get() <-chan interface{} {
return b.c
}
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package buffer
import (
"reflect"
"sort"
"sync"
"testing"
)
const (
numWriters = 10
numWrites = 10
)
// wantReads contains the set of values expected to be read by the reader
// goroutine in the tests.
var wantReads []int
func init() {
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
wantReads = append(wantReads, i)
}
}
}
// TestSingleWriter starts one reader and one writer goroutine and makes sure
// that the reader gets all the value added to the buffer by the writer.
func TestSingleWriter(t *testing.T) {
ub := NewUnbounded()
reads := []int{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
ub.Put(i)
}
}
}()
wg.Wait()
if !reflect.DeepEqual(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}
// TestMultipleWriters starts multiple writers and one reader goroutine and
// makes sure that the reader gets all the data written by all writers.
func TestMultipleWriters(t *testing.T) {
ub := NewUnbounded()
reads := []int{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()
wg.Add(numWriters)
for i := 0; i < numWriters; i++ {
go func(index int) {
defer wg.Done()
for j := 0; j < numWrites; j++ {
ub.Put(index)
}
}(i)
}
wg.Wait()
sort.Ints(reads)
if !reflect.DeepEqual(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"fmt"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
)
// handleCDSResponse processes an CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
println("handlCDSResponse")
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[cdsURL]
if wi == nil {
return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
}
var returnUpdate CDSUpdate
localCache := make(map[string]CDSUpdate)
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err)
}
cluster, ok := resource.Message.(*xdspb.Cluster)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
}
fmt.Printf("CLUSTER %+v\n", cluster)
update, err := validateCluster(cluster)
if err != nil {
return err
}
// If the Cluster message in the CDS response did not contain a
// serviceName, we will just use the clusterName for EDS.
if update.ServiceName == "" {
update.ServiceName = cluster.GetName()
}
localCache[cluster.GetName()] = update
if cluster.GetName() == wi.target[0] {
returnUpdate = update
}
}
v2c.cdsCache = localCache
var err error
if returnUpdate.ServiceName == "" {
err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
}
wi.stopTimer()
wi.callback.(cdsCallback)(returnUpdate, err)
return nil
}
func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) {
emptyUpdate := CDSUpdate{ServiceName: ""}
switch {
case cluster.GetType() != xdspb.Cluster_EDS:
return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil:
return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN:
return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
return CDSUpdate{ServiceName: cluster.GetEdsClusterConfig().GetServiceName()}, nil
}
This diff is collapsed.
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package client implementation a full fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
package xds
import (
"errors"
"fmt"
"time"
"github.com/coredns/coredns/plugin/traffic/xds/bootstrap"
"google.golang.org/grpc"
)
// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
// responsibility of the caller to use some sane defaults here if the
// bootstrap process returned with certain fields left unspecified.
Config bootstrap.Config
// DialOpts contains dial options to be used when dialing the xDS server.
DialOpts []grpc.DialOption
}
// Client is a full fledged gRPC client which queries a set of discovery APIs
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources. A single client object will be shared by the xds
// resolver and balancer implementations.
type Client struct {
opts Options
cc *grpc.ClientConn // Connection to the xDS server
v2c *v2Client // Actual xDS client implementation using the v2 API
serviceCallback func(ServiceUpdate, error)
}
// New returns a new xdsClient configured with opts.
func New(opts Options) (*Client, error) {
switch {
case opts.Config.BalancerName == "":
return nil, errors.New("xds: no xds_server name provided in options")
case opts.Config.Creds == nil:
fmt.Printf("%s\n", errors.New("xds: no credentials provided in options"))
case opts.Config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}
var dopts []grpc.DialOption
if opts.Config.Creds == nil {
dopts = append([]grpc.DialOption{grpc.WithInsecure()}, opts.DialOpts...)
} else {
dopts = append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...)
}
cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
}
println("dialed balancer at", opts.Config.BalancerName)
c := &Client{
opts: opts,
cc: cc,
v2c: newV2Client(cc, opts.Config.NodeProto, func(int) time.Duration { return 0 }),
}
return c, nil
}
// Close closes the gRPC connection to the xDS server.
func (c *Client) Close() {
// TODO: Should we invoke the registered callbacks here with an error that
// the client is closed?
c.v2c.close()
c.cc.Close()
}
func (c *Client) Run() {
c.v2c.run()
}
// ServiceUpdate contains update about the service.
type ServiceUpdate struct {
Cluster string
}
// WatchCluster uses CDS to discover information about the provided clusterName.
func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) {
return c.v2c.watchCDS(clusterName, cdsCb)
}
// WatchEndpoints uses EDS to discover information about the endpoints in a cluster.
func (c *Client) WatchEndpoints(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) {
return c.v2c.watchEDS(clusterName, edsCb)
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"errors"
"fmt"
"testing"
"time"
"github.com/coredns/coredns/plugin/traffic/xds/bootstrap"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
func clientOpts(balancerName string) Options {
return Options{
Config: bootstrap.Config{
BalancerName: balancerName,
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
// WithTimeout is deprecated. But we are OK to call it here from the
// test, so we clearly know that the dial failed.
DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()},
}
}
func TestNew(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
tests := []struct {
name string
opts Options
wantErr bool
}{
{name: "empty-opts", opts: Options{}, wantErr: true},
{
name: "empty-balancer-name",
opts: Options{
Config: bootstrap.Config{
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
},
wantErr: true,
},
{
name: "empty-dial-creds",
opts: Options{
Config: bootstrap.Config{
BalancerName: "dummy",
NodeProto: &corepb.Node{},
},
},
wantErr: true,
},
{
name: "empty-node-proto",
opts: Options{
Config: bootstrap.Config{
BalancerName: "dummy",
Creds: grpc.WithInsecure(),
},
},
wantErr: true,
},
{
name: "happy-case",
opts: clientOpts(fakeServer.Address),
wantErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c, err := New(test.opts)
if err == nil {
defer c.Close()
}
if (err != nil) != test.wantErr {
t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr)
}
})
}
}
// TestWatchService tests the happy case of registering a watcher for
// service updates and receiving a good update.
func TestWatchService(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if err != nil {
callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err))
return
}
if su.Cluster != goodClusterName1 {
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1))
return
}
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
// Make the fakeServer send RDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceWithNoResponseFromServer tests the case where the
// xDS server does not respond to the requests being sent out as part of
// registering a service update watcher. The underlying v2Client will timeout
// and will send us an error.
func TestWatchServiceWithNoResponseFromServer(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.Cluster != "" {
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster))
return
}
if err == nil {
callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error"))
return
}
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Wait for one request from the client, but send no reponses.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceEmptyRDS tests the case where the underlying
// v2Client receives an empty RDS response.
func TestWatchServiceEmptyRDS(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.Cluster != "" {
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster))
return
}
if err == nil {
callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error"))
return
}
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
// Make the fakeServer send an empty RDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceWithClientClose tests the case where xDS responses are
// received after the client is closed, and we make sure that the registered
// watcher callback is not invoked.
func TestWatchServiceWithClientClose(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
callbackCh.Send(errors.New("watcher callback invoked after client close"))
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
xdsClient.Close()
t.Log("Closing the xdsClient...")
// Push an RDS response from the fakeserver
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1}
if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout {
t.Fatal(cbErr)
}
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"fmt"
"net"
"strconv"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/golang/protobuf/ptypes"
)
// OverloadDropConfig contains the config to drop overloads.
type OverloadDropConfig struct {
Category string
Numerator uint32
Denominator uint32
}
// EndpointHealthStatus represents the health status of an endpoint.
type EndpointHealthStatus int32
const (
// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
EndpointHealthStatusUnknown EndpointHealthStatus = iota
// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
EndpointHealthStatusHealthy
// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
EndpointHealthStatusUnhealthy
// EndpointHealthStatusDraining represents HealthStatus DRAINING.
EndpointHealthStatusDraining
// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
EndpointHealthStatusTimeout
// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
EndpointHealthStatusDegraded
)
// Endpoint contains information of an endpoint.
type Endpoint struct {
Address string
HealthStatus EndpointHealthStatus
Weight uint32
}
// Locality contains information of a locality.
type Locality struct {
Endpoints []Endpoint
ID LocalityID
Priority uint32
Weight uint32
}
// EDSUpdate contains an EDS update.
type EDSUpdate struct {
Drops []OverloadDropConfig
Localities []Locality
}
func parseAddress(socketAddress *corepb.SocketAddress) string {
return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
}
func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
percentage := dropPolicy.GetDropPercentage()
var (
numerator = percentage.GetNumerator()
denominator uint32
)
switch percentage.GetDenominator() {
case typepb.FractionalPercent_HUNDRED:
denominator = 100
case typepb.FractionalPercent_TEN_THOUSAND:
denominator = 10000
case typepb.FractionalPercent_MILLION:
denominator = 1000000
}
return OverloadDropConfig{
Category: dropPolicy.GetCategory(),
Numerator: numerator,
Denominator: denominator,
}
}
func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint {
endpoints := make([]Endpoint, 0, len(lbEndpoints))
for _, lbEndpoint := range lbEndpoints {
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()),
Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(),
})
}
return endpoints
}
// ParseEDSRespProto turns EDS response proto message to EDSUpdate.
//
// This is temporarily exported to be used in eds balancer, before it switches
// to use xds client. TODO: unexport.
func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) {
ret := &EDSUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
}
priorities := make(map[uint32]struct{})
for _, locality := range m.Endpoints {
l := locality.GetLocality()
if l == nil {
return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
}
lid := LocalityID{Region: l.Region, Zone: l.Zone, SubZone: l.SubZone}
priority := locality.GetPriority()
priorities[priority] = struct{}{}
ret.Localities = append(ret.Localities, Locality{
ID: lid,
Endpoints: parseEndpoints(locality.GetLbEndpoints()),
Weight: locality.GetLoadBalancingWeight().GetValue(),
Priority: priority,
})
}
for i := 0; i < len(priorities); i++ {
if _, ok := priorities[uint32(i)]; !ok {
return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
}
}
return ret, nil
}
// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails.
// This is used by EDS balancer tests.
//
// TODO: delete this. The EDS balancer should build an EDSUpdate directly,
// instead of building and parsing a proto message.
func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
u, err := ParseEDSRespProto(m)
if err != nil {
panic(err.Error())
}
return u
}
func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[edsURL]
if wi == nil {
return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
}
var returnUpdate *EDSUpdate
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err)
}
cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message)
}
if cla.GetClusterName() != wi.target[0] {
log.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0])
// We won't validate the remaining resources. If one of the
// uninteresting ones is invalid, we will still ACK the response.
continue
}
u, err := ParseEDSRespProto(cla)
if err != nil {
return err
}
returnUpdate = u
// Break from the loop because the request resource is found. But
// this also means we won't validate the remaining resources. If one
// of the uninteresting ones is invalid, we will still ACK the
// response.
break
}
if returnUpdate != nil {
wi.stopTimer()
wi.callback.(edsCallback)(returnUpdate, nil)
}
return nil
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"errors"
"fmt"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/ptypes"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/testutils"
)
func TestEDSParseRespProto(t *testing.T) {
tests := []struct {
name string
m *xdspb.ClusterLoadAssignment
want *EDSUpdate
wantErr bool
}{
{
name: "missing-priority",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
return clab0.Build()
}(),
want: nil,
wantErr: true,
},
{
name: "missing-locality-ID",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("", 1, 0, []string{"addr1:314"}, nil)
return clab0.Build()
}(),
want: nil,
wantErr: true,
},
{
name: "good",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, &AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
Weight: []uint32{271},
})
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, &AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_DRAINING},
Weight: []uint32{828},
})
return clab0.Build()
}(),
want: &EDSUpdate{
Drops: nil,
Localities: []Locality{
{
Endpoints: []Endpoint{{
Address: "addr1:314",
HealthStatus: EndpointHealthStatusUnhealthy,
Weight: 271,
}},
ID: Locality{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []Endpoint{{
Address: "addr2:159",
HealthStatus: EndpointHealthStatusDraining,
Weight: 828,
}},
ID: Locality{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseEDSRespProto(tt.m)
if (err != nil) != tt.wantErr {
t.Errorf("ParseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
return
}
if d := cmp.Diff(got, tt.want); d != "" {
t.Errorf("ParseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d)
}
})
}
}
var (
badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: edsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: edsURL,
}
badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
TypeUrl: edsURL,
}
goodEDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
func() *anypb.Any {
clab0 := NewClusterLoadAssignmentBuilder(goodEDSName, nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil)
a, _ := ptypes.MarshalAny(clab0.Build())
return a
}(),
},
TypeUrl: edsURL,
}
goodEDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
func() *anypb.Any {
clab0 := NewClusterLoadAssignmentBuilder("not-goodEDSName", nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil)
a, _ := ptypes.MarshalAny(clab0.Build())
return a
}(),
},
TypeUrl: edsURL,
}
)
func TestEDSHandleResponse(t *testing.T) {
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
tests := []struct {
name string
edsResponse *xdspb.DiscoveryResponse
wantErr bool
wantUpdate *EDSUpdate
wantUpdateErr bool
}{
// Any in resource is badly marshaled.
{
name: "badly-marshaled_response",
edsResponse: badlyMarshaledEDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response doesn't contain resource with the right type.
{
name: "no-config-in-response",
edsResponse: badResourceTypeInEDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains one uninteresting ClusterLoadAssignment.
{
name: "one-uninterestring-assignment",
edsResponse: goodEDSResponse2,
wantErr: false,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains one good ClusterLoadAssignment.
{
name: "one-good-assignment",
edsResponse: goodEDSResponse1,
wantErr: false,
wantUpdate: &EDSUpdate{
Localities: []Locality{
{
Endpoints: []Endpoint{{Address: "addr1:314"}},
ID: Locality{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []Endpoint{{Address: "addr2:159"}},
ID: Locality{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
},
},
},
wantUpdateErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
responseToHandle: test.edsResponse,
wantHandleErr: test.wantErr,
wantUpdate: test.wantUpdate,
wantUpdateErr: test.wantUpdateErr,
edsWatch: v2c.watchEDS,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleEDSResponse,
})
})
}
}
// TestEDSHandleResponseWithoutWatch tests the case where the v2Client
// receives an EDS response without a registered EDS watcher.
func TestEDSHandleResponseWithoutWatch(t *testing.T) {
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleEDSResponse(goodEDSResponse1) == nil {
t.Fatal("v2c.handleEDSResponse() succeeded, should have failed")
}
}
func TestEDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := testutils.NewChannel()
v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) {
t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err)
if u != nil {
callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u))
}
if err == nil {
callbackCh.Send(errors.New("received nil error in edsCallback"))
}
callbackCh.Send(nil)
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an CDS request")
}
waitForNilErr(t, callbackCh)
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// All structs/functions in this file should be unexported. They are used in EDS
// balancer tests now, to generate test inputs. Eventually, EDS balancer tests
// should generate EDSUpdate directly, instead of generating and parsing the
// proto message.
// TODO: unexported everything in this file.
package xds
import (
"fmt"
"net"
"strconv"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
)
// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS
// response.
type ClusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
DropPercentage: &typepb.FractionalPercent{
Numerator: d,
Denominator: typepb.FractionalPercent_HUNDRED,
},
})
}
return &ClusterLoadAssignmentBuilder{
v: &xdspb.ClusterLoadAssignment{
ClusterName: clusterName,
Policy: &xdspb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
},
},
}
}
// AddLocalityOptions contains options when adding locality to the builder.
type AddLocalityOptions struct {
Health []corepb.HealthStatus
Weight []uint32
}
// AddLocality adds a locality to the builder.
func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
}
port, err := strconv.Atoi(portStr)
if err != nil {
panic("failed to atoi " + portStr)
}
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}},
}
if opts != nil {
if i < len(opts.Health) {
lbe.HealthStatus = opts.Health[i]
}
if i < len(opts.Weight) {
lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
}
}
lbEndPoints = append(lbEndPoints, lbe)
}
var localityID *corepb.Locality
if subzone != "" {
localityID = &corepb.Locality{
Region: "",
Zone: "",
SubZone: subzone,
}
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Locality: localityID,
LbEndpoints: lbEndPoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
Priority: priority,
})
}
// Build builds ClusterLoadAssignment.
func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment {
return clab.v
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"fmt"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
// Locality is xds.Locality without XXX fields, so it can be used as map
// keys.
//
// xds.Locality cannot be map keys because one of the XXX fields is a slice.
//
// This struct should only be used as map keys. Use the proto message directly
// in all other places.
type LocalityID struct {
Region string
Zone string
SubZone string
}
func (l LocalityID) String() string {
return fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone)
}
// ToProto convert Locality to the proto representation.
func (l LocalityID) ToProto() *corepb.Locality {
return &corepb.Locality{
Region: l.Region,
Zone: l.Zone,
SubZone: l.SubZone,
}
}
package xds
import (
clog "github.com/coredns/coredns/plugin/pkg/log"
)
var log = clog.NewWithPlugin("traffic")
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"reflect"
"testing"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
type watchHandleTestcase struct {
responseToHandle *xdspb.DiscoveryResponse
wantHandleErr bool
wantUpdate interface{}
wantUpdateErr bool
// Only one of the following should be non-nil. The one corresponding with
// typeURL will be called.
ldsWatch func(target string, ldsCb ldsCallback) (cancel func())
rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func())
cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func())
edsWatch func(clusterName string, edsCb edsCallback) (cancel func())
watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel.
handleXDSResp func(response *xdspb.DiscoveryResponse) error
}
// testWatchHandle is called to test response handling for each xDS.
//
// It starts the xDS watch as configured in test, waits for the fake xds server
// to receive the request (so watch callback is installed), and calls
// handleXDSResp with responseToHandle (if it's set). It then compares the
// update received by watch callback with the expected results.
func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
type updateErr struct {
u interface{}
err error
}
gotUpdateCh := testutils.NewChannel()
var cancelWatch func()
// Register the watcher, this will also trigger the v2Client to send the xDS
// request.
switch {
case test.ldsWatch != nil:
cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{u, err})
})
case test.rdsWatch != nil:
cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{u, err})
})
case test.cdsWatch != nil:
cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) {
t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{u, err})
})
case test.edsWatch != nil:
cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{*u, err})
})
default:
t.Fatalf("no watch() is set")
}
defer cancelWatch()
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := test.watchReqChan.Receive(); err != nil {
t.Fatalf("Timeout waiting for an xDS request: %v", err)
}
// Directly push the response through a call to handleXDSResp. This bypasses
// the fakeServer, so it's only testing the handle logic. Client response
// processing is covered elsewhere.
//
// Also note that this won't trigger ACK, so there's no need to clear the
// request channel afterwards.
if err := test.handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr {
t.Fatalf("v2c.handleRDSResponse() returned err: %v, wantErr: %v", err, test.wantHandleErr)
}
// If the test doesn't expect the callback to be invoked, verify that no
// update or error is pushed to the callback.
//
// Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil:
// https://golang.org/doc/faq#nil_error).
if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
update, err := gotUpdateCh.Receive()
if err == testutils.ErrRecvTimeout {
return
}
t.Fatalf("Unexpected update: +%v", update)
}
wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface()
uErr, err := gotUpdateCh.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatal("Timeout expecting xDS update")
}
gotUpdate := uErr.(updateErr).u
opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{})
if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" {
t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff)
}
gotUpdateErr := uErr.(updateErr).err
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
}
}
// startServerAndGetCC starts a fake XDS server and also returns a ClientConn
// connected to it.
func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) {
t.Helper()
fs, sCleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
cc, ccCleanup, err := fs.XDSClientConn()
if err != nil {
sCleanup()
t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err)
}
return fs, cc, func() {
sCleanup()
ccCleanup()
}
}
// waitForNilErr waits for a nil error value to be received on the
// provided channel.
func waitForNilErr(t *testing.T, ch *testutils.Channel) {
t.Helper()
val, err := ch.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatalf("Timeout expired when expecting update")
}
if val != nil {
if cbErr := val.(error); cbErr != nil {
t.Fatal(cbErr)
}
}
}
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"time"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
const (
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
// watchState is an enum to represent the state of a watch call.
type watchState int
const (
watchEnqueued watchState = iota
watchCancelled
watchStarted
)
// watchInfo holds all the information about a watch call.
type watchInfo struct {
typeURL string
target []string
state watchState
callback interface{}
expiryTimer *time.Timer
}
// cancel marks the state as cancelled, and also stops the expiry timer.
func (wi *watchInfo) cancel() {
wi.state = watchCancelled
if wi.expiryTimer != nil {
wi.expiryTimer.Stop()
}
}
// stopTimer stops the expiry timer without cancelling the watch.
func (wi *watchInfo) stopTimer() {
if wi.expiryTimer != nil {
wi.expiryTimer.Stop()
}
}
type ackInfo struct {
typeURL string
version string // Nack if version is an empty string.
nonce string
}
// CDSUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher.
type CDSUpdate struct {
// ServiceName is the service name corresponding to the clusterName which
// is being watched for through CDS.
ServiceName string
}
type cdsCallback func(CDSUpdate, error)
type edsCallback func(*EDSUpdate, error)
This diff is collapsed.
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"fmt"
"strconv"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
// compareXDSRequest reads requests from channel, compare it with want.
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error {
val, err := ch.Receive()
if err != nil {
return err
}
req := val.(*fakeserver.Request)
if req.Err != nil {
return fmt.Errorf("unexpected error from request: %v", req.Err)
}
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = version
wantClone.ResponseNonce = nonce
if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
}
return nil
}
func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) {
respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse)
respToSend.VersionInfo = strconv.Itoa(version)
nonce = strconv.Itoa(int(time.Now().UnixNano()))
respToSend.Nonce = nonce
ch <- &fakeserver.Response{Resp: respToSend}
return
}
// startXDS calls watch to send the first request. It then sends a good response
// and checks for ack.
func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel {
callbackCh := testutils.NewChannel()
switch xdsname {
case "LDS":
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "RDS":
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "CDS":
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "EDS":
v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
}
if err := compareXDSRequest(reqChan, req, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("FakeServer received %s request...", xdsname)
return callbackCh
}
// sendGoodResp sends the good response, with the given version, and a random
// nonce.
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) {
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Good %s response acked", xdsname)
if _, err := callbackCh.Receive(); err != nil {
t.Errorf("Timeout when expecting %s update", xdsname)
}
t.Logf("Good %s response callback executed", xdsname)
}
// sendBadResp sends a bad response with the given version. This response will
// be nacked, so we expect a request with the previous version (version-1).
//
// But the nonce in request should be the new nonce.
func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) {
var typeURL string
switch xdsname {
case "LDS":
typeURL = ldsURL
case "RDS":
typeURL = rdsURL
case "CDS":
typeURL = cdsURL
case "EDS":
typeURL = edsURL
}
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: typeURL,
}, version)
t.Logf("Bad %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version-1), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Bad %s response nacked", xdsname)
}
// TestV2ClientAck verifies that valid responses are acked, and invalid ones
// are nacked.
//
// This test also verifies the version for different types are independent.
func TestV2ClientAck(t *testing.T) {
var (
versionLDS = 1000
versionRDS = 2000
versionCDS = 3000
versionEDS = 4000
)
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest)
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest)
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest)
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
// Send a bad response, and check for nack.
sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest)
versionLDS++
sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest)
versionRDS++
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
versionCDS++
sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest)
versionEDS++
// send another good response, and check for ack, with the new version.
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
}
// Test when the first response is invalid, and is nacked, the nack requests
// should have an empty version string.
func TestV2ClientAckFirstIsNack(t *testing.T) {
var versionLDS = 1000
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}
// Test when a nack is sent after a new watch, we nack with the previous acked
// version (instead of resetting to empty string).
func TestV2ClientAckNackAfterNewWatch(t *testing.T) {
var versionLDS = 1000
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
// Start a new watch.
cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
// This is an invalid response after the new watch.
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is the previous acked version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment