Commit 45f11f32 authored by Miek Gieben's avatar Miek Gieben

Version clusters - not endpoints yet

Signed-off-by: default avatarMiek Gieben <miek@miek.nl>
parent cc872778
...@@ -106,7 +106,7 @@ func (c *Client) Run() { ...@@ -106,7 +106,7 @@ func (c *Client) Run() {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
tick := time.NewTicker(10 * time.Second) tick := time.NewTicker(1 * time.Second)
for { for {
select { select {
case <-tick.C: case <-tick.C:
...@@ -164,6 +164,7 @@ func (c *Client) Receive(stream adsStream) error { ...@@ -164,6 +164,7 @@ func (c *Client) Receive(stream adsStream) error {
switch resp.GetTypeUrl() { switch resp.GetTypeUrl() {
case cdsURL: case cdsURL:
a := &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
for _, r := range resp.GetResources() { for _, r := range resp.GetResources() {
var any ptypes.DynamicAny var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil { if err := ptypes.UnmarshalAny(r, &any); err != nil {
...@@ -174,24 +175,25 @@ func (c *Client) Receive(stream adsStream) error { ...@@ -174,24 +175,25 @@ func (c *Client) Receive(stream adsStream) error {
if !ok { if !ok {
continue continue
} }
c.assignments.setClusterLoadAssignment(cluster.GetName(), nil) a.setClusterLoadAssignment(cluster.GetName(), nil)
} }
log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources())) log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources()))
// ack the CDS proto, with we we've got. (empty version would be NACK) // ack the CDS proto, with we we've got. (empty version would be NACK)
if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.clusters()); err != nil { if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()); err != nil {
log.Debug(err) log.Debug(err)
continue continue
} }
// need to figure out how to handle the versions and nounces exactly. // need to figure out how to handle the versions and nounces exactly.
c.SetNonce(resp.GetNonce())
c.SetAssignments(a)
// now kick off discovery for endpoints // now kick off discovery for endpoints
if err := c.endpointDiscovery(stream, "", resp.GetNonce(), c.assignments.clusters()); err != nil { if err := c.endpointDiscovery(stream, "", resp.GetNonce(), a.clusters()); err != nil {
log.Debug(err) log.Debug(err)
continue continue
} }
c.SetNonce(resp.GetNonce())
case edsURL: case edsURL:
for _, r := range resp.GetResources() { for _, r := range resp.GetResources() {
var any ptypes.DynamicAny var any ptypes.DynamicAny
......
...@@ -11,3 +11,15 @@ func (c *Client) SetNonce(n string) { ...@@ -11,3 +11,15 @@ func (c *Client) SetNonce(n string) {
defer c.mu.Unlock() defer c.mu.Unlock()
c.nonce = n c.nonce = n
} }
func (c *Client) Assignments() *assignment {
c.mu.RLock()
defer c.mu.RUnlock()
return c.assignments
}
func (c *Client) SetAssignments(a *assignment) {
c.mu.Lock()
defer c.mu.Unlock()
c.assignments = a
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment