Commit 0e897776 authored by Manuel Alejandro de Brito Fontes's avatar Manuel Alejandro de Brito Fontes Committed by Miek Gieben

Dep helper (#2151)

* Add dep task to update go dependencies

* Update go dependencies
parent 8f8b81f5
......@@ -95,7 +95,7 @@ $ go get -u github.com/golang/dep/cmd/dep
Use the following to update the locked versions of all dependencies
```sh
$ dep ensure -update
$ make dep-ensure
```
To add a dependency to the project, you might run
......
This diff is collapsed.
......@@ -96,3 +96,10 @@ presubmit:
clean:
go clean
rm -f coredns
.PHONY: dep-ensure
dep-ensure:
dep version || go get -u github.com/golang/dep/cmd/dep
dep ensure -v
dep prune -v
find vendor -name '*_test.go' -delete
package opentracing
import (
"testing"
ot "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
func TestConfigurationDefaults(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
assert.Equal(true, config.Enabled)
assert.Equal(false, config.Debug)
assert.Equal(float64(1), config.SampleRate)
assert.Equal("opentracing.test", config.ServiceName)
assert.Equal("localhost", config.AgentHostname)
assert.Equal("8126", config.AgentPort)
}
func TestConfiguration(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
config.SampleRate = 0
config.ServiceName = "api-intake"
config.AgentHostname = "ddagent.consul.local"
config.AgentPort = "58126"
tracer, closer, err := NewTracer(config)
assert.NotNil(tracer)
assert.NotNil(closer)
assert.Nil(err)
assert.Equal("api-intake", tracer.(*Tracer).config.ServiceName)
}
func TestTracerServiceName(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
config.ServiceName = ""
tracer, closer, err := NewTracer(config)
assert.Nil(tracer)
assert.Nil(closer)
assert.NotNil(err)
assert.Equal("A Datadog Tracer requires a valid `ServiceName` set", err.Error())
}
func TestDisabledTracer(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
config.Enabled = false
tracer, closer, err := NewTracer(config)
assert.IsType(&ot.NoopTracer{}, tracer)
assert.IsType(&noopCloser{}, closer)
assert.Nil(err)
}
package opentracing
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSpanContextBaggage(t *testing.T) {
assert := assert.New(t)
ctx := SpanContext{}
ctx = ctx.WithBaggageItem("key", "value")
assert.Equal("value", ctx.baggage["key"])
}
func TestSpanContextIterator(t *testing.T) {
assert := assert.New(t)
baggageIterator := make(map[string]string)
ctx := SpanContext{baggage: map[string]string{"key": "value"}}
ctx.ForeachBaggageItem(func(k, v string) bool {
baggageIterator[k] = v
return true
})
assert.Len(baggageIterator, 1)
assert.Equal("value", baggageIterator["key"])
}
package opentracing_test
import (
"context"
opentracing "github.com/opentracing/opentracing-go"
)
// You can leverage the Golang `Context` for intra-process propagation of
// Spans. In this example we create a root Span, so that it can be reused
// in a nested function to create a child Span.
func Example_startContext() {
// create a new root Span and return a new Context that includes
// the Span itself
ctx := context.Background()
rootSpan, ctx := opentracing.StartSpanFromContext(ctx, "web.request")
defer rootSpan.Finish()
requestHandler(ctx)
}
func requestHandler(ctx context.Context) {
// retrieve the previously set root Span
span := opentracing.SpanFromContext(ctx)
span.SetTag("resource.name", "/")
// or simply create a new child Span from the previous Context
childSpan, _ := opentracing.StartSpanFromContext(ctx, "sql.query")
defer childSpan.Finish()
}
package opentracing_test
import (
// ddtrace namespace is suggested
ddtrace "github.com/DataDog/dd-trace-go/opentracing"
opentracing "github.com/opentracing/opentracing-go"
)
func Example_initialization() {
// create a Tracer configuration
config := ddtrace.NewConfiguration()
config.ServiceName = "api-intake"
config.AgentHostname = "ddagent.consul.local"
// initialize a Tracer and ensure a graceful shutdown
// using the `closer.Close()`
tracer, closer, err := ddtrace.NewTracer(config)
if err != nil {
// handle the configuration error
}
defer closer.Close()
// set the Datadog tracer as a GlobalTracer
opentracing.SetGlobalTracer(tracer)
startWebServer()
}
func startWebServer() {
// start a web server
}
package opentracing_test
import (
opentracing "github.com/opentracing/opentracing-go"
)
// You can use the GlobalTracer to create a root Span. If you need to create a hierarchy,
// simply use the `ChildOf` reference
func Example_startSpan() {
// use the GlobalTracer previously set
rootSpan := opentracing.StartSpan("web.request")
defer rootSpan.Finish()
// set the reference to create a hierarchy of spans
reference := opentracing.ChildOf(rootSpan.Context())
childSpan := opentracing.StartSpan("sql.query", reference)
defer childSpan.Finish()
dbQuery()
}
func dbQuery() {
// start a database query
}
package opentracing
import (
"net/http"
"strconv"
"testing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
func TestTracerPropagationDefaults(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
root := tracer.StartSpan("web.request")
ctx := root.Context()
headers := http.Header{}
// inject the SpanContext
carrier := opentracing.HTTPHeadersCarrier(headers)
err := tracer.Inject(ctx, opentracing.HTTPHeaders, carrier)
assert.Nil(err)
// retrieve the SpanContext
propagated, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
assert.Nil(err)
tCtx, ok := ctx.(SpanContext)
assert.True(ok)
tPropagated, ok := propagated.(SpanContext)
assert.True(ok)
// compare if there is a Context match
assert.Equal(tCtx.traceID, tPropagated.traceID)
assert.Equal(tCtx.spanID, tPropagated.spanID)
// ensure a child can be created
child := tracer.StartSpan("db.query", opentracing.ChildOf(propagated))
tRoot, ok := root.(*Span)
assert.True(ok)
tChild, ok := child.(*Span)
assert.True(ok)
assert.NotEqual(uint64(0), tChild.Span.TraceID)
assert.NotEqual(uint64(0), tChild.Span.SpanID)
assert.Equal(tRoot.Span.SpanID, tChild.Span.ParentID)
assert.Equal(tRoot.Span.TraceID, tChild.Span.ParentID)
tid := strconv.FormatUint(tRoot.Span.TraceID, 10)
pid := strconv.FormatUint(tRoot.Span.SpanID, 10)
// hardcode header names to fail test if defaults are changed
assert.Equal(headers.Get("x-datadog-trace-id"), tid)
assert.Equal(headers.Get("x-datadog-parent-id"), pid)
}
func TestTracerTextMapPropagationHeader(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
config.TextMapPropagator = NewTextMapPropagator("bg-", "tid", "pid")
tracer, _, _ := NewTracer(config)
root := tracer.StartSpan("web.request").SetBaggageItem("item", "x").(*Span)
ctx := root.Context()
headers := http.Header{}
carrier := opentracing.HTTPHeadersCarrier(headers)
err := tracer.Inject(ctx, opentracing.HTTPHeaders, carrier)
assert.Nil(err)
tid := strconv.FormatUint(root.Span.TraceID, 10)
pid := strconv.FormatUint(root.Span.SpanID, 10)
assert.Equal(headers.Get("tid"), tid)
assert.Equal(headers.Get("pid"), pid)
assert.Equal(headers.Get("bg-item"), "x")
}
package opentracing
import (
"errors"
"testing"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
func TestSpanBaggage(t *testing.T) {
assert := assert.New(t)
span := NewSpan("web.request")
span.SetBaggageItem("key", "value")
assert.Equal("value", span.BaggageItem("key"))
}
func TestSpanContext(t *testing.T) {
assert := assert.New(t)
span := NewSpan("web.request")
assert.NotNil(span.Context())
}
func TestSpanOperationName(t *testing.T) {
assert := assert.New(t)
span := NewSpan("web.request")
span.SetOperationName("http.request")
assert.Equal("http.request", span.Span.Name)
}
func TestSpanFinish(t *testing.T) {
assert := assert.New(t)
span := NewSpan("web.request")
span.Finish()
assert.True(span.Span.Duration > 0)
}
func TestSpanFinishWithTime(t *testing.T) {
assert := assert.New(t)
finishTime := time.Now().Add(10 * time.Second)
span := NewSpan("web.request")
span.FinishWithOptions(opentracing.FinishOptions{FinishTime: finishTime})
duration := finishTime.UnixNano() - span.Span.Start
assert.Equal(duration, span.Span.Duration)
}
func TestSpanSetTag(t *testing.T) {
assert := assert.New(t)
span := NewSpan("web.request")
span.SetTag("component", "tracer")
assert.Equal("tracer", span.Meta["component"])
span.SetTag("tagInt", 1234)
assert.Equal("1234", span.Meta["tagInt"])
}
func TestSpanSetDatadogTags(t *testing.T) {
assert := assert.New(t)
span := NewSpan("web.request")
span.SetTag("span.type", "http")
span.SetTag("service.name", "db-cluster")
span.SetTag("resource.name", "SELECT * FROM users;")
assert.Equal("http", span.Span.Type)
assert.Equal("db-cluster", span.Span.Service)
assert.Equal("SELECT * FROM users;", span.Span.Resource)
}
func TestSpanSetErrorTag(t *testing.T) {
assert := assert.New(t)
for _, tt := range []struct {
name string // span name
val interface{} // tag value
msg string // error message
typ string // error type
}{
{
name: "error.error",
val: errors.New("some error"),
msg: "some error",
typ: "*errors.errorString",
},
{
name: "error.string",
val: "some string error",
msg: "some string error",
typ: "*errors.errorString",
},
{
name: "error.struct",
val: struct{ N int }{5},
msg: "{5}",
typ: "*errors.errorString",
},
{
name: "error.other",
val: 1,
msg: "1",
typ: "*errors.errorString",
},
{
name: "error.nil",
val: nil,
msg: "",
typ: "",
},
} {
span := NewSpan(tt.name)
span.SetTag(Error, tt.val)
assert.Equal(span.Meta["error.msg"], tt.msg)
assert.Equal(span.Meta["error.type"], tt.typ)
if tt.val != nil {
assert.NotEqual(span.Meta["error.stack"], "")
}
}
}
package opentracing
import (
"testing"
"time"
ddtrace "github.com/DataDog/dd-trace-go/tracer"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)
func TestDefaultTracer(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
tTracer, ok := tracer.(*Tracer)
assert.True(ok)
assert.Equal(tTracer.impl, ddtrace.DefaultTracer)
}
func TestTracerStartSpan(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
span, ok := tracer.StartSpan("web.request").(*Span)
assert.True(ok)
assert.NotEqual(uint64(0), span.Span.TraceID)
assert.NotEqual(uint64(0), span.Span.SpanID)
assert.Equal(uint64(0), span.Span.ParentID)
assert.Equal("web.request", span.Span.Name)
assert.Equal("opentracing.test", span.Span.Service)
assert.NotNil(span.Span.Tracer())
}
func TestTracerStartChildSpan(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
root := tracer.StartSpan("web.request")
child := tracer.StartSpan("db.query", opentracing.ChildOf(root.Context()))
tRoot, ok := root.(*Span)
assert.True(ok)
tChild, ok := child.(*Span)
assert.True(ok)
assert.NotEqual(uint64(0), tChild.Span.TraceID)
assert.NotEqual(uint64(0), tChild.Span.SpanID)
assert.Equal(tRoot.Span.SpanID, tChild.Span.ParentID)
assert.Equal(tRoot.Span.TraceID, tChild.Span.ParentID)
}
func TestTracerBaggagePropagation(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
root := tracer.StartSpan("web.request")
root.SetBaggageItem("key", "value")
child := tracer.StartSpan("db.query", opentracing.ChildOf(root.Context()))
context, ok := child.Context().(SpanContext)
assert.True(ok)
assert.Equal("value", context.baggage["key"])
}
func TestTracerBaggageImmutability(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
root := tracer.StartSpan("web.request")
root.SetBaggageItem("key", "value")
child := tracer.StartSpan("db.query", opentracing.ChildOf(root.Context()))
child.SetBaggageItem("key", "changed!")
parentContext, ok := root.Context().(SpanContext)
assert.True(ok)
childContext, ok := child.Context().(SpanContext)
assert.True(ok)
assert.Equal("value", parentContext.baggage["key"])
assert.Equal("changed!", childContext.baggage["key"])
}
func TestTracerSpanTags(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
tag := opentracing.Tag{Key: "key", Value: "value"}
span, ok := tracer.StartSpan("web.request", tag).(*Span)
assert.True(ok)
assert.Equal("value", span.Span.Meta["key"])
}
func TestTracerSpanGlobalTags(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
config.GlobalTags["key"] = "value"
tracer, _, _ := NewTracer(config)
span := tracer.StartSpan("web.request").(*Span)
assert.Equal("value", span.Span.Meta["key"])
child := tracer.StartSpan("db.query", opentracing.ChildOf(span.Context())).(*Span)
assert.Equal("value", child.Span.Meta["key"])
}
func TestTracerSpanStartTime(t *testing.T) {
assert := assert.New(t)
config := NewConfiguration()
tracer, _, _ := NewTracer(config)
startTime := time.Now().Add(-10 * time.Second)
span, ok := tracer.StartSpan("web.request", opentracing.StartTime(startTime)).(*Span)
assert.True(ok)
assert.Equal(startTime.UnixNano(), span.Span.Start)
}
package tracer
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const (
testInitSize = 2
testMaxSize = 5
)
func TestSpanBufferPushOne(t *testing.T) {
assert := assert.New(t)
buffer := newSpanBuffer(newTracerChans(), testInitSize, testMaxSize)
assert.NotNil(buffer)
assert.Len(buffer.spans, 0)
traceID := NextSpanID()
root := NewSpan("name1", "a-service", "a-resource", traceID, traceID, 0, nil)
root.buffer = buffer
buffer.Push(root)
assert.Len(buffer.spans, 1, "there is one span in the buffer")
assert.Equal(root, buffer.spans[0], "the span is the one pushed before")
root.Finish()
select {
case trace := <-buffer.channels.trace:
assert.Len(trace, 1, "there was a trace in the channel")
assert.Equal(root, trace[0], "the trace in the channel is the one pushed before")
assert.Equal(0, buffer.Len(), "no more spans in the buffer")
case err := <-buffer.channels.err:
assert.Fail("unexpected error:", err.Error())
t.Logf("buffer: %v", buffer)
}
}
func TestSpanBufferPushNoFinish(t *testing.T) {
assert := assert.New(t)
buffer := newSpanBuffer(newTracerChans(), testInitSize, testMaxSize)
assert.NotNil(buffer)
assert.Len(buffer.spans, 0)
traceID := NextSpanID()
root := NewSpan("name1", "a-service", "a-resource", traceID, traceID, 0, nil)
root.buffer = buffer
buffer.Push(root)
assert.Len(buffer.spans, 1, "there is one span in the buffer")
assert.Equal(root, buffer.spans[0], "the span is the one pushed before")
select {
case <-buffer.channels.trace:
assert.Fail("span was not finished, should not be flushed")
t.Logf("buffer: %v", buffer)
case err := <-buffer.channels.err:
assert.Fail("unexpected error:", err.Error())
t.Logf("buffer: %v", buffer)
case <-time.After(time.Second / 10):
t.Logf("expected timeout, nothing should show up in buffer as the trace is not finished")
}
}
func TestSpanBufferPushSeveral(t *testing.T) {
assert := assert.New(t)
buffer := newSpanBuffer(newTracerChans(), testInitSize, testMaxSize)
assert.NotNil(buffer)
assert.Len(buffer.spans, 0)
traceID := NextSpanID()
root := NewSpan("name1", "a-service", "a-resource", traceID, traceID, 0, nil)
span2 := NewSpan("name2", "a-service", "a-resource", NextSpanID(), traceID, root.SpanID, nil)
span3 := NewSpan("name3", "a-service", "a-resource", NextSpanID(), traceID, root.SpanID, nil)
span3a := NewSpan("name3", "a-service", "a-resource", NextSpanID(), traceID, span3.SpanID, nil)
spans := []*Span{root, span2, span3, span3a}
for i, span := range spans {
span.buffer = buffer
buffer.Push(span)
assert.Len(buffer.spans, i+1, "there is one more span in the buffer")
assert.Equal(span, buffer.spans[i], "the span is the one pushed before")
}
for _, span := range spans {
span.Finish()
}
select {
case trace := <-buffer.channels.trace:
assert.Len(trace, 4, "there was one trace with the right number of spans in the channel")
for _, span := range spans {
assert.Contains(trace, span, "the trace contains the spans")
}
case err := <-buffer.channels.err:
assert.Fail("unexpected error:", err.Error())
}
}
package tracer
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestPushTrace(t *testing.T) {
assert := assert.New(t)
channels := newTracerChans()
trace := []*Span{
&Span{
Name: "pylons.request",
Service: "pylons",
Resource: "/",
},
&Span{
Name: "pylons.request",
Service: "pylons",
Resource: "/foo",
},
}
channels.pushTrace(trace)
assert.Len(channels.trace, 1, "there should be data in channel")
assert.Len(channels.traceFlush, 0, "no flush requested yet")
pushed := <-channels.trace
assert.Equal(trace, pushed)
many := traceChanLen/2 + 1
for i := 0; i < many; i++ {
channels.pushTrace(make([]*Span, i))
}
assert.Len(channels.trace, many, "all traces should be in the channel, not yet blocking")
assert.Len(channels.traceFlush, 1, "a trace flush should have been requested")
for i := 0; i < cap(channels.trace); i++ {
channels.pushTrace(make([]*Span, i))
}
assert.Len(channels.trace, traceChanLen, "buffer should be full")
assert.NotEqual(0, len(channels.err), "there should be an error logged")
err := <-channels.err
assert.Equal(&errorTraceChanFull{Len: traceChanLen}, err)
}
func TestPushService(t *testing.T) {
assert := assert.New(t)
channels := newTracerChans()
service := Service{
Name: "redis-master",
App: "redis",
AppType: "db",
}
channels.pushService(service)
assert.Len(channels.service, 1, "there should be data in channel")
assert.Len(channels.serviceFlush, 0, "no flush requested yet")
pushed := <-channels.service
assert.Equal(service, pushed)
many := serviceChanLen/2 + 1
for i := 0; i < many; i++ {
channels.pushService(Service{
Name: fmt.Sprintf("service%d", i),
App: "custom",
AppType: "web",
})
}
assert.Len(channels.service, many, "all services should be in the channel, not yet blocking")
assert.Len(channels.serviceFlush, 1, "a service flush should have been requested")
for i := 0; i < cap(channels.service); i++ {
channels.pushService(Service{
Name: fmt.Sprintf("service%d", i),
App: "custom",
AppType: "web",
})
}
assert.Len(channels.service, serviceChanLen, "buffer should be full")
assert.NotEqual(0, len(channels.err), "there should be an error logged")
err := <-channels.err
assert.Equal(&errorServiceChanFull{Len: serviceChanLen}, err)
}
func TestPushErr(t *testing.T) {
assert := assert.New(t)
channels := newTracerChans()
err := fmt.Errorf("ooops")
channels.pushErr(err)
assert.Len(channels.err, 1, "there should be data in channel")
assert.Len(channels.errFlush, 0, "no flush requested yet")
pushed := <-channels.err
assert.Equal(err, pushed)
many := errChanLen/2 + 1
for i := 0; i < many; i++ {
channels.pushErr(fmt.Errorf("err %d", i))
}
assert.Len(channels.err, many, "all errs should be in the channel, not yet blocking")
assert.Len(channels.errFlush, 1, "a err flush should have been requested")
for i := 0; i < cap(channels.err); i++ {
channels.pushErr(fmt.Errorf("err %d", i))
}
// if we reach this, means pushErr is not blocking, which is what we want to double-check
}
package tracer
import (
"testing"
"context"
"github.com/stretchr/testify/assert"
)
func TestContextWithSpanDefault(t *testing.T) {
assert := assert.New(t)
// create a new context with a span
span := SpanFromContextDefault(nil)
assert.NotNil(span)
ctx := context.Background()
assert.NotNil(SpanFromContextDefault(ctx))
}
func TestSpanFromContext(t *testing.T) {
assert := assert.New(t)
// create a new context with a span
ctx := context.Background()
tracer := NewTracer()
expectedSpan := tracer.NewRootSpan("pylons.request", "pylons", "/")
ctx = ContextWithSpan(ctx, expectedSpan)
span, ok := SpanFromContext(ctx)
assert.True(ok)
assert.Equal(expectedSpan, span)
}
func TestSpanFromContextNil(t *testing.T) {
assert := assert.New(t)
// create a context without a span
ctx := context.Background()
span, ok := SpanFromContext(ctx)
assert.False(ok)
assert.Nil(span)
span, ok = SpanFromContext(nil)
assert.False(ok)
assert.Nil(span)
}
func TestSpanMissingParent(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
// assuming we're in an inner function and we
// forget the nil or ok checks
ctx := context.Background()
span, _ := SpanFromContext(ctx)
// span is nil according to the API
child := tracer.NewChildSpan("redis.command", span)
child.Finish()
// the child is finished but it's not recorded in
// the tracer buffer because the service is missing
assert.True(child.Duration > 0)
assert.Equal(1, len(tracer.channels.trace))
}
package tracer
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/ugorji/go/codec"
)
func TestEncoderContentType(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
encoder Encoder
contentType string
}{
{newJSONEncoder(), "application/json"},
{newMsgpackEncoder(), "application/msgpack"},
}
for _, tc := range testCases {
assert.Equal(tc.contentType, tc.encoder.ContentType())
}
}
func TestJSONEncoding(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
traces int
size int
}{
{1, 1},
{3, 1},
{1, 3},
{3, 3},
}
for _, tc := range testCases {
payload := getTestTrace(tc.traces, tc.size)
encoder := newJSONEncoder()
err := encoder.EncodeTraces(payload)
assert.Nil(err)
// decode to check the right encoding
var traces [][]*Span
dec := json.NewDecoder(encoder.buffer)
err = dec.Decode(&traces)
assert.Nil(err)
assert.Len(traces, tc.traces)
for _, trace := range traces {
assert.Len(trace, tc.size)
span := trace[0]
assert.Equal(uint64(42), span.TraceID)
assert.Equal(uint64(52), span.SpanID)
assert.Equal(uint64(42), span.ParentID)
assert.Equal("web", span.Type)
assert.Equal("high.throughput", span.Service)
assert.Equal("sending.events", span.Name)
assert.Equal("SEND /data", span.Resource)
assert.Equal(int64(1481215590883401105), span.Start)
assert.Equal(int64(1000000000), span.Duration)
assert.Equal("192.168.0.1", span.Meta["http.host"])
assert.Equal(float64(41.99), span.Metrics["http.monitor"])
}
}
}
func TestMsgpackEncoding(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
traces int
size int
}{
{1, 1},
{3, 1},
{1, 3},
{3, 3},
}
for _, tc := range testCases {
payload := getTestTrace(tc.traces, tc.size)
encoder := newMsgpackEncoder()
err := encoder.EncodeTraces(payload)
assert.Nil(err)
// decode to check the right encoding
var traces [][]*Span
var mh codec.MsgpackHandle
dec := codec.NewDecoder(encoder.buffer, &mh)
err = dec.Decode(&traces)
assert.Nil(err)
assert.Len(traces, tc.traces)
for _, trace := range traces {
assert.Len(trace, tc.size)
span := trace[0]
assert.Equal(uint64(42), span.TraceID)
assert.Equal(uint64(52), span.SpanID)
assert.Equal(uint64(42), span.ParentID)
assert.Equal("web", span.Type)
assert.Equal("high.throughput", span.Service)
assert.Equal("sending.events", span.Name)
assert.Equal("SEND /data", span.Resource)
assert.Equal(int64(1481215590883401105), span.Start)
assert.Equal(int64(1000000000), span.Duration)
assert.Equal("192.168.0.1", span.Meta["http.host"])
assert.Equal(float64(41.99), span.Metrics["http.monitor"])
}
}
}
package tracer
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestErrorSpanBufFull(t *testing.T) {
assert := assert.New(t)
err := &errorSpanBufFull{Len: 42}
assert.Equal("span buffer is full (length: 42)", err.Error())
assert.Equal("ErrorSpanBufFull", errorKey(err))
}
func TestErrorTraceChanFull(t *testing.T) {
assert := assert.New(t)
err := &errorTraceChanFull{Len: 42}
assert.Equal("trace channel is full (length: 42)", err.Error())
assert.Equal("ErrorTraceChanFull", errorKey(err))
}
func TestErrorServiceChanFull(t *testing.T) {
assert := assert.New(t)
err := &errorServiceChanFull{Len: 42}
assert.Equal("service channel is full (length: 42)", err.Error())
assert.Equal("ErrorServiceChanFull", errorKey(err))
}
func TestErrorTraceIDMismatch(t *testing.T) {
assert := assert.New(t)
err := &errorTraceIDMismatch{Expected: 42, Actual: 65535}
assert.Equal("trace ID mismatch (expected: 2a actual: ffff)", err.Error())
assert.Equal("ErrorTraceIDMismatch", errorKey(err))
}
func TestErrorNoSpanBuf(t *testing.T) {
assert := assert.New(t)
err := &errorNoSpanBuf{SpanName: "do"}
assert.Equal("no span buffer (span name: 'do')", err.Error())
}
func TestErrorFlushLostTraces(t *testing.T) {
assert := assert.New(t)
err := &errorFlushLostTraces{Nb: 100}
assert.Equal("unable to flush traces, lost 100 traces", err.Error())
}
func TestErrorFlushLostServices(t *testing.T) {
assert := assert.New(t)
err := &errorFlushLostServices{Nb: 100}
assert.Equal("unable to flush services, lost 100 services", err.Error())
}
func TestErrorKey(t *testing.T) {
assert := assert.New(t)
assert.Equal("this is something unexpected", errorKey(fmt.Errorf("this is something unexpected")))
assert.Equal("", errorKey(nil))
}
func TestAggregateErrors(t *testing.T) {
assert := assert.New(t)
errChan := make(chan error, 100)
errChan <- &errorSpanBufFull{Len: 1000}
errChan <- &errorSpanBufFull{Len: 1000}
errChan <- &errorSpanBufFull{Len: 1000}
errChan <- &errorSpanBufFull{Len: 1000}
errChan <- &errorFlushLostTraces{Nb: 42}
errChan <- &errorTraceIDMismatch{Expected: 42, Actual: 1}
errChan <- &errorTraceIDMismatch{Expected: 42, Actual: 4095}
errs := aggregateErrors(errChan)
assert.Equal(map[string]errorSummary{
"ErrorSpanBufFull": errorSummary{
Count: 4,
Example: "span buffer is full (length: 1000)",
},
"ErrorTraceIDMismatch": errorSummary{
Count: 2,
Example: "trace ID mismatch (expected: 2a actual: fff)",
},
"ErrorFlushLostTraces": errorSummary{
Count: 1,
Example: "unable to flush traces, lost 42 traces",
},
}, errs)
}
package tracer_test
import (
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"github.com/DataDog/dd-trace-go/tracer"
)
func saveFile(ctx context.Context, path string, r io.Reader) error {
// Start a new span that is the child of the span stored in the context, and
// attach it to the current context. If the context has no span, it will
// return an empty root span.
span, ctx := tracer.NewChildSpanWithContext("filestore.saveFile", ctx)
defer span.Finish()
// save the file contents.
file, err := os.Create(path)
if err != nil {
span.SetError(err)
return err
}
defer file.Close()
_, err = io.Copy(file, r)
span.SetError(err)
return err
}
func saveFileHandler(w http.ResponseWriter, r *http.Request) {
// the name of the operation we're measuring
name := "http.request"
service := "example-filestore"
resource := "/saveFile"
// This is the main entry point of our application, so we create a root span
// that includes the service and resource name.
span := tracer.NewRootSpan(name, service, resource)
defer span.Finish()
// Add the span to the request's context so we can pass the tracing information
// down the stack.
ctx := span.Context(r.Context())
// Do the work.
err := saveFile(ctx, "/tmp/example", r.Body)
span.SetError(err) // no-op if err == nil
if err != nil {
http.Error(w, fmt.Sprintf("error saving file! %s", err), 500)
return
}
w.Write([]byte("saved file!"))
}
// Tracing the hierarchy of spans in a request is a key part of tracing. This, for example,
// let's a developer associate all of the database calls in a web request. As of Go 1.7,
// the standard way of doing this is with the context package. Along with supporting
// deadlines, cancellation signals and more, Contexts are perfect for passing (optional)
// telemetry data through your stack.
//
// Read more about contexts here: https://golang.org/pkg/context/
//
// Here is an example illustrating how to pass tracing data with contexts.
func Example_context() {
http.HandleFunc("/saveFile", saveFileHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
package tracer_test
import (
"net/http"
"github.com/DataDog/dd-trace-go/tracer"
)
func Example() {
span := tracer.NewRootSpan("http.client.request", "example.com", "/user/{id}")
defer span.Finish()
url := "http://example.com/user/123"
resp, err := http.Get(url)
if err != nil {
span.SetError(err)
return
}
span.SetMeta("http.status", resp.Status)
span.SetMeta("http.url", url)
}
package tracer
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/DataDog/dd-trace-go/tracer/ext"
)
func TestSpanStart(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// a new span sets the Start after the initialization
assert.NotEqual(int64(0), span.Start)
}
func TestSpanString(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// don't bother checking the contents, just make sure it works.
assert.NotEqual("", span.String())
span.Finish()
assert.NotEqual("", span.String())
}
func TestSpanSetMeta(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// check the map is properly initialized
span.SetMeta("status.code", "200")
assert.Equal("200", span.Meta["status.code"])
// operating on a finished span is a no-op
nMeta := len(span.Meta)
span.Finish()
span.SetMeta("finished.test", "true")
assert.Equal(len(span.Meta), nMeta)
assert.Equal(span.Meta["finished.test"], "")
}
func TestSpanSetMetas(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
span.SetSamplingPriority(0) // avoid interferences with "_sampling_priority_v1" meta
metas := map[string]string{
"error.msg": "Something wrong",
"error.type": "*errors.errorString",
"status.code": "200",
"system.pid": "29176",
}
extraMetas := map[string]string{
"custom.1": "something custom",
"custom.2": "something even more special",
}
nopMetas := map[string]string{
"nopKey1": "nopValue1",
"nopKey2": "nopValue2",
}
// check the map is properly initialized
span.SetMetas(metas)
assert.Equal(len(metas), len(span.Meta))
for k := range metas {
assert.Equal(metas[k], span.Meta[k])
}
// check a second call adds the new metas, but does not remove old ones
span.SetMetas(extraMetas)
assert.Equal(len(metas)+len(extraMetas), len(span.Meta))
for k := range extraMetas {
assert.Equal(extraMetas[k], span.Meta[k])
}
assert.Equal(span.Meta["status.code"], "200")
// operating on a finished span is a no-op
span.Finish()
span.SetMetas(nopMetas)
assert.Equal(len(metas)+len(extraMetas), len(span.Meta))
for k := range nopMetas {
assert.Equal("", span.Meta[k])
}
}
func TestSpanSetMetric(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// check the map is properly initialized
span.SetMetric("bytes", 1024.42)
assert.Equal(1, len(span.Metrics))
assert.Equal(1024.42, span.Metrics["bytes"])
// operating on a finished span is a no-op
span.Finish()
span.SetMetric("finished.test", 1337)
assert.Equal(1, len(span.Metrics))
assert.Equal(0.0, span.Metrics["finished.test"])
}
func TestSpanError(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// check the error is set in the default meta
err := errors.New("Something wrong")
span.SetError(err)
assert.Equal(int32(1), span.Error)
assert.Equal("Something wrong", span.Meta["error.msg"])
assert.Equal("*errors.errorString", span.Meta["error.type"])
assert.NotEqual("", span.Meta["error.stack"])
// operating on a finished span is a no-op
span = tracer.NewRootSpan("flask.request", "flask", "/")
nMeta := len(span.Meta)
span.Finish()
span.SetError(err)
assert.Equal(int32(0), span.Error)
assert.Equal(nMeta, len(span.Meta))
assert.Equal("", span.Meta["error.msg"])
assert.Equal("", span.Meta["error.type"])
assert.Equal("", span.Meta["error.stack"])
}
func TestSpanError_Typed(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// check the error is set in the default meta
err := &boomError{}
span.SetError(err)
assert.Equal(int32(1), span.Error)
assert.Equal("boom", span.Meta["error.msg"])
assert.Equal("*tracer.boomError", span.Meta["error.type"])
assert.NotEqual("", span.Meta["error.stack"])
}
func TestEmptySpan(t *testing.T) {
// ensure the empty span won't crash the app
var span Span
span.SetMeta("a", "b")
span.SetError(nil)
span.Finish()
var s *Span
s.SetMeta("a", "b")
s.SetError(nil)
s.Finish()
}
func TestSpanErrorNil(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// don't set the error if it's nil
nMeta := len(span.Meta)
span.SetError(nil)
assert.Equal(int32(0), span.Error)
assert.Equal(nMeta, len(span.Meta))
}
func TestSpanFinish(t *testing.T) {
assert := assert.New(t)
wait := time.Millisecond * 2
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
// the finish should set finished and the duration
time.Sleep(wait)
span.Finish()
assert.True(span.Duration > int64(wait))
assert.True(span.finished)
}
func TestSpanFinishTwice(t *testing.T) {
assert := assert.New(t)
wait := time.Millisecond * 2
tracer, _ := getTestTracer()
defer tracer.Stop()
assert.Len(tracer.channels.trace, 0)
// the finish must be idempotent
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
time.Sleep(wait)
span.Finish()
assert.Len(tracer.channels.trace, 1)
previousDuration := span.Duration
time.Sleep(wait)
span.Finish()
assert.Equal(previousDuration, span.Duration)
assert.Len(tracer.channels.trace, 1)
}
func TestSpanContext(t *testing.T) {
ctx := context.Background()
_, ok := SpanFromContext(ctx)
assert.False(t, ok)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
ctx = span.Context(ctx)
s2, ok := SpanFromContext(ctx)
assert.True(t, ok)
assert.Equal(t, span.SpanID, s2.SpanID)
}
// Prior to a bug fix, this failed when running `go test -race`
func TestSpanModifyWhileFlushing(t *testing.T) {
tracer, _ := getTestTracer()
defer tracer.Stop()
done := make(chan struct{})
go func() {
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
span.Finish()
// It doesn't make much sense to update the span after it's been finished,
// but an error in a user's code could lead to this.
span.SetMeta("race_test", "true")
span.SetMetric("race_test2", 133.7)
span.SetMetrics("race_test3", 133.7)
span.SetError(errors.New("t"))
done <- struct{}{}
}()
run := true
for run {
select {
case <-done:
run = false
default:
tracer.flushTraces()
}
}
}
func TestSpanSamplingPriority(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("my.name", "my.service", "my.resource")
assert.Equal(0.0, span.Metrics["_sampling_priority_v1"], "default sampling priority if undefined is 0")
assert.False(span.HasSamplingPriority(), "by default, sampling priority is undefined")
assert.Equal(0, span.GetSamplingPriority(), "default sampling priority for root spans is 0")
childSpan := tracer.NewChildSpan("my.child", span)
assert.Equal(span.Metrics["_sampling_priority_v1"], childSpan.Metrics["_sampling_priority_v1"])
assert.Equal(span.HasSamplingPriority(), childSpan.HasSamplingPriority())
assert.Equal(span.GetSamplingPriority(), childSpan.GetSamplingPriority())
for _, priority := range []int{
ext.PriorityUserReject,
ext.PriorityAutoReject,
ext.PriorityAutoKeep,
ext.PriorityUserKeep,
999, // not used yet, but we should allow it
} {
span.SetSamplingPriority(priority)
assert.True(span.HasSamplingPriority())
assert.Equal(priority, span.GetSamplingPriority())
childSpan = tracer.NewChildSpan("my.child", span)
assert.Equal(span.Metrics["_sampling_priority_v1"], childSpan.Metrics["_sampling_priority_v1"])
assert.Equal(span.HasSamplingPriority(), childSpan.HasSamplingPriority())
assert.Equal(span.GetSamplingPriority(), childSpan.GetSamplingPriority())
}
}
type boomError struct{}
func (e *boomError) Error() string { return "boom" }
package tracer
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
func BenchmarkNormalTimeNow(b *testing.B) {
for n := 0; n < b.N; n++ {
lowPrecisionNow()
}
}
func BenchmarkHighPrecisionTime(b *testing.B) {
for n := 0; n < b.N; n++ {
highPrecisionNow()
}
}
func TestHighPrecisionTimerIsMoreAccurate(t *testing.T) {
startLow := lowPrecisionNow()
startHigh := highPrecisionNow()
stopHigh := highPrecisionNow()
for stopHigh == startHigh {
stopHigh = highPrecisionNow()
}
stopLow := lowPrecisionNow()
assert.Equal(t, int64(0), stopLow-startLow)
}
package tracer
import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
// getTestSpan returns a Span with different fields set
func getTestSpan() *Span {
return &Span{
TraceID: 42,
SpanID: 52,
ParentID: 42,
Type: "web",
Service: "high.throughput",
Name: "sending.events",
Resource: "SEND /data",
Start: 1481215590883401105,
Duration: 1000000000,
Meta: map[string]string{"http.host": "192.168.0.1"},
Metrics: map[string]float64{"http.monitor": 41.99},
}
}
// getTestTrace returns a list of traces that is composed by ``traceN`` number
// of traces, each one composed by ``size`` number of spans.
func getTestTrace(traceN, size int) [][]*Span {
var traces [][]*Span
for i := 0; i < traceN; i++ {
trace := []*Span{}
for j := 0; j < size; j++ {
trace = append(trace, getTestSpan())
}
traces = append(traces, trace)
}
return traces
}
func getTestServices() map[string]Service {
return map[string]Service{
"svc1": Service{Name: "scv1", App: "a", AppType: "b"},
"svc2": Service{Name: "scv2", App: "c", AppType: "d"},
}
}
type mockDatadogAPIHandler struct {
t *testing.T
}
func (m mockDatadogAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
assert := assert.New(m.t)
header := r.Header.Get("X-Datadog-Trace-Count")
assert.NotEqual("", header, "X-Datadog-Trace-Count header should be here")
count, err := strconv.Atoi(header)
assert.Nil(err, "header should be an int")
assert.NotEqual(0, count, "there should be a non-zero amount of traces")
}
func mockDatadogAPINewServer(t *testing.T) *httptest.Server {
handler := mockDatadogAPIHandler{t: t}
server := httptest.NewServer(handler)
return server
}
func TestTracesAgentIntegration(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
payload [][]*Span
}{
{getTestTrace(1, 1)},
{getTestTrace(10, 1)},
{getTestTrace(1, 10)},
{getTestTrace(10, 10)},
}
for _, tc := range testCases {
transport := newHTTPTransport(defaultHostname, defaultPort)
response, err := transport.SendTraces(tc.payload)
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
}
func TestAPIDowngrade(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
transport.traceURL = "http://localhost:8126/v0.0/traces"
// if we get a 404 we should downgrade the API
traces := getTestTrace(2, 2)
response, err := transport.SendTraces(traces)
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
func TestEncoderDowngrade(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
transport.traceURL = "http://localhost:8126/v0.2/traces"
// if we get a 415 because of a wrong encoder, we should downgrade the encoder
traces := getTestTrace(2, 2)
response, err := transport.SendTraces(traces)
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
func TestTransportServices(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
response, err := transport.SendServices(getTestServices())
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
func TestTransportServicesDowngrade_0_0(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
transport.serviceURL = "http://localhost:8126/v0.0/services"
response, err := transport.SendServices(getTestServices())
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
func TestTransportServicesDowngrade_0_2(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
transport.serviceURL = "http://localhost:8126/v0.2/services"
response, err := transport.SendServices(getTestServices())
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
func TestTransportEncoderPool(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
// MsgpackEncoder is the default encoder of the pool
encoder := transport.getEncoder()
assert.Equal("application/msgpack", encoder.ContentType())
}
func TestTransportSwitchEncoder(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
transport.changeEncoder(jsonEncoderFactory)
// MsgpackEncoder is the default encoder of the pool
encoder := transport.getEncoder()
assert.Equal("application/json", encoder.ContentType())
}
func TestTraceCountHeader(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
payload [][]*Span
}{
{getTestTrace(1, 1)},
{getTestTrace(10, 1)},
{getTestTrace(100, 10)},
}
receiver := mockDatadogAPINewServer(t)
parsedURL, err := url.Parse(receiver.URL)
assert.NoError(err)
host := parsedURL.Host
hostItems := strings.Split(host, ":")
assert.Equal(2, len(hostItems), "port should be given, as it's chosen randomly")
hostname := hostItems[0]
port := hostItems[1]
for _, tc := range testCases {
transport := newHTTPTransport(hostname, port)
response, err := transport.SendTraces(tc.payload)
assert.NoError(err)
assert.NotNil(response)
assert.Equal(200, response.StatusCode)
}
receiver.Close()
}
package sarama
import "testing"
var (
aclCreateRequest = []byte{
0, 0, 0, 1,
3, // resource type = group
0, 5, 'g', 'r', 'o', 'u', 'p',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
2, // all
2, // deny
}
)
func TestCreateAclsRequest(t *testing.T) {
req := &CreateAclsRequest{
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}
testRequest(t, "create request", req, aclCreateRequest)
}
package sarama
import (
"testing"
"time"
)
var (
createResponseWithError = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 42,
0, 5, 'e', 'r', 'r', 'o', 'r',
}
createResponseArray = []byte{
0, 0, 0, 100,
0, 0, 0, 2,
0, 42,
0, 5, 'e', 'r', 'r', 'o', 'r',
0, 0,
255, 255,
}
)
func TestCreateAclsResponse(t *testing.T) {
errmsg := "error"
resp := &CreateAclsResponse{
ThrottleTime: 100 * time.Millisecond,
AclCreationResponses: []*AclCreationResponse{{
Err: ErrInvalidRequest,
ErrMsg: &errmsg,
}},
}
testResponse(t, "response with error", resp, createResponseWithError)
resp.AclCreationResponses = append(resp.AclCreationResponses, new(AclCreationResponse))
testResponse(t, "response array", resp, createResponseArray)
}
package sarama
import "testing"
var (
aclDeleteRequestNulls = []byte{
0, 0, 0, 1,
1,
255, 255,
255, 255,
255, 255,
11,
3,
}
aclDeleteRequest = []byte{
0, 0, 0, 1,
1, // any
0, 6, 'f', 'i', 'l', 't', 'e', 'r',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4, // write
3, // allow
}
aclDeleteRequestArray = []byte{
0, 0, 0, 2,
1,
0, 6, 'f', 'i', 'l', 't', 'e', 'r',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4, // write
3, // allow
2,
0, 5, 't', 'o', 'p', 'i', 'c',
255, 255,
255, 255,
6,
2,
}
)
func TestDeleteAclsRequest(t *testing.T) {
req := &DeleteAclsRequest{
Filters: []*AclFilter{{
ResourceType: AclResourceAny,
Operation: AclOperationAlterConfigs,
PermissionType: AclPermissionAllow,
}},
}
testRequest(t, "delete request nulls", req, aclDeleteRequestNulls)
req.Filters[0].ResourceName = nullString("filter")
req.Filters[0].Principal = nullString("principal")
req.Filters[0].Host = nullString("host")
req.Filters[0].Operation = AclOperationWrite
testRequest(t, "delete request", req, aclDeleteRequest)
req.Filters = append(req.Filters, &AclFilter{
ResourceType: AclResourceTopic,
ResourceName: nullString("topic"),
Operation: AclOperationDelete,
PermissionType: AclPermissionDeny,
})
testRequest(t, "delete request array", req, aclDeleteRequestArray)
}
package sarama
import (
"testing"
"time"
)
var (
deleteAclsResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 0, // no error
255, 255, // no error message
0, 0, 0, 1, // 1 matching acl
0, 0, // no error
255, 255, // no error message
2, // resource type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4,
3,
}
)
func TestDeleteAclsResponse(t *testing.T) {
resp := &DeleteAclsResponse{
ThrottleTime: 100 * time.Millisecond,
FilterResponses: []*FilterResponse{{
MatchingAcls: []*MatchingAcl{{
Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "topic"},
Acl: Acl{Principal: "principal", Host: "host", Operation: AclOperationWrite, PermissionType: AclPermissionAllow},
}},
}},
}
testResponse(t, "", resp, deleteAclsResponse)
}
package sarama
import (
"testing"
)
var (
aclDescribeRequest = []byte{
2, // resource type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
5, // acl operation
3, // acl permission type
}
)
func TestAclDescribeRequest(t *testing.T) {
resourcename := "topic"
principal := "principal"
host := "host"
req := &DescribeAclsRequest{
AclFilter{
ResourceType: AclResourceTopic,
ResourceName: &resourcename,
Principal: &principal,
Host: &host,
Operation: AclOperationCreate,
PermissionType: AclPermissionAllow,
},
}
testRequest(t, "", req, aclDescribeRequest)
}
package sarama
import (
"testing"
"time"
)
var aclDescribeResponseError = []byte{
0, 0, 0, 100,
0, 8, // error
0, 5, 'e', 'r', 'r', 'o', 'r',
0, 0, 0, 1, // 1 resource
2, // cluster type
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, // 1 acl
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4, // write
3, // allow
}
func TestAclDescribeResponse(t *testing.T) {
errmsg := "error"
resp := &DescribeAclsResponse{
ThrottleTime: 100 * time.Millisecond,
Err: ErrBrokerNotAvailable,
ErrMsg: &errmsg,
ResourceAcls: []*ResourceAcls{{
Resource: Resource{
ResourceName: "topic",
ResourceType: AclResourceTopic,
},
Acls: []*Acl{
{
Principal: "principal",
Host: "host",
Operation: AclOperationWrite,
PermissionType: AclPermissionAllow,
},
},
}},
}
testResponse(t, "describe", resp, aclDescribeResponseError)
}
package sarama
import "testing"
var (
addOffsetsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64,
0, 0,
0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
}
)
func TestAddOffsetsToTxnRequest(t *testing.T) {
req := &AddOffsetsToTxnRequest{
TransactionalID: "txn",
ProducerID: 8000,
ProducerEpoch: 0,
GroupID: "groupid",
}
testRequest(t, "", req, addOffsetsToTxnRequest)
}
package sarama
import (
"testing"
"time"
)
var (
addOffsetsToTxnResponse = []byte{
0, 0, 0, 100,
0, 47,
}
)
func TestAddOffsetsToTxnResponse(t *testing.T) {
resp := &AddOffsetsToTxnResponse{
ThrottleTime: 100 * time.Millisecond,
Err: ErrInvalidProducerEpoch,
}
testResponse(t, "", resp, addOffsetsToTxnResponse)
}
package sarama
import "testing"
var (
addPartitionsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
0, 0, 0, 0, // ProducerEpoch
0, 1, // 1 topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, 0, 0, 0, 1,
}
)
func TestAddPartitionsToTxnRequest(t *testing.T) {
req := &AddPartitionsToTxnRequest{
TransactionalID: "txn",
ProducerID: 8000,
ProducerEpoch: 0,
TopicPartitions: map[string][]int32{
"topic": []int32{1},
},
}
testRequest(t, "", req, addPartitionsToTxnRequest)
}
package sarama
import (
"testing"
"time"
)
var (
addPartitionsToTxnResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, // 1 partition error
0, 0, 0, 2, // partition 2
0, 48, // error
}
)
func TestAddPartitionsToTxnResponse(t *testing.T) {
resp := &AddPartitionsToTxnResponse{
ThrottleTime: 100 * time.Millisecond,
Errors: map[string][]*PartitionError{
"topic": []*PartitionError{&PartitionError{
Err: ErrInvalidTxnState,
Partition: 2,
}},
},
}
testResponse(t, "", resp, addPartitionsToTxnResponse)
}
package sarama
import "testing"
var (
emptyAlterConfigsRequest = []byte{
0, 0, 0, 0, // 0 configs
0, // don't Validate
}
singleAlterConfigsRequest = []byte{
0, 0, 0, 1, // 1 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, //1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
0, // don't validate
}
doubleAlterConfigsRequest = []byte{
0, 0, 0, 2, // 2 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, //1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
2, // a topic
0, 3, 'b', 'a', 'r', // topic name: foo
0, 0, 0, 1, //2 config
0, 12, // 12 chars
'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
0, // don't validate
}
)
func TestAlterConfigsRequest(t *testing.T) {
var request *AlterConfigsRequest
request = &AlterConfigsRequest{
Resources: []*AlterConfigsResource{},
}
testRequest(t, "no requests", request, emptyAlterConfigsRequest)
configValue := "1000"
request = &AlterConfigsRequest{
Resources: []*AlterConfigsResource{
&AlterConfigsResource{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]*string{
"segment.ms": &configValue,
},
},
},
}
testRequest(t, "one config", request, singleAlterConfigsRequest)
request = &AlterConfigsRequest{
Resources: []*AlterConfigsResource{
&AlterConfigsResource{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]*string{
"segment.ms": &configValue,
},
},
&AlterConfigsResource{
Type: TopicResource,
Name: "bar",
ConfigEntries: map[string]*string{
"retention.ms": &configValue,
},
},
},
}
testRequest(t, "two configs", request, doubleAlterConfigsRequest)
}
package sarama
import (
"testing"
)
var (
alterResponseEmpty = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 0, // no configs
}
alterResponsePopulated = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
0, 0, //string
2, // topic
0, 3, 'f', 'o', 'o',
}
)
func TestAlterConfigsResponse(t *testing.T) {
var response *AlterConfigsResponse
response = &AlterConfigsResponse{
Resources: []*AlterConfigsResourceResponse{},
}
testVersionDecodable(t, "empty", response, alterResponseEmpty, 0)
if len(response.Resources) != 0 {
t.Error("Expected no groups")
}
response = &AlterConfigsResponse{
Resources: []*AlterConfigsResourceResponse{
&AlterConfigsResourceResponse{
ErrorCode: 0,
ErrorMsg: "",
Type: TopicResource,
Name: "foo",
},
},
}
testResponse(t, "response with error", response, alterResponsePopulated)
}
package sarama
import "testing"
var (
apiVersionRequest = []byte{}
)
func TestApiVersionsRequest(t *testing.T) {
var request *ApiVersionsRequest
request = new(ApiVersionsRequest)
testRequest(t, "basic", request, apiVersionRequest)
}
package sarama
import "testing"
var (
apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}
)
func TestApiVersionsResponse(t *testing.T) {
var response *ApiVersionsResponse
response = new(ApiVersionsResponse)
testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
if response.ApiVersions[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
}
if response.ApiVersions[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
}
if response.ApiVersions[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
}
}
package sarama
import (
"fmt"
"testing"
"time"
)
func ExampleBroker() {
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
panic(err)
}
request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata(&request)
if err != nil {
_ = broker.Close()
panic(err)
}
fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
if err = broker.Close(); err != nil {
panic(err)
}
}
type mockEncoder struct {
bytes []byte
}
func (m mockEncoder) encode(pe packetEncoder) error {
return pe.putRawBytes(m.bytes)
}
type brokerMetrics struct {
bytesRead int
bytesWritten int
}
func TestBrokerAccessors(t *testing.T) {
broker := NewBroker("abc:123")
if broker.ID() != -1 {
t.Error("New broker didn't have an ID of -1.")
}
if broker.Addr() != "abc:123" {
t.Error("New broker didn't have the correct address")
}
broker.id = 34
if broker.ID() != 34 {
t.Error("Manually setting broker ID did not take effect.")
}
}
func TestSimpleBrokerCommunication(t *testing.T) {
for _, tt := range brokerTestTable {
Logger.Printf("Testing broker communication for %s", tt.name)
mb := NewMockBroker(t, 0)
mb.Returns(&mockEncoder{tt.response})
pendingNotify := make(chan brokerMetrics)
// Register a callback to be notified about successful requests
mb.SetNotifier(func(bytesRead, bytesWritten int) {
pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
})
broker := NewBroker(mb.Addr())
// Set the broker id in order to validate local broker metrics
broker.id = 0
conf := NewConfig()
conf.Version = tt.version
err := broker.Open(conf)
if err != nil {
t.Fatal(err)
}
tt.runner(t, broker)
// Wait up to 500 ms for the remote broker to process the request and
// notify us about the metrics
timeout := 500 * time.Millisecond
select {
case mockBrokerMetrics := <-pendingNotify:
validateBrokerMetrics(t, broker, mockBrokerMetrics)
case <-time.After(timeout):
t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
}
mb.Close()
err = broker.Close()
if err != nil {
t.Error(err)
}
}
}
// We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
var brokerTestTable = []struct {
version KafkaVersion
name string
response []byte
runner func(*testing.T, *Broker)
}{
{V0_10_0_0,
"MetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := MetadataRequest{}
response, err := broker.GetMetadata(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Metadata request got no response!")
}
}},
{V0_10_0_0,
"ConsumerMetadataRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ConsumerMetadataRequest{}
response, err := broker.GetConsumerMetadata(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Consumer Metadata request got no response!")
}
}},
{V0_10_0_0,
"ProduceRequest (NoResponse)",
[]byte{},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.RequiredAcks = NoResponse
response, err := broker.Produce(&request)
if err != nil {
t.Error(err)
}
if response != nil {
t.Error("Produce request with NoResponse got a response!")
}
}},
{V0_10_0_0,
"ProduceRequest (WaitForLocal)",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}
request.RequiredAcks = WaitForLocal
response, err := broker.Produce(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Produce request without NoResponse got no response!")
}
}},
{V0_10_0_0,
"FetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := FetchRequest{}
response, err := broker.Fetch(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Fetch request got no response!")
}
}},
{V0_10_0_0,
"OffsetFetchRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetFetchRequest{}
response, err := broker.FetchOffset(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("OffsetFetch request got no response!")
}
}},
{V0_10_0_0,
"OffsetCommitRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetCommitRequest{}
response, err := broker.CommitOffset(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("OffsetCommit request got no response!")
}
}},
{V0_10_0_0,
"OffsetRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := OffsetRequest{}
response, err := broker.GetAvailableOffsets(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Offset request got no response!")
}
}},
{V0_10_0_0,
"JoinGroupRequest",
[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := JoinGroupRequest{}
response, err := broker.JoinGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("JoinGroup request got no response!")
}
}},
{V0_10_0_0,
"SyncGroupRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := SyncGroupRequest{}
response, err := broker.SyncGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("SyncGroup request got no response!")
}
}},
{V0_10_0_0,
"LeaveGroupRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := LeaveGroupRequest{}
response, err := broker.LeaveGroup(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("LeaveGroup request got no response!")
}
}},
{V0_10_0_0,
"HeartbeatRequest",
[]byte{0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := HeartbeatRequest{}
response, err := broker.Heartbeat(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("Heartbeat request got no response!")
}
}},
{V0_10_0_0,
"ListGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ListGroupsRequest{}
response, err := broker.ListGroups(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("ListGroups request got no response!")
}
}},
{V0_10_0_0,
"DescribeGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DescribeGroupsRequest{}
response, err := broker.DescribeGroups(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("DescribeGroups request got no response!")
}
}},
{V0_10_0_0,
"ApiVersionsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := ApiVersionsRequest{}
response, err := broker.ApiVersions(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("ApiVersions request got no response!")
}
}},
{V1_1_0_0,
"DeleteGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DeleteGroupsRequest{}
response, err := broker.DeleteGroups(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("DeleteGroups request got no response!")
}
}},
}
func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
metricValidators := newMetricValidators()
mockBrokerBytesRead := mockBrokerMetrics.bytesRead
mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
// Check that the number of bytes sent corresponds to what the mock broker received
metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
if mockBrokerBytesWritten == 0 {
// This a ProduceRequest with NoResponse
metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
} else {
metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
}
// Check that the number of bytes received corresponds to what the mock broker sent
metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
// Run the validators
metricValidators.run(t, broker.conf.MetricRegistry)
}
This diff is collapsed.
package sarama
import (
"math/big"
"net"
"testing"
"time"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
)
func TestTLS(t *testing.T) {
cakey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}
clientkey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}
hostkey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}
nvb := time.Now().Add(-1 * time.Hour)
nva := time.Now().Add(1 * time.Hour)
caTemplate := &x509.Certificate{
Subject: pkix.Name{CommonName: "ca"},
Issuer: pkix.Name{CommonName: "ca"},
SerialNumber: big.NewInt(0),
NotAfter: nva,
NotBefore: nvb,
IsCA: true,
BasicConstraintsValid: true,
KeyUsage: x509.KeyUsageCertSign,
}
caDer, err := x509.CreateCertificate(rand.Reader, caTemplate, caTemplate, &cakey.PublicKey, cakey)
if err != nil {
t.Fatal(err)
}
caFinalCert, err := x509.ParseCertificate(caDer)
if err != nil {
t.Fatal(err)
}
hostDer, err := x509.CreateCertificate(rand.Reader, &x509.Certificate{
Subject: pkix.Name{CommonName: "host"},
Issuer: pkix.Name{CommonName: "ca"},
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)},
SerialNumber: big.NewInt(0),
NotAfter: nva,
NotBefore: nvb,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}, caFinalCert, &hostkey.PublicKey, cakey)
if err != nil {
t.Fatal(err)
}
clientDer, err := x509.CreateCertificate(rand.Reader, &x509.Certificate{
Subject: pkix.Name{CommonName: "client"},
Issuer: pkix.Name{CommonName: "ca"},
SerialNumber: big.NewInt(0),
NotAfter: nva,
NotBefore: nvb,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
}, caFinalCert, &clientkey.PublicKey, cakey)
if err != nil {
t.Fatal(err)
}
pool := x509.NewCertPool()
pool.AddCert(caFinalCert)
systemCerts, err := x509.SystemCertPool()
if err != nil {
t.Fatal(err)
}
// Keep server the same - it's the client that we're testing
serverTLSConfig := &tls.Config{
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{hostDer},
PrivateKey: hostkey,
}},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: pool,
}
for _, tc := range []struct {
Succeed bool
Server, Client *tls.Config
}{
{ // Verify client fails if wrong CA cert pool is specified
Succeed: false,
Server: serverTLSConfig,
Client: &tls.Config{
RootCAs: systemCerts,
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{clientDer},
PrivateKey: clientkey,
}},
},
},
{ // Verify client fails if wrong key is specified
Succeed: false,
Server: serverTLSConfig,
Client: &tls.Config{
RootCAs: pool,
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{clientDer},
PrivateKey: hostkey,
}},
},
},
{ // Verify client fails if wrong cert is specified
Succeed: false,
Server: serverTLSConfig,
Client: &tls.Config{
RootCAs: pool,
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{hostDer},
PrivateKey: clientkey,
}},
},
},
{ // Verify client fails if no CAs are specified
Succeed: false,
Server: serverTLSConfig,
Client: &tls.Config{
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{clientDer},
PrivateKey: clientkey,
}},
},
},
{ // Verify client fails if no keys are specified
Succeed: false,
Server: serverTLSConfig,
Client: &tls.Config{
RootCAs: pool,
},
},
{ // Finally, verify it all works happily with client and server cert in place
Succeed: true,
Server: serverTLSConfig,
Client: &tls.Config{
RootCAs: pool,
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{clientDer},
PrivateKey: clientkey,
}},
},
},
} {
doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client)
}
}
func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientConfig *tls.Config) {
serverConfig.BuildNameToCertificate()
clientConfig.BuildNameToCertificate()
seedListener, err := tls.Listen("tcp", "127.0.0.1:0", serverConfig)
if err != nil {
t.Fatal("cannot open listener", err)
}
var childT *testing.T
if expectSuccess {
childT = t
} else {
childT = &testing.T{} // we want to swallow errors
}
seedBroker := NewMockBrokerListener(childT, 1, seedListener)
defer seedBroker.Close()
seedBroker.Returns(new(MetadataResponse))
config := NewConfig()
config.Net.TLS.Enable = true
config.Net.TLS.Config = clientConfig
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err == nil {
safeClose(t, client)
}
if expectSuccess {
if err != nil {
t.Fatal(err)
}
} else {
if err == nil {
t.Fatal("expected failure")
}
}
}
This diff is collapsed.
package sarama
import (
"bytes"
"reflect"
"testing"
)
var (
groupMemberMetadata = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
)
func TestConsumerGroupMemberMetadata(t *testing.T) {
meta := &ConsumerGroupMemberMetadata{
Version: 1,
Topics: []string{"one", "two"},
UserData: []byte{0x01, 0x02, 0x03},
}
buf, err := encode(meta, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberMetadata, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberMetadata, buf)
}
meta2 := new(ConsumerGroupMemberMetadata)
err = decode(buf, meta2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(meta, meta2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", meta, meta2)
}
}
func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{
"one": {0, 2, 4},
},
UserData: []byte{0x01, 0x02, 0x03},
}
buf, err := encode(amt, nil)
if err != nil {
t.Error("Failed to encode data", err)
} else if !bytes.Equal(groupMemberAssignment, buf) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", groupMemberAssignment, buf)
}
amt2 := new(ConsumerGroupMemberAssignment)
err = decode(buf, amt2)
if err != nil {
t.Error("Failed to decode data", err)
} else if !reflect.DeepEqual(amt, amt2) {
t.Errorf("Encoded data does not match expectation\nexpected: %v\nactual: %v", amt, amt2)
}
}
package sarama
import (
"testing"
)
var (
consumerMetadataRequestEmpty = []byte{
0x00, 0x00}
consumerMetadataRequestString = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r'}
)
func TestConsumerMetadataRequest(t *testing.T) {
request := new(ConsumerMetadataRequest)
testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0)
request.ConsumerGroup = "foobar"
testEncodable(t, "with string", request, consumerMetadataRequestString)
testVersionDecodable(t, "with string", request, consumerMetadataRequestString, 0)
}
package sarama
import "testing"
var (
consumerMetadataResponseError = []byte{
0x00, 0x0E,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
consumerMetadataResponseSuccess = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0xAB,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0xCC, 0xDD}
)
func TestConsumerMetadataResponseError(t *testing.T) {
response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testEncodable(t, "", response, consumerMetadataResponseError)
decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
t.Error("could not decode: ", err)
}
if decodedResp.Err != ErrOffsetsLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
}
}
func TestConsumerMetadataResponseSuccess(t *testing.T) {
broker := NewBroker("foo:52445")
broker.id = 0xAB
response := ConsumerMetadataResponse{
Coordinator: broker,
CoordinatorID: 0xAB,
CoordinatorHost: "foo",
CoordinatorPort: 0xCCDD,
Err: ErrNoError,
}
testResponse(t, "success", &response, consumerMetadataResponseSuccess)
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package sarama
import "testing"
var (
basicHeartbeatRequest = []byte{
0, 3, 'f', 'o', 'o', // Group ID
0x00, 0x01, 0x02, 0x03, // Generatiuon ID
0, 3, 'b', 'a', 'z', // Member ID
}
)
func TestHeartbeatRequest(t *testing.T) {
var request *HeartbeatRequest
request = new(HeartbeatRequest)
request.GroupId = "foo"
request.GenerationId = 66051
request.MemberId = "baz"
testRequest(t, "basic", request, basicHeartbeatRequest)
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment