Commit 9d39ea51 authored by Yong Tang's avatar Yong Tang Committed by GitHub

Add `go mod` support (#2503)

* Remove vendor and go-dep
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Add go.mod
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>

* Update Makefile and .travis.yml
Signed-off-by: default avatarYong Tang <yong.tang.github@outlook.com>
parent 39d94835

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

...@@ -38,7 +38,6 @@ before_install: ...@@ -38,7 +38,6 @@ before_install:
before_script: before_script:
- docker run -d --net=host --name=etcd quay.io/coreos/etcd:v$ETCD_VERSION - docker run -d --net=host --name=etcd quay.io/coreos/etcd:v$ETCD_VERSION
- make godeps
script: script:
- make TEST_TYPE=$TEST_TYPE travis - make TEST_TYPE=$TEST_TYPE travis
......
This diff is collapsed.
ignored = [
"github.com/mholt/caddy",
"github.com/mholt/caddy/caddyfile",
"github.com/mholt/caddy/startupshutdown",
"github.com/mholt/caddy/onevent",
"github.com/miekg/dns",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",
]
[prune]
go-tests = true
non-go = true
unused-packages = true
# client-go v10.0.0 uses apimachinery 2b1284ed4c93a43499e781493253e2ac5959c4fd
# and api 89a74a8d264df0e993299876a8cde88379b940ee,
# and introduced klog 8139d8cb77af419532b33dfa7dd09fbc5f1d344f
# and yaml fd68e9863619f6ec2fdd8625fe1f02e7c877e480 (see Godep.json).
# go dep is unable to match Godep.json automatically, so we have to specify the revisions here.
[[constraint]]
name = "k8s.io/client-go"
version = "v10.0.0"
[[override]]
name = "k8s.io/apimachinery"
revision = "2b1284ed4c93a43499e781493253e2ac5959c4fd"
[[override]]
name = "k8s.io/api"
revision = "89a74a8d264df0e993299876a8cde88379b940ee"
[[override]]
name = "k8s.io/klog"
revision = "8139d8cb77af419532b33dfa7dd09fbc5f1d344f"
[[override]]
name = "sigs.k8s.io/yaml"
revision = "fd68e9863619f6ec2fdd8625fe1f02e7c877e480"
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
GITCOMMIT:=$(shell git describe --dirty --always) GITCOMMIT:=$(shell git describe --dirty --always)
BINARY:=coredns BINARY:=coredns
SYSTEM:= SYSTEM:=
CHECKS:=check godeps CHECKS:=check
BUILDOPTS:=-v BUILDOPTS:=-v
GOPATH?=$(HOME)/go GOPATH?=$(HOME)/go
PRESUBMIT:=core coremain plugin test request PRESUBMIT:=core coremain plugin test request
...@@ -14,45 +14,29 @@ all: coredns ...@@ -14,45 +14,29 @@ all: coredns
.PHONY: coredns .PHONY: coredns
coredns: $(CHECKS) coredns: $(CHECKS)
CGO_ENABLED=$(CGO_ENABLED) $(SYSTEM) go build $(BUILDOPTS) -ldflags="-s -w -X github.com/coredns/coredns/coremain.GitCommit=$(GITCOMMIT)" -o $(BINARY) GO111MODULE=on CGO_ENABLED=$(CGO_ENABLED) $(SYSTEM) go build $(BUILDOPTS) -ldflags="-s -w -X github.com/coredns/coredns/coremain.GitCommit=$(GITCOMMIT)" -o $(BINARY)
.PHONY: check .PHONY: check
check: presubmit core/plugin/zplugin.go core/dnsserver/zdirectives.go godeps check: presubmit core/plugin/zplugin.go core/dnsserver/zdirectives.go
.PHONY: godeps
godeps:
@ # Not vendoring these, so external plugins compile, avoiding:
@ # cannot use c (type *"github.com/mholt/caddy".Controller) as type
@ # *"github.com/coredns/coredns/vendor/github.com/mholt/caddy".Controller like errors.
(cd $(GOPATH)/src/github.com/mholt/caddy 2>/dev/null && git checkout -q master 2>/dev/null || true)
(cd $(GOPATH)/src/github.com/miekg/dns 2>/dev/null && git checkout -q master 2>/dev/null || true)
(cd $(GOPATH)/src/github.com/prometheus/client_golang 2>/dev/null && git checkout -q master 2>/dev/null || true)
go get -u github.com/mholt/caddy
go get -u github.com/miekg/dns
go get -u github.com/prometheus/client_golang/prometheus/promhttp
go get -u github.com/prometheus/client_golang/prometheus
(cd $(GOPATH)/src/github.com/mholt/caddy && git checkout -q v0.11.4)
(cd $(GOPATH)/src/github.com/miekg/dns && git checkout -q v1.1.4)
(cd $(GOPATH)/src/github.com/prometheus/client_golang && git checkout -q v0.9.1)
.PHONY: travis .PHONY: travis
travis: travis:
ifeq ($(TEST_TYPE),core) ifeq ($(TEST_TYPE),core)
( cd request ; go test -v -tags 'etcd' -race ./... ) ( cd request ; GO111MODULE=on go test -v -tags 'etcd' -race ./... )
( cd core ; go test -v -tags 'etcd' -race ./... ) ( cd core ; GO111MODULE=on go test -v -tags 'etcd' -race ./... )
( cd coremain ; go test -v -tags 'etcd' -race ./... ) ( cd coremain ; GO111MODULE=on go test -v -tags 'etcd' -race ./... )
endif endif
ifeq ($(TEST_TYPE),integration) ifeq ($(TEST_TYPE),integration)
( cd test ; go test -v -tags 'etcd' -race ./... ) ( cd test ; GO111MODULE=on go test -v -tags 'etcd' -race ./... )
endif endif
ifeq ($(TEST_TYPE),plugin) ifeq ($(TEST_TYPE),plugin)
( cd plugin ; go test -v -tags 'etcd' -race ./... ) ( cd plugin ; GO111MODULE=on go test -v -tags 'etcd' -race ./... )
endif endif
ifeq ($(TEST_TYPE),coverage) ifeq ($(TEST_TYPE),coverage)
for d in `go list ./... | grep -v vendor`; do \ for d in `go list ./... | grep -v vendor`; do \
t=$$(date +%s); \ t=$$(date +%s); \
go test -i -tags 'etcd' -coverprofile=cover.out -covermode=atomic $$d || exit 1; \ GO111MODULE=on go test -i -tags 'etcd' -coverprofile=cover.out -covermode=atomic $$d || exit 1; \
go test -v -tags 'etcd' -coverprofile=cover.out -covermode=atomic $$d || exit 1; \ GO111MODULE=on go test -v -tags 'etcd' -coverprofile=cover.out -covermode=atomic $$d || exit 1; \
echo "Coverage test $$d took $$(($$(date +%s)-t)) seconds"; \ echo "Coverage test $$d took $$(($$(date +%s)-t)) seconds"; \
if [ -f cover.out ]; then \ if [ -f cover.out ]; then \
cat cover.out >> coverage.txt; \ cat cover.out >> coverage.txt; \
...@@ -62,11 +46,11 @@ ifeq ($(TEST_TYPE),coverage) ...@@ -62,11 +46,11 @@ ifeq ($(TEST_TYPE),coverage)
endif endif
core/plugin/zplugin.go core/dnsserver/zdirectives.go: plugin.cfg core/plugin/zplugin.go core/dnsserver/zdirectives.go: plugin.cfg
go generate coredns.go GO111MODULE=on go generate coredns.go
.PHONY: gen .PHONY: gen
gen: gen:
go generate coredns.go GO111MODULE=on go generate coredns.go
.PHONY: pb .PHONY: pb
pb: pb:
...@@ -79,12 +63,5 @@ presubmit: ...@@ -79,12 +63,5 @@ presubmit:
.PHONY: clean .PHONY: clean
clean: clean:
go clean GO111MODULE=on go clean
rm -f coredns rm -f coredns
.PHONY: dep-ensure
dep-ensure:
dep version || go get -u github.com/golang/dep/cmd/dep
dep ensure -v
dep prune -v
find vendor -name '*_test.go' -delete
module github.com/coredns/coredns
go 1.12
require (
cloud.google.com/go v0.28.0
github.com/DataDog/dd-trace-go v0.6.1
github.com/Shopify/sarama v1.17.0
github.com/apache/thrift v0.12.0
github.com/aws/aws-sdk-go v1.14.17
github.com/coreos/etcd v3.3.11+incompatible
github.com/davecgh/go-spew v1.1.0
github.com/dnstap/golang-dnstap v0.0.0-20170829151710-2cf77a2b5e11
github.com/eapache/go-resiliency v1.1.0
github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934
github.com/eapache/queue v1.1.0
github.com/evanphx/json-patch v4.1.0+incompatible
github.com/farsightsec/golang-framestream v0.0.0-20180124174429-c06a5734334d
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect
github.com/go-ini/ini v1.37.0
github.com/go-logfmt/logfmt v0.3.0
github.com/gogo/protobuf v1.0.0
github.com/golang/protobuf v1.2.0
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf
github.com/google/uuid v1.1.1 // indirect
github.com/googleapis/gnostic v0.2.0
github.com/gophercloud/gophercloud v0.0.0-20180928224355-bfc006765209
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47
github.com/imdario/mergo v0.3.5
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8
github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mholt/caddy v0.11.4
github.com/miekg/dns v1.1.4
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492
github.com/opentracing/opentracing-go v1.0.2
github.com/openzipkin/zipkin-go-opentracing v0.3.4
github.com/petar/GoLLRB v0.0.0-20130427215148-53be0d36a84c
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/pierrec/lz4 v2.0.3+incompatible
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165
github.com/spf13/pflag v1.0.1
github.com/ugorji/go v0.0.0-20161130061742-9c7f9b7a2bc3
golang.org/x/crypto v0.0.0-20180621125126-a49355c7e3f8
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/sys v0.0.0-20180627142611-7138fd3d9dc8
golang.org/x/text v0.3.0
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
google.golang.org/appengine v1.2.0
google.golang.org/genproto v0.0.0-20180627194029-ff3583edef7d
google.golang.org/grpc v1.13.0
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v2 v2.2.1
k8s.io/api v0.0.0-20181204000039-89a74a8d264d
k8s.io/apimachinery v0.0.0-20181127025237-2b1284ed4c93
k8s.io/client-go v10.0.0+incompatible
k8s.io/klog v0.0.0-20181108234604-8139d8cb77af
k8s.io/kube-openapi v0.0.0-20180928202339-9dfdf9be683f
sigs.k8s.io/yaml v1.1.0
)
This diff is collapsed.
# This is the official list of cloud authors for copyright purposes.
# This file is distinct from the CONTRIBUTORS files.
# See the latter for an explanation.
# Names should be added to this file as:
# Name or Organization <email address>
# The email address is not required for organizations.
Filippo Valsorda <hi@filippo.io>
Google Inc.
Ingo Oeser <nightlyone@googlemail.com>
Palm Stone Games, Inc.
Paweł Knap <pawelknap88@gmail.com>
Péter Szilágyi <peterke@gmail.com>
Tyler Treat <ttreat31@gmail.com>
# People who have agreed to one of the CLAs and can contribute patches.
# The AUTHORS file lists the copyright holders; this file
# lists people. For example, Google employees are listed here
# but not in AUTHORS, because Google holds the copyright.
#
# https://developers.google.com/open-source/cla/individual
# https://developers.google.com/open-source/cla/corporate
#
# Names should be added to this file as:
# Name <email address>
# Keep the list alphabetically sorted.
Alexis Hunt <lexer@google.com>
Andreas Litt <andreas.litt@gmail.com>
Andrew Gerrand <adg@golang.org>
Brad Fitzpatrick <bradfitz@golang.org>
Burcu Dogan <jbd@google.com>
Dave Day <djd@golang.org>
David Sansome <me@davidsansome.com>
David Symonds <dsymonds@golang.org>
Filippo Valsorda <hi@filippo.io>
Glenn Lewis <gmlewis@google.com>
Ingo Oeser <nightlyone@googlemail.com>
James Hall <james.hall@shopify.com>
Johan Euphrosine <proppy@google.com>
Jonathan Amsterdam <jba@google.com>
Kunpei Sakai <namusyaka@gmail.com>
Luna Duclos <luna.duclos@palmstonegames.com>
Magnus Hiie <magnus.hiie@gmail.com>
Mario Castro <mariocaster@gmail.com>
Michael McGreevy <mcgreevy@golang.org>
Omar Jarjur <ojarjur@google.com>
Paweł Knap <pawelknap88@gmail.com>
Péter Szilágyi <peterke@gmail.com>
Sarah Adams <shadams@google.com>
Thanatat Tamtan <acoshift@gmail.com>
Toby Burress <kurin@google.com>
Tuo Shan <shantuo@google.com>
Tyler Treat <ttreat31@gmail.com>
This diff is collapsed.
Copyright (c) 2016, Datadog <info@datadoghq.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Datadog nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Component,Origin,License,Copyright
import,io.opentracing,Apache-2.0,Copyright 2016-2017 The OpenTracing Authors
\ No newline at end of file
package opentracing
import (
"os"
"path/filepath"
)
// Configuration struct configures the Datadog tracer. Please use the NewConfiguration
// constructor to begin.
type Configuration struct {
	// Enabled, when false, returns a no-op implementation of the Tracer.
	Enabled bool

	// Debug, when true, writes details to logs.
	Debug bool

	// ServiceName specifies the name of this application.
	ServiceName string

	// SampleRate sets the Tracer sample rate (ext/priority.go).
	// NOTE(review): presumably a fraction in [0, 1]; NewConfiguration
	// defaults it to 1 — confirm semantics against the sampler.
	SampleRate float64

	// AgentHostname specifies the hostname of the agent where the traces
	// are sent to.
	AgentHostname string

	// AgentPort specifies the port that the agent is listening on.
	AgentPort string

	// GlobalTags holds a set of tags that will be automatically applied to
	// all spans.
	GlobalTags map[string]interface{}

	// TextMapPropagator is an injector used for Context propagation.
	TextMapPropagator Propagator
}
// NewConfiguration creates a `Configuration` object with default values:
// the tracer is enabled, named after the running binary, samples everything,
// and reports to a local agent on port 8126.
func NewConfiguration() *Configuration {
	// by default the service is named after the Go binary
	defaultService := filepath.Base(os.Args[0])
	cfg := &Configuration{
		Enabled:           true,
		Debug:             false,
		ServiceName:       defaultService,
		SampleRate:        1,
		AgentHostname:     "localhost",
		AgentPort:         "8126",
		GlobalTags:        make(map[string]interface{}),
		TextMapPropagator: NewTextMapPropagator("", "", ""),
	}
	return cfg
}
// noopCloser is an io.Closer that does nothing; it is handed out when the
// tracer is disabled so callers always receive a usable Closer.
type noopCloser struct{}

// Close implements io.Closer. It never fails.
func (c *noopCloser) Close() error {
	return nil
}
package opentracing
// SpanContext represents Span state that must propagate to descendant Spans
// and across process boundaries.
type SpanContext struct {
	traceID  uint64            // identifier of the whole trace
	spanID   uint64            // identifier of this span
	parentID uint64            // identifier of the parent span
	sampled  bool              // sampling decision, mirrored from the Datadog span
	span     *Span             // back-reference to the owning span; nil for contexts extracted from a carrier
	baggage  map[string]string // propagated key:value items
}
// ForeachBaggageItem grants access to all baggage items stored in the
// SpanContext. Iteration stops as soon as handler returns false.
func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
	for key, value := range c.baggage {
		if !handler(key, value) {
			return
		}
	}
}
// WithBaggageItem returns an entirely new SpanContext with the
// given key:value baggage pair set; the receiver is left untouched.
func (c SpanContext) WithBaggageItem(key, val string) SpanContext {
	// copy-on-write: duplicate the existing baggage, then add the new pair.
	// Ranging over a nil map is a no-op, so a context without baggage is fine.
	merged := make(map[string]string, len(c.baggage)+1)
	for k, v := range c.baggage {
		merged[k] = v
	}
	merged[key] = val

	// all other fields (trace/span/parent IDs, sampling flag, span
	// back-reference) are carried over unchanged
	out := c
	out.baggage = merged
	return out
}
// Package opentracing implements an OpenTracing (http://opentracing.io)
// compatible tracer. A Datadog tracer must be initialized through
// a Configuration object as you can see in the following examples.
package opentracing
package opentracing
import (
"strconv"
"strings"
ot "github.com/opentracing/opentracing-go"
)
// Propagator implementations should be able to inject and extract
// SpanContexts into an implementation specific carrier.
type Propagator interface {
	// Inject takes the SpanContext and injects it into the carrier using
	// an implementation specific method.
	Inject(context ot.SpanContext, carrier interface{}) error

	// Extract returns the SpanContext from the given carrier using an
	// implementation specific method.
	Extract(carrier interface{}) (ot.SpanContext, error)
}
// Default header names used when the caller does not supply its own.
const (
	defaultBaggageHeaderPrefix = "ot-baggage-"
	defaultTraceIDHeader       = "x-datadog-trace-id"
	defaultParentIDHeader      = "x-datadog-parent-id"
)

// headerOrDefault returns value unless it is empty, in which case def is used.
func headerOrDefault(value, def string) string {
	if value == "" {
		return def
	}
	return value
}

// NewTextMapPropagator returns a new propagator which uses opentracing.TextMap
// to inject and extract values. The parameters specify the prefix that will
// be used to prefix baggage header keys along with the trace and parent header.
// Empty strings may be provided to use the defaults, which are: "ot-baggage-" as
// prefix for baggage headers, "x-datadog-trace-id" and "x-datadog-parent-id" for
// trace and parent ID headers.
func NewTextMapPropagator(baggagePrefix, traceHeader, parentHeader string) *TextMapPropagator {
	return &TextMapPropagator{
		baggagePrefix: headerOrDefault(baggagePrefix, defaultBaggageHeaderPrefix),
		traceHeader:   headerOrDefault(traceHeader, defaultTraceIDHeader),
		parentHeader:  headerOrDefault(parentHeader, defaultParentIDHeader),
	}
}

// TextMapPropagator implements a propagator which uses opentracing.TextMap
// internally.
type TextMapPropagator struct {
	baggagePrefix string // prefix for baggage header keys
	traceHeader   string // header carrying the trace ID
	parentHeader  string // header carrying the parent span ID
}
// Inject defines the TextMapPropagator to propagate SpanContext data
// out of the current process: it writes the trace ID, the current active
// span ID and every baggage item (prefixed) into the carrier.
func (p *TextMapPropagator) Inject(context ot.SpanContext, carrier interface{}) error {
	spanCtx, ok := context.(SpanContext)
	if !ok {
		return ot.ErrInvalidSpanContext
	}
	writer, ok := carrier.(ot.TextMapWriter)
	if !ok {
		return ot.ErrInvalidCarrier
	}

	// trace identity first...
	writer.Set(p.traceHeader, strconv.FormatUint(spanCtx.traceID, 10))
	writer.Set(p.parentHeader, strconv.FormatUint(spanCtx.spanID, 10))

	// ...then OpenTracing baggage, prefixed so Extract can recognize it
	for key, value := range spanCtx.baggage {
		writer.Set(p.baggagePrefix+key, value)
	}
	return nil
}
// Extract implements Propagator. It reads the trace ID, parent span ID and
// any baggage items out of the carrier (an ot.TextMapReader) and rebuilds a
// SpanContext from them. Both IDs are mandatory; if either is missing or
// zero, ot.ErrSpanContextNotFound is returned.
//
// Header matching is case-insensitive on both sides: HTTP header names are
// case-insensitive and carriers may normalize casing. The previous code
// lowercased only the incoming key before comparing it to the configured
// header names verbatim, so custom names containing uppercase letters could
// never match.
func (p *TextMapPropagator) Extract(carrier interface{}) (ot.SpanContext, error) {
	reader, ok := carrier.(ot.TextMapReader)
	if !ok {
		return nil, ot.ErrInvalidCarrier
	}
	var traceID, parentID uint64
	decodedBaggage := make(map[string]string)

	// normalize the configured header names once, outside the loop
	traceHeader := strings.ToLower(p.traceHeader)
	parentHeader := strings.ToLower(p.parentHeader)
	baggagePrefix := strings.ToLower(p.baggagePrefix)

	// extract SpanContext fields from the carrier
	err := reader.ForeachKey(func(k, v string) error {
		switch key := strings.ToLower(k); key {
		case traceHeader:
			id, perr := strconv.ParseUint(v, 10, 64)
			if perr != nil {
				return ot.ErrSpanContextCorrupted
			}
			traceID = id
		case parentHeader:
			id, perr := strconv.ParseUint(v, 10, 64)
			if perr != nil {
				return ot.ErrSpanContextCorrupted
			}
			parentID = id
		default:
			// anything carrying the baggage prefix is a propagated baggage item
			if strings.HasPrefix(key, baggagePrefix) {
				decodedBaggage[strings.TrimPrefix(key, baggagePrefix)] = v
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	if traceID == 0 || parentID == 0 {
		return nil, ot.ErrSpanContextNotFound
	}
	return SpanContext{
		traceID: traceID,
		spanID:  parentID,
		baggage: decodedBaggage,
	}, nil
}
package opentracing
import (
"fmt"
"time"
ddtrace "github.com/DataDog/dd-trace-go/tracer"
ot "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
)
// Span represents an active, un-finished span in the OpenTracing system.
// Spans are created by the Tracer interface. It embeds the Datadog span
// (which provides Lock/Unlock and the wire-level fields) and layers the
// OpenTracing contract on top of it.
type Span struct {
	*ddtrace.Span             // underlying Datadog span
	context SpanContext       // OpenTracing context, kept in sync with the embedded span
	tracer  *Tracer           // the Tracer that created this span; nil when built via NewSpan
}

// Tracer provides access to the `Tracer`` that created this Span.
func (s *Span) Tracer() ot.Tracer {
	return s.tracer
}

// Context yields the SpanContext for this Span. Note that the return
// value of Context() is still valid after a call to Span.Finish(), as is
// a call to Span.Context() after a call to Span.Finish().
func (s *Span) Context() ot.SpanContext {
	return s.context
}
// SetBaggageItem sets a key:value pair on this Span and its SpanContext
// that also propagates to descendants of this Span.
func (s *Span) SetBaggageItem(key, val string) ot.Span {
	// lock the embedded Datadog span: s.context is read concurrently
	// (e.g. by BaggageItem) under the same lock
	s.Span.Lock()
	defer s.Span.Unlock()
	s.context = s.context.WithBaggageItem(key, val)
	return s
}

// BaggageItem gets the value for a baggage item given its key. Returns the empty string
// if the value isn't found in this Span.
func (s *Span) BaggageItem(key string) string {
	s.Span.Lock()
	defer s.Span.Unlock()
	// lookup on a nil map safely yields the zero value ""
	return s.context.baggage[key]
}
// SetTag adds a tag to the span, overwriting pre-existing values for
// the given `key`. The reserved keys ServiceName, ResourceName, SpanType
// and Error (see tags.go) map onto dedicated fields of the underlying
// Datadog span; everything else is stored as generic span metadata.
func (s *Span) SetTag(key string, value interface{}) ot.Span {
	switch key {
	case ServiceName:
		// each arm takes the lock at most once; the deferred Unlock
		// runs at function return, which is safe here
		s.Span.Lock()
		defer s.Span.Unlock()
		s.Span.Service = fmt.Sprint(value)
	case ResourceName:
		s.Span.Lock()
		defer s.Span.Unlock()
		s.Span.Resource = fmt.Sprint(value)
	case SpanType:
		s.Span.Lock()
		defer s.Span.Unlock()
		s.Span.Type = fmt.Sprint(value)
	case Error:
		switch v := value.(type) {
		case nil:
			// no error
		case error:
			s.Span.SetError(v)
		default:
			// non-error values are wrapped so SetError always gets an error
			s.Span.SetError(fmt.Errorf("%v", v))
		}
	default:
		// NOTE: locking is not required because the `SetMeta` is
		// already thread-safe
		s.Span.SetMeta(key, fmt.Sprint(value))
	}
	return s
}
// FinishWithOptions is like Finish() but with explicit control over
// timestamps and log data. A zero FinishTime means "now" (UTC).
func (s *Span) FinishWithOptions(options ot.FinishOptions) {
	finish := options.FinishTime
	if finish.IsZero() {
		finish = time.Now().UTC()
	}
	s.Span.FinishWithTime(finish.UnixNano())
}
// SetOperationName sets or changes the operation name.
func (s *Span) SetOperationName(operationName string) ot.Span {
	s.Span.Lock()
	defer s.Span.Unlock()
	s.Span.Name = operationName
	return s
}

// The logging methods below are required by the opentracing.Span interface
// but are currently no-ops in this compatibility layer.

// LogFields is an efficient and type-checked way to record key:value
// logging data about a Span, though the programming interface is a little
// more verbose than LogKV().
func (s *Span) LogFields(fields ...log.Field) {
	// TODO: implementation missing
}

// LogKV is a concise, readable way to record key:value logging data about
// a Span, though unfortunately this also makes it less efficient and less
// type-safe than LogFields().
func (s *Span) LogKV(keyVals ...interface{}) {
	// TODO: implementation missing
}

// LogEvent is deprecated: use LogFields or LogKV
func (s *Span) LogEvent(event string) {
	// TODO: implementation missing
}

// LogEventWithPayload deprecated: use LogFields or LogKV
func (s *Span) LogEventWithPayload(event string, payload interface{}) {
	// TODO: implementation missing
}

// Log is deprecated: use LogFields or LogKV
func (s *Span) Log(data ot.LogData) {
	// TODO: implementation missing
}
// NewSpan is the OpenTracing Span constructor. It wraps a fresh, bare
// Datadog span in an OpenTracing-compatible Span whose SpanContext mirrors
// the span's identifiers.
func NewSpan(operationName string) *Span {
	inner := &ddtrace.Span{Name: operationName}
	wrapped := &Span{
		Span: inner,
		context: SpanContext{
			traceID:  inner.TraceID,
			spanID:   inner.SpanID,
			parentID: inner.ParentID,
			sampled:  inner.Sampled,
		},
	}
	// the SpanContext keeps a back-reference to the span so it can be
	// propagated and used to create children
	wrapped.context.span = wrapped
	return wrapped
}
package opentracing
// Reserved tag keys understood by Span.SetTag; they map onto dedicated
// fields of the underlying Datadog span rather than generic metadata.
const (
	// SpanType defines the Span type (web, db, cache)
	SpanType = "span.type"
	// ServiceName defines the Service name for this Span
	ServiceName = "service.name"
	// ResourceName defines the Resource name for the Span
	ResourceName = "resource.name"
	// Error defines an error.
	Error = "error.error"
)
package opentracing
import (
"errors"
"io"
"time"
ddtrace "github.com/DataDog/dd-trace-go/tracer"
ot "github.com/opentracing/opentracing-go"
)
// Tracer is a simple, thin interface for Span creation and SpanContext
// propagation. In the current state, this Tracer is a compatibility layer
// that wraps the Datadog Tracer implementation.
type Tracer struct {
	// impl is the Datadog Tracer implementation.
	impl *ddtrace.Tracer
	// config holds the Configuration used to create the Tracer.
	config *Configuration
}
// StartSpan creates, starts, and returns a new Span with the given `operationName`.
// A Span with no SpanReference options (e.g., opentracing.ChildOf() or
// opentracing.FollowsFrom()) becomes the root of its own trace.
func (t *Tracer) StartSpan(operationName string, options ...ot.StartSpanOption) ot.Span {
	// fold all functional options into a single StartSpanOptions value
	var opts ot.StartSpanOptions
	for _, opt := range options {
		opt.Apply(&opts)
	}
	return t.startSpanWithOptions(operationName, opts)
}
// startSpanWithOptions creates the underlying Datadog span (root or child,
// depending on the references in options), wraps it in an OpenTracing Span,
// copies the parent's baggage, and applies option tags and global tags.
func (t *Tracer) startSpanWithOptions(operationName string, options ot.StartSpanOptions) ot.Span {
	if options.StartTime.IsZero() {
		options.StartTime = time.Now().UTC()
	}

	var context SpanContext
	var hasParent bool
	var parent *Span
	var span *ddtrace.Span

	// scan the references for a ChildOf relationship; if several are
	// present, the last one wins
	for _, ref := range options.References {
		ctx, ok := ref.ReferencedContext.(SpanContext)
		if !ok {
			// ignore the SpanContext since it's not valid
			continue
		}
		// if we have parenting define it
		if ref.Type == ot.ChildOfRef {
			hasParent = true
			context = ctx
			parent = ctx.span
		}
	}

	if parent == nil {
		// create a root Span with the default service name and resource
		span = t.impl.NewRootSpan(operationName, t.config.ServiceName, operationName)

		if hasParent {
			// the Context doesn't have a Span reference because it
			// has been propagated from another process, so we set these
			// values manually
			span.TraceID = context.traceID
			span.ParentID = context.spanID
			t.impl.Sample(span)
		}
	} else {
		// create a child Span that inherits from a parent
		span = t.impl.NewChildSpan(operationName, parent.Span)
	}

	// create an OpenTracing compatible Span; the SpanContext has a
	// back-reference that is used for parent-child hierarchy
	otSpan := &Span{
		Span: span,
		context: SpanContext{
			traceID:  span.TraceID,
			spanID:   span.SpanID,
			parentID: span.ParentID,
			sampled:  span.Sampled,
		},
		tracer: t,
	}
	otSpan.context.span = otSpan

	// set start time
	otSpan.Span.Start = options.StartTime.UnixNano()

	if parent != nil {
		// propagate baggage items from the in-process parent
		if l := len(parent.context.baggage); l > 0 {
			otSpan.context.baggage = make(map[string]string, len(parent.context.baggage))
			for k, v := range parent.context.baggage {
				otSpan.context.baggage[k] = v
			}
		}
	}

	// add tags from options
	for k, v := range options.Tags {
		otSpan.SetTag(k, v)
	}

	// add global tags (these overwrite option tags on key collision)
	for k, v := range t.config.GlobalTags {
		otSpan.SetTag(k, v)
	}

	return otSpan
}
// Inject takes the `sm` SpanContext instance and injects it for
// propagation within `carrier`. The actual type of `carrier` depends on
// the value of `format`. Currently supported Injectors are:
// * `TextMap`
// * `HTTPHeaders`
func (t *Tracer) Inject(ctx ot.SpanContext, format interface{}, carrier interface{}) error {
	if format == ot.TextMap || format == ot.HTTPHeaders {
		return t.config.TextMapPropagator.Inject(ctx, carrier)
	}
	return ot.ErrUnsupportedFormat
}
// Extract returns a SpanContext instance given `format` and `carrier`.
// Only the `TextMap` and `HTTPHeaders` formats are supported.
func (t *Tracer) Extract(format interface{}, carrier interface{}) (ot.SpanContext, error) {
	if format == ot.TextMap || format == ot.HTTPHeaders {
		return t.config.TextMapPropagator.Extract(carrier)
	}
	return nil, ot.ErrUnsupportedFormat
}
// Close method implements `io.Closer` interface to graceful shutdown the Datadog
// Tracer. Note that this is a blocking operation that waits for the flushing Go
// routine. It always returns nil.
func (t *Tracer) Close() error {
	t.impl.Stop()
	return nil
}
// NewTracer uses a Configuration object to initialize a Datadog Tracer.
// The initialization returns a `io.Closer` that can be used to graceful
// shutdown the tracer. If the configuration object defines a disabled
// Tracer, a no-op implementation is returned (with a no-op Closer).
// A non-empty ServiceName is required.
func NewTracer(config *Configuration) (ot.Tracer, io.Closer, error) {
	if config.ServiceName == "" {
		// abort initialization if a `ServiceName` is not defined
		return nil, nil, errors.New("A Datadog Tracer requires a valid `ServiceName` set")
	}

	// idiomatic boolean check (was `config.Enabled == false`)
	if !config.Enabled {
		// return a no-op implementation so Datadog provides the minimum overhead
		return &ot.NoopTracer{}, &noopCloser{}, nil
	}

	// configure a Datadog Tracer
	transport := ddtrace.NewTransport(config.AgentHostname, config.AgentPort)
	tracer := &Tracer{
		impl:   ddtrace.NewTracerTransport(transport),
		config: config,
	}
	tracer.impl.SetDebugLogging(config.Debug)
	tracer.impl.SetSampleRate(config.SampleRate)

	// set the new Datadog Tracer as a `DefaultTracer` so it can be
	// used in integrations. NOTE: this is a temporary implementation
	// that can be removed once all integrations have been migrated
	// to the OpenTracing API.
	ddtrace.DefaultTracer = tracer.impl

	// the *Tracer doubles as the io.Closer
	return tracer, tracer, nil
}
package tracer
import (
"sync"
)
const (
	// spanBufferDefaultInitSize is the initial size of our trace buffer,
	// by default we allocate for a handful of spans within the trace,
	// reasonable as span is actually way bigger, and avoids re-allocating
	// over and over. Could be fine-tuned at runtime.
	spanBufferDefaultInitSize = 10

	// spanBufferDefaultMaxSize is the maximum number of spans we keep in memory.
	// This is to avoid memory leaks; above that value, spans are randomly
	// dropped and ignored, resulting in corrupted tracing data, but ensuring
	// the original program continues to work as expected.
	spanBufferDefaultMaxSize = 1e5
)

// spanBuffer accumulates the spans of a single trace until they have all
// finished, then hands the complete trace over to the tracer channels.
// The embedded RWMutex guards all fields; do not copy a spanBuffer.
type spanBuffer struct {
	channels tracerChans // outbound channels shared with the tracer

	spans         []*Span // spans collected so far; all must share one trace ID
	finishedSpans int     // how many of spans have finished (see AckFinish)

	initSize int // initial capacity of the spans slice
	maxSize  int // hard cap on buffered spans; beyond it new spans are dropped

	sync.RWMutex
}
// newSpanBuffer builds a spanBuffer wired to the given channels. Non-positive
// initSize or maxSize values are replaced by the package defaults.
func newSpanBuffer(channels tracerChans, initSize, maxSize int) *spanBuffer {
	sb := &spanBuffer{
		channels: channels,
		initSize: initSize,
		maxSize:  maxSize,
	}
	if sb.initSize <= 0 {
		sb.initSize = spanBufferDefaultInitSize
	}
	if sb.maxSize <= 0 {
		sb.maxSize = spanBufferDefaultMaxSize
	}
	return sb
}
// Push appends a span to the buffer. A span is dropped (and an error pushed
// to the channels instead) when the buffer is full or when the span belongs
// to a different trace than the spans already buffered. Safe on a nil buffer.
func (tb *spanBuffer) Push(span *Span) {
	if tb == nil {
		return
	}
	tb.Lock()
	defer tb.Unlock()

	if n := len(tb.spans); n > 0 {
		switch {
		case n >= tb.maxSize:
			// buffer is full: forget the span, report the overflow
			tb.channels.pushErr(&errorSpanBufFull{Len: n})
			return
		case tb.spans[0].TraceID != span.TraceID:
			// trace ID mismatch: this span does not belong here
			tb.channels.pushErr(&errorTraceIDMismatch{Expected: tb.spans[0].TraceID, Actual: span.TraceID})
			return
		}
	}

	// lazily allocate the backing slice with the configured capacity
	if tb.spans == nil {
		tb.spans = make([]*Span, 0, tb.initSize)
	}
	tb.spans = append(tb.spans, span)
}
// flushable reports whether the buffer holds at least one span and every
// buffered span has finished.
func (tb *spanBuffer) flushable() bool {
	tb.RLock()
	defer tb.RUnlock()
	n := len(tb.spans)
	return n > 0 && tb.finishedSpans == n
}
// ack records that one more buffered span has finished.
func (tb *spanBuffer) ack() {
	tb.Lock()
	defer tb.Unlock()
	tb.finishedSpans++
}

// doFlush pushes the buffered spans to the trace channel and resets the
// buffer, but only when every span has finished.
// NOTE(review): flushable() releases its read lock before the write lock
// below is taken, so another goroutine could mutate the buffer in between —
// presumably callers serialize Flush/AckFinish; verify.
func (tb *spanBuffer) doFlush() {
	if !tb.flushable() {
		return
	}
	tb.Lock()
	defer tb.Unlock()
	tb.channels.pushTrace(tb.spans)
	tb.spans = nil
	tb.finishedSpans = 0 // important, because a buffer can be used for several flushes
}

// Flush sends the buffered trace to the channels if it is complete.
// Safe to call on a nil buffer.
func (tb *spanBuffer) Flush() {
	if tb == nil {
		return
	}
	tb.doFlush()
}

// AckFinish marks one more span as finished and flushes if the whole trace
// is now complete. Safe to call on a nil buffer.
func (tb *spanBuffer) AckFinish() {
	if tb == nil {
		return
	}
	tb.ack()
	tb.doFlush()
}

// Len returns the number of spans currently buffered. Safe to call on a
// nil buffer (returns 0).
func (tb *spanBuffer) Len() int {
	if tb == nil {
		return 0
	}
	tb.RLock()
	defer tb.RUnlock()
	return len(tb.spans)
}
package tracer
const (
	// traceChanLen is the capacity of the trace channel. This channel is emptied
	// on a regular basis (worker thread) or when it reaches 50% of its capacity.
	// If it's full, then data is simply dropped and ignored, with a log message.
	// This only happens under heavy load.
	traceChanLen = 1000

	// serviceChanLen is the length of the service channel. As for the trace channel,
	// it's emptied by the worker thread or when it reaches 50%. Note that there should
	// be much less data here, as service data does not need to be updated that often.
	serviceChanLen = 50

	// errChanLen is the number of errors we keep in the error channel. When this
	// one is full, errors are just ignored, dropped, nothing left. At some point,
	// there's already a whole lot of errors in the backlog, there's no real point
	// in keeping millions of errors, a representative sample is enough. And we
	// don't want to block user code and/or bloat memory or log files with redundant data.
	errChanLen = 200
)
// tracerChans holds most tracer channels together; it's mostly used to
// pass them together to the span buffer/context. It's obviously safe
// to access it concurrently as it contains channels only. And it's convenient
// to have it isolated from tracer, for the sake of unit testing.
type tracerChans struct {
	trace   chan []*Span // completed traces queued for the consumer (worker)
	service chan Service // service metadata updates
	err     chan error   // internal errors to be surfaced

	// one-slot flush-request channels: a pending struct{} means "flush soon"
	traceFlush   chan struct{}
	serviceFlush chan struct{}
	errFlush     chan struct{}
}
// newTracerChans builds the set of buffered channels used by the tracer,
// each data channel paired with a 1-slot flush-request channel.
func newTracerChans() tracerChans {
	var tc tracerChans
	tc.trace = make(chan []*Span, traceChanLen)
	tc.service = make(chan Service, serviceChanLen)
	tc.err = make(chan error, errChanLen)
	tc.traceFlush = make(chan struct{}, 1)
	tc.serviceFlush = make(chan struct{}, 1)
	tc.errFlush = make(chan struct{}, 1)
	return tc
}
// pushTrace queues a finished trace without ever blocking the caller.
// Once the channel is at least half full, a flush is requested (best
// effort); when it is completely full, the trace is dropped and an error
// is recorded instead.
func (tc *tracerChans) pushTrace(trace []*Span) {
	if len(tc.trace) >= cap(tc.trace)/2 {
		select {
		case tc.traceFlush <- struct{}{}:
		default: // a flush request is already pending
		}
	}
	select {
	case tc.trace <- trace:
	default:
		// channel full: drop rather than block user code
		tc.pushErr(&errorTraceChanFull{Len: len(tc.trace)})
	}
}
// pushService queues a service metadata update without ever blocking the
// caller. Once the channel is at least half full, a flush is requested
// (best effort); when it is completely full, the update is dropped and an
// error is recorded instead.
func (tc *tracerChans) pushService(service Service) {
	if len(tc.service) >= cap(tc.service)/2 {
		select {
		case tc.serviceFlush <- struct{}{}:
		default: // a flush request is already pending
		}
	}
	select {
	case tc.service <- service:
	default:
		// channel full: drop rather than block user code
		tc.pushErr(&errorServiceChanFull{Len: len(tc.service)})
	}
}
// pushErr records a client-side error without ever blocking the caller.
// Once the channel is at least half full, a flush is requested (best
// effort); when it is completely full, the error is silently dropped —
// the backlog already holds a representative sample of messages.
func (tc *tracerChans) pushErr(err error) {
	if len(tc.err) >= cap(tc.err)/2 {
		select {
		case tc.errFlush <- struct{}{}:
		default: // a flush request is already pending
		}
	}
	select {
	case tc.err <- err:
	default:
		// The error channel is full of meaningful messages already;
		// blocking here would only make things worse.
	}
}
package tracer
import (
"context"
)
// contextKey is an unexported type for context keys defined in this
// package. Using a dedicated type (instead of a bare string) prevents
// collisions with keys set by other packages and silences the go vet
// "should not use basic type as context key" warning. The stored value
// is unchanged, so lookups through spanKey behave exactly as before.
type contextKey string

var spanKey = contextKey("datadog_trace_span")
// ContextWithSpan will return a new context that includes the given span.
// DEPRECATED: use span.Context(ctx) instead.
func ContextWithSpan(ctx context.Context, span *Span) context.Context {
	if span != nil {
		return span.Context(ctx)
	}
	return ctx
}
// SpanFromContext returns the *Span stored in the given context, along
// with a boolean reporting whether one was present. A nil context always
// yields (nil, false).
func SpanFromContext(ctx context.Context) (*Span, bool) {
	if ctx == nil {
		return nil, false
	}
	s, ok := ctx.Value(spanKey).(*Span)
	return s, ok
}
// SpanFromContextDefault returns the *Span stored in the context; when
// the context is nil or carries no span, it returns an empty span that
// does nothing.
func SpanFromContextDefault(ctx context.Context) *Span {
	// FIXME[matt] is it better to return a singleton empty span?
	if span, ok := SpanFromContext(ctx); ok {
		return span
	}
	return &Span{}
}
// Package tracer contains Datadog's tracing client. It is used to trace
// requests as they flow across web servers, databases and microservices so
// that developers have visibility into bottlenecks and troublesome
// requests.
//
// Package tracer has two core objects: Tracers and Spans. Spans represent
// a chunk of computation time. They have names, durations, timestamps and
// other metadata. Tracers are used to create hierarchies of spans in a
// request, buffer and submit them to the server.
//
// The tracing client can perform trace sampling. While the trace agent
// already samples traces to reduce bandwidth usage, client sampling reduces
// performance overhead.
//
// To enable APM and/or tracing of supported integrations, follow the instructions for
// the appropriate package: https://godoc.org/github.com/DataDog/dd-trace-go/tracer#pkg-subdirectories
//
// Sample code is available in the two examples below:
package tracer
package tracer
import (
"bytes"
"encoding/json"
"github.com/ugorji/go/codec"
)
const (
	// content types reported to the agent, matching the wire format produced below.
	jsonContentType    = "application/json"
	msgpackContentType = "application/msgpack"
)
// Encoder is a generic interface that expects encoding methods for traces and
// services, and a Read() method that will be used by the http handler
type Encoder interface {
	// EncodeTraces serializes a list of traces into the internal buffer.
	EncodeTraces(traces [][]*Span) error
	// EncodeServices serializes a service map into the internal buffer.
	EncodeServices(services map[string]Service) error
	// Read drains the internal buffer (io.Reader semantics).
	Read(p []byte) (int, error)
	// ContentType returns the MIME type of the encoded payload.
	ContentType() string
}
// mh is the shared msgpack handle; safe to reuse across encoders.
var mh codec.MsgpackHandle
// msgpackEncoder encodes a list of traces in Msgpack format
type msgpackEncoder struct {
	buffer      *bytes.Buffer  // accumulates the encoded payload
	encoder     *codec.Encoder // writes msgpack into buffer
	contentType string         // always msgpackContentType
}
// newMsgpackEncoder returns a new encoder for the Msgpack format.
func newMsgpackEncoder() *msgpackEncoder {
	buffer := &bytes.Buffer{}
	encoder := codec.NewEncoder(buffer, &mh)
	return &msgpackEncoder{
		buffer:      buffer,
		encoder:     encoder,
		contentType: msgpackContentType,
	}
}
// EncodeTraces serializes the given trace list into the internal buffer,
// returning the error if any.
func (e *msgpackEncoder) EncodeTraces(traces [][]*Span) error {
	return e.encoder.Encode(traces)
}
// EncodeServices serializes a service map into the internal buffer.
func (e *msgpackEncoder) EncodeServices(services map[string]Service) error {
	return e.encoder.Encode(services)
}
// Read reads encoded bytes from the internal buffer.
func (e *msgpackEncoder) Read(p []byte) (int, error) {
	return e.buffer.Read(p)
}
// ContentType returns the msgpackEncoder content-type.
func (e *msgpackEncoder) ContentType() string {
	return e.contentType
}
// jsonEncoder encodes a list of traces in JSON format
type jsonEncoder struct {
	buffer      *bytes.Buffer // accumulates the encoded payload
	encoder     *json.Encoder // writes JSON into buffer
	contentType string        // always jsonContentType
}
// newJSONEncoder returns a new encoder for the JSON format.
func newJSONEncoder() *jsonEncoder {
	buffer := &bytes.Buffer{}
	encoder := json.NewEncoder(buffer)
	return &jsonEncoder{
		buffer:      buffer,
		encoder:     encoder,
		contentType: jsonContentType,
	}
}
// EncodeTraces serializes the given trace list into the internal buffer,
// returning the error if any.
func (e *jsonEncoder) EncodeTraces(traces [][]*Span) error {
	return e.encoder.Encode(traces)
}
// EncodeServices serializes a service map into the internal buffer.
func (e *jsonEncoder) EncodeServices(services map[string]Service) error {
	return e.encoder.Encode(services)
}
// Read reads encoded bytes from the internal buffer.
func (e *jsonEncoder) Read(p []byte) (int, error) {
	return e.buffer.Read(p)
}
// ContentType returns the jsonEncoder content-type.
func (e *jsonEncoder) ContentType() string {
	return e.contentType
}
// encoderFactory will provide a new encoder each time we want to flush traces or services.
// A fresh encoder per flush avoids reusing a buffer that the HTTP transport's
// writeLoop goroutine may still be reading (see the warning on httpTransport).
type encoderFactory func() Encoder
// jsonEncoderFactory builds a fresh JSON encoder (legacy/fallback API).
func jsonEncoderFactory() Encoder {
	return newJSONEncoder()
}
// msgpackEncoderFactory builds a fresh Msgpack encoder (default API).
func msgpackEncoderFactory() Encoder {
	return newMsgpackEncoder()
}
package tracer
import (
"log"
"strconv"
)
const (
	// errorPrefix is prepended to every message emitted by logErrors.
	errorPrefix = "Datadog Tracer Error: "
)
// errorSpanBufFull signals that the span buffer reached capacity and
// could not accept another span.
type errorSpanBufFull struct {
	// Len is the size of the buffer at the moment it overflowed.
	Len int
}

// Error implements the error interface.
func (e *errorSpanBufFull) Error() string {
	size := strconv.Itoa(e.Len)
	return "span buffer is full (length: " + size + ")"
}
// errorTraceChanFull signals that the trace channel reached capacity and
// a trace had to be dropped.
type errorTraceChanFull struct {
	// Len is the size of the channel at the moment it overflowed.
	Len int
}

// Error implements the error interface.
func (e *errorTraceChanFull) Error() string {
	size := strconv.Itoa(e.Len)
	return "trace channel is full (length: " + size + ")"
}
// errorServiceChanFull signals that the service channel reached capacity
// and a service update had to be dropped.
type errorServiceChanFull struct {
	// Len is the size of the channel at the moment it overflowed.
	Len int
}

// Error implements the error interface.
func (e *errorServiceChanFull) Error() string {
	size := strconv.Itoa(e.Len)
	return "service channel is full (length: " + size + ")"
}
// errorTraceIDMismatch signals an attempt to put a span into a buffer
// that belongs to a different trace.
type errorTraceIDMismatch struct {
	// Expected is the trace ID we should have.
	Expected uint64
	// Actual is the trace ID we have and is wrong.
	Actual uint64
}

// Error implements the error interface; trace IDs are rendered in hex.
func (e *errorTraceIDMismatch) Error() string {
	want := strconv.FormatUint(e.Expected, 16)
	got := strconv.FormatUint(e.Actual, 16)
	return "trace ID mismatch (expected: " + want + " actual: " + got + ")"
}
// errorNoSpanBuf signals that a span was finished/pushed while it had no
// buffer attached to it.
type errorNoSpanBuf struct {
	// SpanName is the name of the span which could not be pushed (hint for the log reader).
	SpanName string
}

// Error implements the error interface.
func (e *errorNoSpanBuf) Error() string {
	return "no span buffer (span name: '" + e.SpanName + "')"
}
// errorFlushLostTraces signals that a flush failed to deliver its traces.
type errorFlushLostTraces struct {
	// Nb is the number of traces lost in that flush
	Nb int
}

// Error implements the error interface.
func (e *errorFlushLostTraces) Error() string {
	count := strconv.Itoa(e.Nb)
	return "unable to flush traces, lost " + count + " traces"
}
// errorFlushLostServices signals that a flush failed to deliver its
// service updates.
type errorFlushLostServices struct {
	// Nb is the number of services lost in that flush
	Nb int
}

// Error implements the error interface.
func (e *errorFlushLostServices) Error() string {
	count := strconv.Itoa(e.Nb)
	return "unable to flush services, lost " + count + " services"
}
// errorSummary aggregates errors of one kind drained from the error
// channel: how many were seen and one example message.
type errorSummary struct {
	Count   int
	Example string
}
// errorKey returns a unique key for each error type
func errorKey(err error) string {
	if err == nil {
		return ""
	}
	switch err.(type) {
	case *errorSpanBufFull:
		return "ErrorSpanBufFull"
	case *errorTraceChanFull:
		return "ErrorTraceChanFull"
	case *errorServiceChanFull:
		return "ErrorServiceChanFull"
	case *errorTraceIDMismatch:
		return "ErrorTraceIDMismatch"
	case *errorNoSpanBuf:
		return "ErrorNoSpanBuf"
	case *errorFlushLostTraces:
		return "ErrorFlushLostTraces"
	case *errorFlushLostServices:
		return "ErrorFlushLostServices"
	}
	return err.Error() // possibly high cardinality, but this is unexpected
}
// aggregateErrors drains errChan without blocking and groups the errors
// by kind, counting occurrences and keeping one example message per kind.
func aggregateErrors(errChan <-chan error) map[string]errorSummary {
	summaries := make(map[string]errorSummary, len(errChan))
	for {
		select {
		case e := <-errChan:
			if e == nil { // double-checking, we don't want to panic here...
				continue
			}
			k := errorKey(e)
			s := summaries[k]
			s.Count++
			s.Example = e.Error()
			summaries[k] = s
		default:
			// channel drained
			return summaries
		}
	}
}
// logErrors drains the error channel and logs a deduplicated summary,
// capping repeated messages to avoid flooding the log file.
// As of today it only logs using standard golang log package, but
// later we could send those stats to agent [TODO:christian].
func logErrors(errChan <-chan error) {
	for _, summary := range aggregateErrors(errChan) {
		msg := errorPrefix + summary.Example
		if summary.Count > 1 {
			msg += " (repeated " + strconv.Itoa(summary.Count) + " times)"
		}
		log.Println(msg)
	}
}
package ext
// Application types for services.
const (
	AppTypeWeb   = "web"
	AppTypeDB    = "db"
	AppTypeCache = "cache"
	AppTypeRPC   = "rpc"
)
package ext
// Cassandra tag keys used to decorate Cassandra spans.
const (
	CassandraType             = "cassandra"
	CassandraQuery            = "cassandra.query"
	CassandraConsistencyLevel = "cassandra.consistency_level"
	CassandraCluster          = "cassandra.cluster"
	CassandraRowCount         = "cassandra.row_count"
	CassandraKeyspace         = "cassandra.keyspace"
	CassandraPaginated        = "cassandra.paginated"
)
package ext
// Standard error message metadata fields.
const (
	ErrorMsg   = "error.msg"
	ErrorType  = "error.type"
	ErrorStack = "error.stack"
)
package ext
// HTTP meta constants.
const (
	HTTPType   = "http"
	HTTPMethod = "http.method"
	HTTPCode   = "http.status_code"
	HTTPURL    = "http.url"
)
package ext
// Tag keys describing the remote host/port an outbound span talks to.
const (
	TargetHost = "out.host"
	TargetPort = "out.port"
)
package ext
// Priority is a hint given to the backend so that it knows which traces to reject or keep.
// In a distributed context, it should be set before any context propagation (fork, RPC calls) to be effective.
const (
	// PriorityUserReject informs the backend that a trace should be rejected and not stored.
	// This should be used by user code overriding default priority.
	PriorityUserReject = -1
	// PriorityAutoReject informs the backend that a trace should be rejected and not stored.
	// This is used by the builtin sampler.
	PriorityAutoReject = 0
	// PriorityAutoKeep informs the backend that a trace should be kept and stored.
	// This is used by the builtin sampler.
	PriorityAutoKeep = 1
	// PriorityUserKeep informs the backend that a trace should be kept and stored.
	// This should be used by user code overriding default priority.
	PriorityUserKeep = 2
)
package ext
// SQL tag keys used to decorate database spans.
const (
	SQLType  = "sql"
	SQLQuery = "sql.query"
)
package ext
// Standard system metadata names
const (
	// The pid of the traced process
	Pid = "system.pid"
)
package ext
import (
	"runtime"
	"strings"
)
// Language and tracer metadata reported to the agent with each payload.
const (
	// Lang is the language identifier sent to the agent.
	Lang = "go"
	// Interpreter describes the compiler/arch/OS triple of the running binary.
	Interpreter = runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS
	// TracerVersion is the version of this tracing client.
	TracerVersion = "v0.5.0"
)
// LangVersion is the Go runtime version with the leading "go" stripped.
var LangVersion = strings.TrimPrefix(runtime.Version(), Lang)
package tracer
import (
cryptorand "crypto/rand"
"log"
"math"
"math/big"
"math/rand"
"sync"
"time"
)
// randGen is the global thread safe random number generator
// NOTE(review): it is initialized elsewhere in the package (not visible
// in this chunk); it is nil until then.
var randGen *rand.Rand
// randSource wraps a rand.Source with a mutex, making it safe for
// concurrent use (rand.Source itself is not thread safe).
type randSource struct {
	source rand.Source
	sync.Mutex
}
// newRandSource returns a lockable rand.Source seeded from crypto/rand,
// falling back to the wall clock when no crypto entropy is available.
func newRandSource() *randSource {
	max := big.NewInt(math.MaxInt64)
	var seed int64
	if n, err := cryptorand.Int(cryptorand.Reader, max); err == nil {
		seed = n.Int64()
	} else {
		log.Printf("%scannot generate random seed: %v; using current time\n", errorPrefix, err)
		seed = time.Now().UnixNano()
	}
	return &randSource{source: rand.NewSource(seed)}
}
// Int63 returns a non-negative random int64 from the wrapped source,
// guarded by the mutex.
func (rs *randSource) Int63() int64 {
	rs.Lock()
	defer rs.Unlock()
	return rs.source.Int63()
}
// Seed re-seeds the wrapped source, guarded by the mutex.
//
// Bug fix: the previous implementation called rs.Seed(seed) — itself —
// recursively while holding the mutex, which deadlocked on the second
// Lock (sync.Mutex is not reentrant) and never reached the underlying
// source. It must delegate to rs.source.Seed.
func (rs *randSource) Seed(seed int64) {
	rs.Lock()
	rs.source.Seed(seed)
	rs.Unlock()
}
package tracer
const (
	// sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent.
	sampleRateMetricKey = "_sample_rate"
	// constants used for the Knuth hashing, same constants as the Agent.
	maxTraceID      = ^uint64(0)
	maxTraceIDFloat = float64(maxTraceID)
	samplerHasher   = uint64(1111111111111111111)
)
// sampler is the generic interface of any sampler
type sampler interface {
	// Sample decides if a trace is sampled and sets `span.Sampled` accordingly.
	Sample(span *Span)
}
// allSampler samples all the traces
type allSampler struct{}
// newAllSampler returns a sampler that keeps every trace.
func newAllSampler() *allSampler {
	return &allSampler{}
}
// Sample is a no-op: spans are sampled (kept) by default.
func (s *allSampler) Sample(span *Span) {
	// Nothing to do here, since by default a trace is sampled
}
// rateSampler samples from a sample rate
type rateSampler struct {
	// SampleRate is the fraction of traces to keep; values >= 1 keep everything.
	SampleRate float64
}
// newRateSampler returns an initialized rateSampler with its sample rate
func newRateSampler(sampleRate float64) *rateSampler {
	return &rateSampler{
		SampleRate: sampleRate,
	}
}
// Sample updates span.Sampled from the configured rate and records the
// applied rate as a span metric. A rate >= 1 leaves the span untouched
// (sampled by default).
func (s *rateSampler) Sample(span *Span) {
	if s.SampleRate < 1 {
		span.Sampled = sampleByRate(span.TraceID, s.SampleRate)
		span.SetMetric(sampleRateMetricKey, s.SampleRate)
	}
}
// sampleByRate reports whether a trace with the given ID should be kept
// at the given sample rate. Its implementation has to stay identical to
// the Trace Agent's (Knuth multiplicative hashing).
func sampleByRate(traceID uint64, sampleRate float64) bool {
	if sampleRate >= 1 {
		return true
	}
	return traceID*samplerHasher < uint64(sampleRate*maxTraceIDFloat)
}
package tracer
import (
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"sync"
"time"
)
const (
	// span meta keys used to record error details (see SetError).
	errorMsgKey   = "error.msg"
	errorTypeKey  = "error.type"
	errorStackKey = "error.stack"
	// samplingPriorityKey is the metric key carrying the sampling priority.
	samplingPriorityKey = "_sampling_priority_v1"
)
// Span represents a computation. Callers must call Finish when a span is
// complete to ensure it's submitted.
//
//	span := tracer.NewRootSpan("web.request", "datadog.com", "/user/{id}")
//	defer span.Finish()  // or FinishWithErr(err)
//
// In general, spans should be created with the tracer.NewSpan* functions,
// so they will be submitted on completion.
type Span struct {
	// Name is the name of the operation being measured. Some examples
	// might be "http.handler", "fileserver.upload" or "video.decompress".
	// Name should be set on every span.
	Name string `json:"name"`
	// Service is the name of the process doing a particular job. Some
	// examples might be "user-database" or "datadog-web-app". Services
	// will be inherited from parents, so only set this in your app's
	// top level span.
	Service string `json:"service"`
	// Resource is a query to a service. A web application might use
	// resources like "/user/{user_id}". A sql database might use resources
	// like "select * from user where id = ?".
	//
	// You can track thousands of resources (not millions or billions) so
	// prefer normalized resources like "/user/{id}" to "/user/123".
	//
	// Resources should only be set on an app's top level spans.
	Resource string             `json:"resource"`
	Type     string             `json:"type"`              // protocol associated with the span
	Start    int64              `json:"start"`             // span start time expressed in nanoseconds since epoch
	Duration int64              `json:"duration"`          // duration of the span expressed in nanoseconds
	Meta     map[string]string  `json:"meta,omitempty"`    // arbitrary map of metadata
	Metrics  map[string]float64 `json:"metrics,omitempty"` // arbitrary map of numeric metrics
	SpanID   uint64             `json:"span_id"`           // identifier of this span
	TraceID  uint64             `json:"trace_id"`          // identifier of the root span
	ParentID uint64             `json:"parent_id"`         // identifier of the span's direct parent
	Error    int32              `json:"error"`             // error status of the span; 0 means no errors
	Sampled  bool               `json:"-"`                 // if this span is sampled (and should be kept/recorded) or not
	sync.RWMutex // guards mutation of Meta, Metrics, Error and finished
	tracer   *Tracer // the tracer that generated this span
	finished bool    // true if the span has been submitted to a tracer.
	// parent contains a link to the parent. In most cases, ParentID can be inferred from this.
	// However, ParentID can technically be overridden (typical usage: distributed tracing)
	// and also, parent == nil is used to identify root and top-level ("local root") spans.
	parent *Span
	buffer *spanBuffer
}
// NewSpan creates a new span. This is a low-level function, required for testing and advanced usage.
// Most of the time one should prefer the Tracer NewRootSpan or NewChildSpan methods.
// The span starts now, is marked Sampled, and inherits the tracer's
// global meta. NOTE(review): tracer must be non-nil (getAllMeta is called on it).
func NewSpan(name, service, resource string, spanID, traceID, parentID uint64, tracer *Tracer) *Span {
	return &Span{
		Name:     name,
		Service:  service,
		Resource: resource,
		Meta:     tracer.getAllMeta(),
		SpanID:   spanID,
		TraceID:  traceID,
		ParentID: parentID,
		Start:    now(),
		Sampled:  true,
		tracer:   tracer,
	}
}
// setMeta writes a single meta entry without taking the lock; the caller
// must hold the span's write lock.
func (s *Span) setMeta(key, value string) {
	// Spans are not locked while being flushed, so a finished span must
	// never be mutated: `finished` is set before the flush, closing the
	// race window.
	if s == nil || s.finished {
		return
	}
	if s.Meta == nil {
		s.Meta = make(map[string]string)
	}
	s.Meta[key] = value
}
// SetMeta adds an arbitrary meta field to the current Span.
// Finished spans are left untouched.
func (s *Span) SetMeta(key, value string) {
	if s == nil {
		return
	}
	s.Lock()
	s.setMeta(key, value)
	s.Unlock()
}
// SetMetas copies every entry of the given map into the span's meta.
// Finished spans are left untouched.
func (s *Span) SetMetas(metas map[string]string) {
	for key, value := range metas {
		s.SetMeta(key, value)
	}
}
// GetMeta will return the value for the given tag, or the empty string
// when it doesn't exist (or the span is nil).
func (s *Span) GetMeta(key string) string {
	if s == nil {
		return ""
	}
	s.RLock()
	defer s.RUnlock()
	// indexing a nil map yields the zero value, so no explicit nil check
	// is needed here
	return s.Meta[key]
}
// SetMetrics adds a metric field to the current Span.
//
// Deprecated: use SetMetric instead.
func (s *Span) SetMetrics(key string, value float64) {
	if s == nil {
		return
	}
	s.SetMetric(key, value)
}
// SetMetric sets a float64 value for the given key. It acts like
// `set_meta()` and simply records a tag without further processing;
// it does not create a Datadog metric. Finished spans are left untouched.
func (s *Span) SetMetric(key string, val float64) {
	if s == nil {
		return
	}
	s.Lock()
	// Spans are not locked while being flushed, so a finished span must
	// never be mutated: `finished` is set before the flush.
	if !s.finished {
		if s.Metrics == nil {
			s.Metrics = make(map[string]float64)
		}
		s.Metrics[key] = val
	}
	s.Unlock()
}
// SetError flags the span as errored and records the error message, type
// and stack trace in the span meta. Finished spans are left untouched.
func (s *Span) SetError(err error) {
	if s == nil || err == nil {
		return
	}
	s.Lock()
	defer s.Unlock()
	// Spans are not locked while being flushed, so a finished span must
	// never be mutated: `finished` is set before the flush.
	if s.finished {
		return
	}
	s.Error = 1
	s.setMeta(errorMsgKey, err.Error())
	s.setMeta(errorTypeKey, reflect.TypeOf(err).String())
	s.setMeta(errorStackKey, string(debug.Stack()))
}
// Finish closes this Span (but not its children) providing the duration
// of this part of the tracing session. This method is idempotent so
// calling this method multiple times is safe and doesn't update the
// current Span. Once a Span has been finished, methods that modify the Span
// will become no-ops.
func (s *Span) Finish() {
	s.finish(now())
}
// FinishWithTime closes this Span at the given `finishTime` (nanoseconds
// since epoch). The behavior is the same as `Finish()`.
func (s *Span) FinishWithTime(finishTime int64) {
	s.finish(finishTime)
}
// finish marks the span finished at finishTime and, when appropriate,
// acknowledges it to its buffer so that a completed trace is pushed to
// the channels. Idempotent: only the first call has any effect.
func (s *Span) finish(finishTime int64) {
	if s == nil {
		return
	}
	s.Lock()
	finished := s.finished
	if !finished {
		// a pre-set Duration (e.g. by tests) is preserved
		if s.Duration == 0 {
			s.Duration = finishTime - s.Start
		}
		s.finished = true
	}
	s.Unlock()
	if finished {
		// no-op, called twice, no state change...
		return
	}
	if s.buffer == nil {
		// a span created outside the normal lifecycle has no buffer;
		// surface the problem through the tracer's error channel
		if s.tracer != nil {
			s.tracer.channels.pushErr(&errorNoSpanBuf{SpanName: s.Name})
		}
		return
	}
	// If tracer is explicitely disabled, stop now
	if s.tracer != nil && !s.tracer.Enabled() {
		return
	}
	// If not sampled, drop it
	if !s.Sampled {
		return
	}
	s.buffer.AckFinish() // put data in channel only if trace is completely finished
	// It's important that when Finish() exits, the data is put in
	// the channel for real, when the trace is finished.
	// Otherwise, tests could become flaky (because you never know in what state
	// the channel is).
}
// FinishWithErr records the given error (when non-nil) and then finishes
// the span.
func (s *Span) FinishWithErr(err error) {
	if s != nil {
		s.SetError(err)
		s.Finish()
	}
}
// String returns a human readable representation of the span. Not for
// production, just debugging.
func (s *Span) String() string {
	parts := []string{
		fmt.Sprintf("Name: %s", s.Name),
		fmt.Sprintf("Service: %s", s.Service),
		fmt.Sprintf("Resource: %s", s.Resource),
		fmt.Sprintf("TraceID: %d", s.TraceID),
		fmt.Sprintf("SpanID: %d", s.SpanID),
		fmt.Sprintf("ParentID: %d", s.ParentID),
		fmt.Sprintf("Start: %s", time.Unix(0, s.Start)),
		fmt.Sprintf("Duration: %s", time.Duration(s.Duration)),
		fmt.Sprintf("Error: %d", s.Error),
		fmt.Sprintf("Type: %s", s.Type),
		"Tags:",
	}
	s.RLock()
	for k, v := range s.Meta {
		parts = append(parts, fmt.Sprintf("\t%s:%s", k, v))
	}
	s.RUnlock()
	return strings.Join(parts, "\n")
}
// Context returns a copy of the given context that includes this span.
// This span can be accessed downstream with SpanFromContext and friends.
func (s *Span) Context(ctx context.Context) context.Context {
	if s != nil {
		return context.WithValue(ctx, spanKey, s)
	}
	return ctx
}
// Tracer returns the tracer that created this span, or nil for a nil span.
func (s *Span) Tracer() *Tracer {
	if s != nil {
		return s.tracer
	}
	return nil
}
// SetSamplingPriority sets the sampling priority
// (see the ext package Priority* constants for meaningful values).
func (s *Span) SetSamplingPriority(priority int) {
	s.SetMetric(samplingPriorityKey, float64(priority))
}
// HasSamplingPriority returns true if sampling priority is set.
// It can be defined to either zero or non-zero.
//
// Fix: the Metrics map is written under the span lock (see SetMetric),
// so it must be read under the lock too; the previous implementation
// read it unguarded (data race) and also lacked the nil-span guard every
// other accessor has.
func (s *Span) HasSamplingPriority() bool {
	if s == nil {
		return false
	}
	s.RLock()
	defer s.RUnlock()
	_, hasSamplingPriority := s.Metrics[samplingPriorityKey]
	return hasSamplingPriority
}
// GetSamplingPriority gets the sampling priority; 0 when unset or when
// the span is nil. Reads under the lock for the same reason as above.
func (s *Span) GetSamplingPriority() int {
	if s == nil {
		return 0
	}
	s.RLock()
	defer s.RUnlock()
	return int(s.Metrics[samplingPriorityKey])
}
// NextSpanID returns a new random span id.
// NOTE(review): relies on randGen being initialized elsewhere in the
// package; calling it before that initialization would panic.
func NextSpanID() uint64 {
	return uint64(randGen.Int63())
}
// +build !windows
package tracer
import "time"
// now returns the current UTC time in nanoseconds since the epoch.
// On Windows a higher-precision implementation is selected at init time.
func now() int64 {
	return time.Now().UTC().UnixNano()
}
package tracer
import (
	"golang.org/x/sys/windows"
	"log"
	"time"
)
// This method is more precise than the go1.8 time.Now on Windows
// See https://msdn.microsoft.com/en-us/library/windows/desktop/hh706895(v=vs.85).aspx
// It is however ~10x slower and requires Windows 8+.
func highPrecisionNow() int64 {
	var ft windows.Filetime
	windows.GetSystemTimePreciseAsFileTime(&ft)
	return ft.Nanoseconds()
}
// lowPrecisionNow is the time.Now fallback used when the precise
// Windows API is unavailable.
func lowPrecisionNow() int64 {
	return time.Now().UTC().UnixNano()
}
// now is assigned in init() to whichever implementation is available.
var now func() int64
// If GetSystemTimePreciseAsFileTime is not available we default to the less
// precise implementation based on time.Now().
func init() {
	if err := windows.LoadGetSystemTimePreciseAsFileTime(); err != nil {
		// message typo fixed: "precison" -> "precision"
		log.Printf("Unable to load high precision timer, defaulting to time.Now()")
		now = lowPrecisionNow
	} else {
		log.Printf("Using high precision timer")
		now = highPrecisionNow
	}
}
This diff is collapsed.
package tracer
import (
"errors"
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/DataDog/dd-trace-go/tracer/ext"
)
const (
	defaultHostname    = "localhost"
	defaultPort        = "8126"
	defaultHTTPTimeout = time.Second             // defines the current timeout before giving up with the send process
	traceCountHeader   = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
)
// Transport is an interface for span submission to the agent.
type Transport interface {
	// SendTraces submits a batch of traces to the agent.
	SendTraces(spans [][]*Span) (*http.Response, error)
	// SendServices submits the service metadata map to the agent.
	SendServices(services map[string]Service) (*http.Response, error)
	// SetHeader sets a header attached to every request.
	SetHeader(key, value string)
}
// NewTransport returns a new Transport implementation that sends traces to a
// trace agent running on the given hostname and port. If the zero values for
// hostname and port are provided, the default values will be used ("localhost"
// for hostname, and "8126" for port).
//
// In general, using this method is only necessary if you have a trace agent
// running on a non-default port or if it's located on another machine.
func NewTransport(hostname, port string) Transport {
	h, p := hostname, port
	if h == "" {
		h = defaultHostname
	}
	if p == "" {
		p = defaultPort
	}
	return newHTTPTransport(h, p)
}
// newDefaultTransport returns a default transport for this tracing client,
// targeting the agent at localhost:8126.
func newDefaultTransport() Transport {
	return newHTTPTransport(defaultHostname, defaultPort)
}
type httpTransport struct {
	traceURL          string            // the delivery URL for traces
	legacyTraceURL    string            // the legacy delivery URL for traces
	serviceURL        string            // the delivery URL for services
	legacyServiceURL  string            // the legacy delivery URL for services
	client            *http.Client      // the HTTP client used in the POST
	headers           map[string]string // the Transport headers; NOTE(review): plain map, no lock — confirm SetHeader is only called at setup time
	compatibilityMode bool              // the Agent targets a legacy API for compatibility reasons
	// [WARNING] We tried to reuse encoders thanks to a pool, but that led us to having race conditions.
	// Indeed, when we send the encoder as the request body, the persistConn.writeLoop() goroutine
	// can theoretically read the underlying buffer whereas the encoder has been returned to the pool.
	// Since the underlying bytes.Buffer is not thread safe, this can make the app panicking.
	// since this method will later on spawn a goroutine referencing this buffer.
	// That's why we prefer the less performant yet SAFE implementation of allocating a new encoder every time we flush.
	getEncoder encoderFactory
}
// newHTTPTransport returns an httpTransport for the given endpoint,
// initialized with the default client-identification headers and the
// msgpack encoder (current API).
func newHTTPTransport(hostname, port string) *httpTransport {
	base := "http://" + hostname + ":" + port
	return &httpTransport{
		traceURL:         base + "/v0.3/traces",
		legacyTraceURL:   base + "/v0.2/traces",
		serviceURL:       base + "/v0.3/services",
		legacyServiceURL: base + "/v0.2/services",
		getEncoder:       msgpackEncoderFactory,
		client: &http.Client{
			Timeout: defaultHTTPTimeout,
		},
		headers: map[string]string{
			"Datadog-Meta-Lang":             ext.Lang,
			"Datadog-Meta-Lang-Version":     ext.LangVersion,
			"Datadog-Meta-Lang-Interpreter": ext.Interpreter,
			"Datadog-Meta-Tracer-Version":   ext.TracerVersion,
		},
		compatibilityMode: false,
	}
}
// SendTraces encodes the given traces and POSTs them to the agent,
// returning the agent's response. On a 404/415 it downgrades to the
// legacy API (at most once) and retries.
func (t *httpTransport) SendTraces(traces [][]*Span) (*http.Response, error) {
	if t.traceURL == "" {
		return nil, errors.New("provided an empty URL, giving up")
	}
	encoder := t.getEncoder()
	// encode the spans and return the error if any
	if err := encoder.EncodeTraces(traces); err != nil {
		return nil, err
	}
	// prepare the client and send the payload
	req, err := http.NewRequest("POST", t.traceURL, encoder)
	if err != nil {
		// Previously this error was discarded (`req, _ := ...`), which
		// would have nil-panicked below on a malformed URL; handle it the
		// same way SendServices does.
		return nil, fmt.Errorf("cannot create http request: %v", err)
	}
	for header, value := range t.headers {
		req.Header.Set(header, value)
	}
	req.Header.Set(traceCountHeader, strconv.Itoa(len(traces)))
	req.Header.Set("Content-Type", encoder.ContentType())
	response, err := t.client.Do(req)
	// if we have an error, return an empty Response to protect against nil pointer dereference
	if err != nil {
		return &http.Response{StatusCode: 0}, err
	}
	defer response.Body.Close()
	// if we got a 404 we should downgrade the API to a stable version (at most once)
	if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode {
		log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.traceURL, response.StatusCode)
		t.apiDowngrade()
		return t.SendTraces(traces)
	}
	if sc := response.StatusCode; sc != 200 {
		return response, fmt.Errorf("SendTraces expected response code 200, received %v", sc)
	}
	return response, err
}
// SendServices encodes the given service map and POSTs it to the agent,
// returning the agent's response. On a 404/415 it downgrades to the
// legacy API (at most once) and retries.
func (t *httpTransport) SendServices(services map[string]Service) (*http.Response, error) {
	if t.serviceURL == "" {
		return nil, errors.New("provided an empty URL, giving up")
	}
	encoder := t.getEncoder()
	if err := encoder.EncodeServices(services); err != nil {
		return nil, err
	}
	// Send it
	req, err := http.NewRequest("POST", t.serviceURL, encoder)
	if err != nil {
		return nil, fmt.Errorf("cannot create http request: %v", err)
	}
	for header, value := range t.headers {
		req.Header.Set(header, value)
	}
	req.Header.Set("Content-Type", encoder.ContentType())
	response, err := t.client.Do(req)
	if err != nil {
		return &http.Response{StatusCode: 0}, err
	}
	defer response.Body.Close()
	// Downgrade if necessary
	if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode {
		// Fix: the log message previously printed t.traceURL, misreporting
		// which endpoint triggered the downgrade.
		log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.serviceURL, response.StatusCode)
		t.apiDowngrade()
		return t.SendServices(services)
	}
	if sc := response.StatusCode; sc != 200 {
		return response, fmt.Errorf("SendServices expected response code 200, received %v", sc)
	}
	return response, err
}
// SetHeader sets the internal header for the httpTransport
// NOTE(review): headers is a plain map with no lock; confirm callers only
// invoke SetHeader during setup, before any concurrent sends.
func (t *httpTransport) SetHeader(key, value string) {
	t.headers[key] = value
}
// changeEncoder switches the encoder so that a different API with different
// format can be targeted, preventing failures because of outdated agents
func (t *httpTransport) changeEncoder(encoderFactory encoderFactory) {
	t.getEncoder = encoderFactory
}
// apiDowngrade downgrades the used encoder and API level. This method must fallback to a safe
// encoder and API, so that it will success despite users' configurations. This action
// ensures that the compatibility mode is activated so that the downgrade will be
// executed only once.
func (t *httpTransport) apiDowngrade() {
	t.compatibilityMode = true
	t.traceURL = t.legacyTraceURL
	t.serviceURL = t.legacyServiceURL
	t.changeEncoder(jsonEncoderFactory)
}
Copyright (c) 2013 Shopify
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package sarama
// Resource identifies the Kafka resource an ACL applies to,
// by resource type and name.
type Resource struct {
	ResourceType AclResourceType // kind of resource the ACL targets
	ResourceName string          // name of that resource
}
// encode writes the resource type byte followed by the resource name.
func (r *Resource) encode(pe packetEncoder) error {
	pe.putInt8(int8(r.ResourceType))
	return pe.putString(r.ResourceName)
}
// decode reads the resource type and name back in the same order encode
// writes them.
func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
	rt, err := pd.getInt8()
	if err != nil {
		return err
	}
	r.ResourceType = AclResourceType(rt)
	r.ResourceName, err = pd.getString()
	return err
}
// Acl describes a single access-control entry: who (principal, host) may
// perform which operation, and whether it is allowed or denied.
type Acl struct {
	Principal      string            // principal the rule applies to
	Host           string            // host the rule applies to
	Operation      AclOperation      // operation being permitted/denied
	PermissionType AclPermissionType // allow or deny semantics
}
// encode serializes principal, host, operation and permission type, in that
// wire order.
func (a *Acl) encode(pe packetEncoder) error {
	for _, s := range []string{a.Principal, a.Host} {
		if err := pe.putString(s); err != nil {
			return err
		}
	}
	pe.putInt8(int8(a.Operation))
	pe.putInt8(int8(a.PermissionType))
	return nil
}
// decode reads principal, host, operation and permission type, mirroring
// encode's field order.
func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
	if a.Principal, err = pd.getString(); err != nil {
		return err
	}
	if a.Host, err = pd.getString(); err != nil {
		return err
	}
	var op, perm int8
	if op, err = pd.getInt8(); err != nil {
		return err
	}
	a.Operation = AclOperation(op)
	if perm, err = pd.getInt8(); err != nil {
		return err
	}
	a.PermissionType = AclPermissionType(perm)
	return nil
}
// ResourceAcls groups the ACL entries attached to one resource.
type ResourceAcls struct {
	Resource        // the resource the entries below apply to
	Acls     []*Acl // ACL entries for that resource
}
// encode writes the embedded resource, then a length-prefixed array of ACLs.
func (r *ResourceAcls) encode(pe packetEncoder) error {
	if err := r.Resource.encode(pe); err != nil {
		return err
	}
	if err := pe.putArrayLength(len(r.Acls)); err != nil {
		return err
	}
	for _, entry := range r.Acls {
		if err := entry.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads the embedded resource followed by a length-prefixed ACL array.
func (r *ResourceAcls) decode(pd packetDecoder, version int16) error {
	if err := r.Resource.decode(pd, version); err != nil {
		return err
	}
	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	r.Acls = make([]*Acl, count)
	for i := range r.Acls {
		entry := new(Acl)
		if err := entry.decode(pd, version); err != nil {
			return err
		}
		r.Acls[i] = entry
	}
	return nil
}
package sarama
// CreateAclsRequest is the Kafka CreateAcls request (API key 30, v0):
// a batch of ACL entries to create.
type CreateAclsRequest struct {
	AclCreations []*AclCreation // resource/ACL pairs to create
}
// encode writes the creations as a length-prefixed array.
func (c *CreateAclsRequest) encode(pe packetEncoder) error {
	if err := pe.putArrayLength(len(c.AclCreations)); err != nil {
		return err
	}
	for _, creation := range c.AclCreations {
		if err := creation.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads a length-prefixed array of ACL creations.
func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	c.AclCreations = make([]*AclCreation, count)
	for i := range c.AclCreations {
		creation := new(AclCreation)
		if err := creation.decode(pd, version); err != nil {
			return err
		}
		c.AclCreations[i] = creation
	}
	return nil
}
// key returns the Kafka API key for CreateAcls.
// Receiver renamed to c for consistency with encode/decode.
func (c *CreateAclsRequest) key() int16 {
	return 30
}

// version returns the request version implemented here.
func (c *CreateAclsRequest) version() int16 {
	return 0
}

// requiredVersion reports the minimum Kafka version for this request.
func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
	return V0_11_0_0
}
// AclCreation pairs a resource with the ACL entry to create on it.
type AclCreation struct {
	Resource // target resource
	Acl      // ACL entry to attach
}
// encode writes the resource followed by the ACL.
func (a *AclCreation) encode(pe packetEncoder) error {
	if err := a.Resource.encode(pe); err != nil {
		return err
	}
	return a.Acl.encode(pe)
}
// decode reads the resource followed by the ACL, mirroring encode.
func (a *AclCreation) decode(pd packetDecoder, version int16) (err error) {
	if err = a.Resource.decode(pd, version); err != nil {
		return err
	}
	return a.Acl.decode(pd, version)
}
package sarama
import "time"
// CreateAclsResponse is the Kafka CreateAcls response (API key 30, v0).
type CreateAclsResponse struct {
	ThrottleTime         time.Duration          // broker-imposed throttle, sent as milliseconds
	AclCreationResponses []*AclCreationResponse // one result per requested creation
}
// encode writes the throttle time (in ms) and the per-creation results.
func (c *CreateAclsResponse) encode(pe packetEncoder) error {
	pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
	if err := pe.putArrayLength(len(c.AclCreationResponses)); err != nil {
		return err
	}
	for _, result := range c.AclCreationResponses {
		if err := result.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads the throttle time (ms) and the per-creation results.
func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error) {
	throttle, err := pd.getInt32()
	if err != nil {
		return err
	}
	c.ThrottleTime = time.Duration(throttle) * time.Millisecond
	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	c.AclCreationResponses = make([]*AclCreationResponse, count)
	for i := range c.AclCreationResponses {
		result := new(AclCreationResponse)
		if err := result.decode(pd, version); err != nil {
			return err
		}
		c.AclCreationResponses[i] = result
	}
	return nil
}
// key returns the Kafka API key for CreateAcls.
// Receiver renamed to c for consistency with encode/decode.
func (c *CreateAclsResponse) key() int16 {
	return 30
}

// version returns the response version implemented here.
func (c *CreateAclsResponse) version() int16 {
	return 0
}

// requiredVersion reports the minimum Kafka version for this response.
func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
	return V0_11_0_0
}
// AclCreationResponse is the per-entry result of a CreateAcls request.
type AclCreationResponse struct {
	Err    KError  // error code for this creation (NoError on success)
	ErrMsg *string // optional broker-supplied message; nil when absent
}
// encode writes the error code and the nullable error message.
func (a *AclCreationResponse) encode(pe packetEncoder) error {
	pe.putInt16(int16(a.Err))
	return pe.putNullableString(a.ErrMsg)
}
// decode reads the error code and the nullable error message.
func (a *AclCreationResponse) decode(pd packetDecoder, version int16) (err error) {
	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	a.Err = KError(code)
	a.ErrMsg, err = pd.getNullableString()
	return err
}
package sarama
// DeleteAclsRequest is the Kafka DeleteAcls request (API key 31, v0):
// a batch of filters selecting the ACLs to remove.
type DeleteAclsRequest struct {
	Filters []*AclFilter // filters describing which ACLs to delete
}
// encode writes the filters as a length-prefixed array.
func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
	if err := pe.putArrayLength(len(d.Filters)); err != nil {
		return err
	}
	for _, flt := range d.Filters {
		if err := flt.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads a length-prefixed array of ACL filters.
func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	d.Filters = make([]*AclFilter, count)
	for i := range d.Filters {
		flt := new(AclFilter)
		if err := flt.decode(pd, version); err != nil {
			return err
		}
		d.Filters[i] = flt
	}
	return nil
}
// key returns the Kafka API key for DeleteAcls.
func (d *DeleteAclsRequest) key() int16 {
	return 31
}

// version returns the request version implemented here.
func (d *DeleteAclsRequest) version() int16 {
	return 0
}

// requiredVersion reports the minimum Kafka version for this request.
func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
	return V0_11_0_0
}
package sarama
import "time"
// DeleteAclsResponse is the Kafka DeleteAcls response (API key 31, v0).
type DeleteAclsResponse struct {
	ThrottleTime    time.Duration     // broker-imposed throttle, sent as milliseconds
	FilterResponses []*FilterResponse // one result per filter in the request
}
// encode writes the throttle time (in ms) and the per-filter results.
func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
	pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
	if err := pe.putArrayLength(len(a.FilterResponses)); err != nil {
		return err
	}
	for _, fr := range a.FilterResponses {
		if err := fr.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads the throttle time (ms) and the per-filter results.
func (a *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
	throttle, err := pd.getInt32()
	if err != nil {
		return err
	}
	a.ThrottleTime = time.Duration(throttle) * time.Millisecond
	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	a.FilterResponses = make([]*FilterResponse, count)
	for i := range a.FilterResponses {
		fr := new(FilterResponse)
		if err := fr.decode(pd, version); err != nil {
			return err
		}
		a.FilterResponses[i] = fr
	}
	return nil
}
// key returns the Kafka API key for DeleteAcls.
// Receiver renamed to a for consistency with encode/decode.
func (a *DeleteAclsResponse) key() int16 {
	return 31
}

// version returns the response version implemented here.
func (a *DeleteAclsResponse) version() int16 {
	return 0
}

// requiredVersion reports the minimum Kafka version for this response.
func (a *DeleteAclsResponse) requiredVersion() KafkaVersion {
	return V0_11_0_0
}
// FilterResponse reports the outcome of one delete filter: an error code,
// an optional message, and the ACLs the filter matched (and removed).
type FilterResponse struct {
	Err          KError         // error code for this filter
	ErrMsg       *string        // optional broker-supplied message; nil when absent
	MatchingAcls []*MatchingAcl // ACLs matched by the filter
}
// encode writes error code, nullable message, then the matched ACLs.
func (f *FilterResponse) encode(pe packetEncoder) error {
	pe.putInt16(int16(f.Err))
	if err := pe.putNullableString(f.ErrMsg); err != nil {
		return err
	}
	if err := pe.putArrayLength(len(f.MatchingAcls)); err != nil {
		return err
	}
	for _, match := range f.MatchingAcls {
		if err := match.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads error code, nullable message, then the matched ACLs,
// mirroring encode's order.
func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	f.Err = KError(code)
	if f.ErrMsg, err = pd.getNullableString(); err != nil {
		return err
	}
	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	f.MatchingAcls = make([]*MatchingAcl, count)
	for i := range f.MatchingAcls {
		match := new(MatchingAcl)
		if err := match.decode(pd, version); err != nil {
			return err
		}
		f.MatchingAcls[i] = match
	}
	return nil
}
// MatchingAcl is one ACL matched by a delete filter, with a per-entry
// error code and optional message.
type MatchingAcl struct {
	Err      KError  // error code for this matched entry
	ErrMsg   *string // optional broker-supplied message; nil when absent
	Resource         // resource the entry was attached to
	Acl              // the matched ACL entry itself
}
// encode writes error code, nullable message, then the resource and ACL.
func (m *MatchingAcl) encode(pe packetEncoder) error {
	pe.putInt16(int16(m.Err))
	if err := pe.putNullableString(m.ErrMsg); err != nil {
		return err
	}
	if err := m.Resource.encode(pe); err != nil {
		return err
	}
	return m.Acl.encode(pe)
}
// decode reads error code, nullable message, resource and ACL, mirroring
// encode's order.
func (m *MatchingAcl) decode(pd packetDecoder, version int16) (err error) {
	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	m.Err = KError(code)
	if m.ErrMsg, err = pd.getNullableString(); err != nil {
		return err
	}
	if err = m.Resource.decode(pd, version); err != nil {
		return err
	}
	return m.Acl.decode(pd, version)
}
package sarama
// DescribeAclsRequest is the Kafka DescribeAcls request (API key 29, v0):
// a single filter selecting which ACLs to list.
type DescribeAclsRequest struct {
	AclFilter // filter describing the ACLs to return
}
// encode delegates to the embedded AclFilter, which is the entire body
// of a v0 DescribeAcls request.
func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
	return d.AclFilter.encode(pe)
}

// decode delegates to the embedded AclFilter.
func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
	return d.AclFilter.decode(pd, version)
}

// key returns the Kafka API key for DescribeAcls.
func (d *DescribeAclsRequest) key() int16 {
	return 29
}

// version returns the request version implemented here.
func (d *DescribeAclsRequest) version() int16 {
	return 0
}

// requiredVersion reports the minimum Kafka version for this request.
func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
	return V0_11_0_0
}
package sarama
import "time"
// DescribeAclsResponse is the Kafka DescribeAcls response (API key 29, v0).
type DescribeAclsResponse struct {
	ThrottleTime time.Duration   // broker-imposed throttle, sent as milliseconds
	Err          KError          // top-level error code
	ErrMsg       *string         // optional broker-supplied message; nil when absent
	ResourceAcls []*ResourceAcls // listed resources with their ACLs
}
// encode writes throttle time (ms), error code, nullable error message, and
// the length-prefixed resource/ACL list.
func (d *DescribeAclsResponse) encode(pe packetEncoder) error {
	pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
	pe.putInt16(int16(d.Err))
	if err := pe.putNullableString(d.ErrMsg); err != nil {
		return err
	}
	if err := pe.putArrayLength(len(d.ResourceAcls)); err != nil {
		return err
	}
	for _, ra := range d.ResourceAcls {
		if err := ra.encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode reads throttle time (ms), the top-level error code and message, and
// the resource/ACL listing, mirroring the field order used by encode.
func (d *DescribeAclsResponse) decode(pd packetDecoder, version int16) (err error) {
	throttleTime, err := pd.getInt32()
	if err != nil {
		return err
	}
	d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
	kerr, err := pd.getInt16()
	if err != nil {
		return err
	}
	d.Err = KError(kerr)
	// Fix: encode writes ErrMsg with putNullableString, so it must be read
	// back with getNullableString. The previous getString + `!= ""` check
	// could not handle a null (-1 length) message from the broker and broke
	// encode/decode symmetry.
	if d.ErrMsg, err = pd.getNullableString(); err != nil {
		return err
	}
	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	d.ResourceAcls = make([]*ResourceAcls, n)
	for i := 0; i < n; i++ {
		d.ResourceAcls[i] = new(ResourceAcls)
		if err := d.ResourceAcls[i].decode(pd, version); err != nil {
			return err
		}
	}
	return nil
}
// key returns the Kafka API key for DescribeAcls.
func (d *DescribeAclsResponse) key() int16 {
	return 29
}

// version returns the response version implemented here.
func (d *DescribeAclsResponse) version() int16 {
	return 0
}

// requiredVersion reports the minimum Kafka version for this response.
func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
	return V0_11_0_0
}
package sarama
// AclFilter selects ACL entries for Describe/Delete requests. The pointer
// fields are nullable on the wire (written with putNullableString).
type AclFilter struct {
	ResourceType   AclResourceType   // resource kind to match
	ResourceName   *string           // resource name to match; nullable
	Principal      *string           // principal to match; nullable
	Host           *string           // host to match; nullable
	Operation      AclOperation      // operation to match
	PermissionType AclPermissionType // permission type to match
}
// encode writes the filter in wire order: resource type, then the three
// nullable strings (name, principal, host), then operation and permission
// type.
func (a *AclFilter) encode(pe packetEncoder) error {
	pe.putInt8(int8(a.ResourceType))
	for _, s := range []*string{a.ResourceName, a.Principal, a.Host} {
		if err := pe.putNullableString(s); err != nil {
			return err
		}
	}
	pe.putInt8(int8(a.Operation))
	pe.putInt8(int8(a.PermissionType))
	return nil
}
// decode reads the filter fields back in the same order encode writes them.
func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {
	rt, err := pd.getInt8()
	if err != nil {
		return err
	}
	a.ResourceType = AclResourceType(rt)
	if a.ResourceName, err = pd.getNullableString(); err != nil {
		return err
	}
	if a.Principal, err = pd.getNullableString(); err != nil {
		return err
	}
	if a.Host, err = pd.getNullableString(); err != nil {
		return err
	}
	var op, perm int8
	if op, err = pd.getInt8(); err != nil {
		return err
	}
	a.Operation = AclOperation(op)
	if perm, err = pd.getInt8(); err != nil {
		return err
	}
	a.PermissionType = AclPermissionType(perm)
	return nil
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package sarama
// ConfigResourceType identifies the kind of entity a Describe/AlterConfigs
// call targets.
type ConfigResourceType int8

// Taken from :
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
const (
	UnknownResource ConfigResourceType = 0 // unknown/unspecified
	AnyResource     ConfigResourceType = 1 // matches any resource type
	TopicResource   ConfigResourceType = 2 // a topic
	GroupResource   ConfigResourceType = 3 // a consumer group
	ClusterResource ConfigResourceType = 4 // the cluster itself
	BrokerResource  ConfigResourceType = 5 // a single broker
)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment