Commit a136b712 authored by Miek Gieben's avatar Miek Gieben Committed by GitHub

plugin/dnstap: remove custom encoder (#4242)

* plugin/dnstap: remove encoder*.go

Those files reimplemented parts of the dnstap spec, we can just use the
dnstap functions for that. This leaves all the queuing that is enabled
and drops messages if the dnstap reader can't keep up. In the new code
flush() would never return an error (at least I couldn't make it do so),
so the reconnect functionally is moved to kick off when we get write
errors.

Some smaller cosmetic changes as well, `d.socket` is now `proto`, which
makes the dial() function smaller.

Total testing time is now <1s (which was the impetus to look into this
plugin *again*).

See #4238
The buffered channel needs to be sized correctly, as we may need to do
some queing if the dnstap reader can't keep up.
Signed-off-by: default avatarMiek Gieben <miek@miek.nl>

* add missing file
Signed-off-by: default avatarMiek Gieben <miek@miek.nl>

* update doc on queing
Signed-off-by: default avatarMiek Gieben <miek@miek.nl>
parent f286a24b
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
dnstap is a flexible, structured binary log format for DNS software; see https://dnstap.info. With this dnstap is a flexible, structured binary log format for DNS software; see https://dnstap.info. With this
plugin you make CoreDNS output dnstap logging. plugin you make CoreDNS output dnstap logging.
Note that there is an internal buffer, so expect at least 13 requests before the server sends its Every message is sent to the socket as soon as it comes in, the *dnstap* plugin has a buffer of
dnstap messages to the socket. 10000 messages, above that number dnstap messages will be dropped (this is logged).
## Syntax ## Syntax
...@@ -100,5 +100,5 @@ func (x RandomPlugin) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns ...@@ -100,5 +100,5 @@ func (x RandomPlugin) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns
## See Also ## See Also
The website [dnstap.info](https://dnstap.info) has info on the dnstap protocol. The website [dnstap.info](https://dnstap.info) has info on the dnstap protocol. The *forward*
The *forward* plugin's `dnstap.go` uses dnstap to tap messages sent to an upstream. plugin's `dnstap.go` uses dnstap to tap messages sent to an upstream.
package dnstapio
import (
"encoding/binary"
"fmt"
"io"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
const (
protobufSize = 1024 * 1024
)
type dnstapEncoder struct {
fse *fs.Encoder
opts *fs.EncoderOptions
writer io.Writer
buffer *proto.Buffer
}
func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder {
return &dnstapEncoder{
opts: o,
buffer: proto.NewBuffer(make([]byte, 0, protobufSize)),
}
}
func (enc *dnstapEncoder) resetWriter(w io.Writer) error {
fse, err := fs.NewEncoder(w, enc.opts)
if err != nil {
return err
}
if err = fse.Flush(); err != nil {
return err
}
enc.fse = fse
enc.writer = w
return nil
}
func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error {
if len(enc.buffer.Bytes()) >= protobufSize {
if err := enc.flushBuffer(); err != nil {
return err
}
}
bufLen := len(enc.buffer.Bytes())
// add placeholder for frame length
if err := enc.buffer.EncodeFixed32(0); err != nil {
enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
return err
}
if err := enc.buffer.Marshal(msg); err != nil {
enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
return err
}
enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:])
return nil
}
func (enc *dnstapEncoder) flushBuffer() error {
if enc.fse == nil || enc.writer == nil {
return fmt.Errorf("no writer")
}
buf := enc.buffer.Bytes()
written := 0
for written < len(buf) {
n, err := enc.writer.Write(buf[written:])
written += n
if err != nil {
return err
}
}
enc.buffer.Reset()
return nil
}
func (enc *dnstapEncoder) encodeFrameLen(buf []byte) {
binary.BigEndian.PutUint32(buf, uint32(len(buf)-4))
}
func (enc *dnstapEncoder) close() error {
if enc.fse != nil {
return enc.fse.Close()
}
return nil
}
package dnstapio
import (
"bytes"
"testing"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
func dnstapMsg() *tap.Dnstap {
t := tap.Dnstap_MESSAGE
mt := tap.Message_CLIENT_RESPONSE
msg := &tap.Message{Type: &mt}
return &tap.Dnstap{Type: &t, Message: msg}
}
func TestEncoderCompatibility(t *testing.T) {
opts := &fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.DnstapTest"),
Bidirectional: false,
}
msg := dnstapMsg()
//framestream encoder
fsW := new(bytes.Buffer)
fsEnc, err := fs.NewEncoder(fsW, opts)
if err != nil {
t.Fatal(err)
}
data, err := proto.Marshal(msg)
if err != nil {
t.Fatal(err)
}
fsEnc.Write(data)
fsEnc.Close()
//dnstap encoder
dnstapW := new(bytes.Buffer)
dnstapEnc := newDnstapEncoder(opts)
if err := dnstapEnc.resetWriter(dnstapW); err != nil {
t.Fatal(err)
}
dnstapEnc.writeMsg(msg)
dnstapEnc.flushBuffer()
dnstapEnc.close()
//compare results
if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) {
t.Fatal("DnstapEncoder is not compatible with framestream Encoder")
}
}
// Package dnstapio is a small wrapper around golang-framestream
package dnstapio
import (
"io"
"time"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
// Encoder wraps a fs.Encoder.
type Encoder struct {
fs *fs.Encoder
}
func newEncoder(w io.Writer, timeout time.Duration) (*Encoder, error) {
fs, err := fs.NewEncoder(w, &fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
Timeout: timeout,
})
if err != nil {
return nil, err
}
return &Encoder{fs}, nil
}
func (e *Encoder) writeMsg(msg *tap.Dnstap) error {
buf, err := proto.Marshal(msg)
if err != nil {
return err
}
_, err = e.fs.Write(buf) // n < len(buf) should return an error
return err
}
func (e *Encoder) flush() error { return e.fs.Flush() }
func (e *Encoder) close() error { return e.fs.Close() }
...@@ -8,16 +8,15 @@ import ( ...@@ -8,16 +8,15 @@ import (
clog "github.com/coredns/coredns/plugin/pkg/log" clog "github.com/coredns/coredns/plugin/pkg/log"
tap "github.com/dnstap/golang-dnstap" tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
) )
var log = clog.NewWithPlugin("dnstap") var log = clog.NewWithPlugin("dnstap")
const ( const (
tcpWriteBufSize = 1024 * 1024 tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number (see #xxx)
queueSize = 10000 // see #xxxx
tcpTimeout = 4 * time.Second tcpTimeout = 4 * time.Second
flushTimeout = 1 * time.Second flushTimeout = 1 * time.Second
queueSize = 10000
) )
// Tapper interface is used in testing to mock the Dnstap method. // Tapper interface is used in testing to mock the Dnstap method.
...@@ -27,52 +26,47 @@ type Tapper interface { ...@@ -27,52 +26,47 @@ type Tapper interface {
// dio implements the Tapper interface. // dio implements the Tapper interface.
type dio struct { type dio struct {
endpoint string endpoint string
socket bool proto string
conn net.Conn conn net.Conn
enc *dnstapEncoder enc *Encoder
queue chan tap.Dnstap queue chan tap.Dnstap
dropped uint32 dropped uint32
quit chan struct{} quit chan struct{}
flushTimeout time.Duration
tcpTimeout time.Duration
} }
// New returns a new and initialized pointer to a dio. // New returns a new and initialized pointer to a dio.
func New(endpoint string, socket bool) *dio { func New(proto, endpoint string) *dio {
return &dio{ return &dio{
endpoint: endpoint, endpoint: endpoint,
socket: socket, proto: proto,
enc: newDnstapEncoder(&fs.EncoderOptions{ queue: make(chan tap.Dnstap, queueSize),
ContentType: []byte("protobuf:dnstap.Dnstap"), quit: make(chan struct{}),
Bidirectional: true, flushTimeout: flushTimeout,
}), tcpTimeout: tcpTimeout,
queue: make(chan tap.Dnstap, queueSize),
quit: make(chan struct{}),
} }
} }
func (d *dio) newConnect() error { func (d *dio) dial() error {
var err error conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout)
if d.socket { if err != nil {
if d.conn, err = net.Dial("unix", d.endpoint); err != nil { return err
return err }
} if tcpConn, ok := conn.(*net.TCPConn); ok {
} else { tcpConn.SetWriteBuffer(tcpWriteBufSize)
if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil { tcpConn.SetNoDelay(false)
return err
}
if tcpConn, ok := d.conn.(*net.TCPConn); ok {
tcpConn.SetWriteBuffer(tcpWriteBufSize)
tcpConn.SetNoDelay(false)
}
} }
return d.enc.resetWriter(d.conn) d.enc, err = newEncoder(conn, d.tcpTimeout)
return err
} }
// Connect connects to the dnstap endpoint. // Connect connects to the dnstap endpoint.
func (d *dio) Connect() { func (d *dio) Connect() {
if err := d.newConnect(); err != nil { if err := d.dial(); err != nil {
log.Error("No connection to dnstap endpoint") log.Errorf("No connection to dnstap endpoint: %s", err)
} }
go d.serve() go d.serve()
} }
...@@ -86,58 +80,46 @@ func (d *dio) Dnstap(payload tap.Dnstap) { ...@@ -86,58 +80,46 @@ func (d *dio) Dnstap(payload tap.Dnstap) {
} }
} }
func (d *dio) closeConnection() {
d.enc.close()
if d.conn != nil {
d.conn.Close()
d.conn = nil
}
}
// Close waits until the I/O routine is finished to return. // Close waits until the I/O routine is finished to return.
func (d *dio) Close() { close(d.quit) } func (d *dio) Close() { close(d.quit) }
func (d *dio) flushBuffer() { func (d *dio) write(payload *tap.Dnstap) error {
if d.conn == nil { if d.enc == nil {
if err := d.newConnect(); err != nil { atomic.AddUint32(&d.dropped, 1)
return return nil
}
log.Info("Reconnected to dnstap")
}
if err := d.enc.flushBuffer(); err != nil {
log.Warningf("Connection lost: %s", err)
d.closeConnection()
if err := d.newConnect(); err != nil {
log.Errorf("Cannot connect to dnstap: %s", err)
} else {
log.Info("Reconnected to dnstap")
}
} }
}
func (d *dio) write(payload *tap.Dnstap) {
if err := d.enc.writeMsg(payload); err != nil { if err := d.enc.writeMsg(payload); err != nil {
atomic.AddUint32(&d.dropped, 1) atomic.AddUint32(&d.dropped, 1)
return err
} }
return nil
} }
func (d *dio) serve() { func (d *dio) serve() {
timeout := time.After(flushTimeout) timeout := time.After(d.flushTimeout)
for { for {
select { select {
case <-d.quit: case <-d.quit:
d.flushBuffer() if d.enc == nil {
d.closeConnection() return
}
d.enc.flush()
d.enc.close()
return return
case payload := <-d.queue: case payload := <-d.queue:
d.write(&payload) if err := d.write(&payload); err != nil {
d.dial()
}
case <-timeout: case <-timeout:
if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 { if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped) log.Warningf("Dropped dnstap messages: %d", dropped)
} }
d.flushBuffer() if d.enc == nil {
timeout = time.After(flushTimeout) d.dial()
} else {
d.enc.flush()
}
timeout = time.After(d.flushTimeout)
} }
} }
} }
...@@ -12,11 +12,6 @@ import ( ...@@ -12,11 +12,6 @@ import (
fs "github.com/farsightsec/golang-framestream" fs "github.com/farsightsec/golang-framestream"
) )
const (
endpointTCP = "localhost:0"
endpointSocket = "dnstap.sock"
)
var ( var (
msgType = tap.Dnstap_MESSAGE msgType = tap.Dnstap_MESSAGE
msg = tap.Dnstap{Type: &msgType} msg = tap.Dnstap{Type: &msgType}
...@@ -27,7 +22,6 @@ func accept(t *testing.T, l net.Listener, count int) { ...@@ -27,7 +22,6 @@ func accept(t *testing.T, l net.Listener, count int) {
if err != nil { if err != nil {
t.Fatalf("Server accepted: %s", err) t.Fatalf("Server accepted: %s", err)
} }
dec, err := fs.NewDecoder(server, &fs.DecoderOptions{ dec, err := fs.NewDecoder(server, &fs.DecoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"), ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true, Bidirectional: true,
...@@ -48,9 +42,10 @@ func accept(t *testing.T, l net.Listener, count int) { ...@@ -48,9 +42,10 @@ func accept(t *testing.T, l net.Listener, count int) {
} }
func TestTransport(t *testing.T) { func TestTransport(t *testing.T) {
transport := [2][3]string{
{"tcp", endpointTCP, "false"}, transport := [2][2]string{
{"unix", endpointSocket, "true"}, {"tcp", ":0"},
{"unix", "dnstap.sock"},
} }
for _, param := range transport { for _, param := range transport {
...@@ -67,7 +62,9 @@ func TestTransport(t *testing.T) { ...@@ -67,7 +62,9 @@ func TestTransport(t *testing.T) {
wg.Done() wg.Done()
}() }()
dio := New(l.Addr().String(), param[2] == "true") dio := New(param[0], l.Addr().String())
dio.tcpTimeout = 10 * time.Millisecond
dio.flushTimeout = 30 * time.Millisecond
dio.Connect() dio.Connect()
dio.Dnstap(msg) dio.Dnstap(msg)
...@@ -81,8 +78,7 @@ func TestTransport(t *testing.T) { ...@@ -81,8 +78,7 @@ func TestTransport(t *testing.T) {
func TestRace(t *testing.T) { func TestRace(t *testing.T) {
count := 10 count := 10
// Start TCP listener l, err := reuseport.Listen("tcp", ":0")
l, err := reuseport.Listen("tcp", endpointTCP)
if err != nil { if err != nil {
t.Fatalf("Cannot start listener: %s", err) t.Fatalf("Cannot start listener: %s", err)
} }
...@@ -95,27 +91,27 @@ func TestRace(t *testing.T) { ...@@ -95,27 +91,27 @@ func TestRace(t *testing.T) {
wg.Done() wg.Done()
}() }()
dio := New(l.Addr().String(), false) dio := New("tcp", l.Addr().String())
dio.tcpTimeout = 10 * time.Millisecond
dio.flushTimeout = 30 * time.Millisecond
dio.Connect() dio.Connect()
defer dio.Close() defer dio.Close()
wg.Add(count) wg.Add(count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
go func() { go func() {
time.Sleep(50 * time.Millisecond) msg := tap.Dnstap_MESSAGE
dio.Dnstap(msg) dio.Dnstap(tap.Dnstap{Type: &msg})
wg.Done() wg.Done()
}() }()
} }
wg.Wait() wg.Wait()
} }
func TestReconnect(t *testing.T) { func TestReconnect(t *testing.T) {
count := 5 count := 5
// Start TCP listener l, err := reuseport.Listen("tcp", ":0")
l, err := reuseport.Listen("tcp", endpointTCP)
if err != nil { if err != nil {
t.Fatalf("Cannot start listener: %s", err) t.Fatalf("Cannot start listener: %s", err)
} }
...@@ -128,18 +124,18 @@ func TestReconnect(t *testing.T) { ...@@ -128,18 +124,18 @@ func TestReconnect(t *testing.T) {
}() }()
addr := l.Addr().String() addr := l.Addr().String()
dio := New(addr, false) dio := New("tcp", addr)
dio.tcpTimeout = 10 * time.Millisecond
dio.flushTimeout = 30 * time.Millisecond
dio.Connect() dio.Connect()
defer dio.Close() defer dio.Close()
msg := tap.Dnstap_MESSAGE dio.Dnstap(msg)
dio.Dnstap(tap.Dnstap{Type: &msg})
wg.Wait() wg.Wait()
// Close listener // Close listener
l.Close() l.Close()
// And start TCP listener again on the same port // And start TCP listener again on the same port
l, err = reuseport.Listen("tcp", addr) l, err = reuseport.Listen("tcp", addr)
if err != nil { if err != nil {
...@@ -154,9 +150,8 @@ func TestReconnect(t *testing.T) { ...@@ -154,9 +150,8 @@ func TestReconnect(t *testing.T) {
}() }()
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
time.Sleep(time.Second) time.Sleep(100 * time.Millisecond)
dio.Dnstap(tap.Dnstap{Type: &msg}) dio.Dnstap(msg)
} }
wg.Wait() wg.Wait()
} }
...@@ -13,8 +13,8 @@ import ( ...@@ -13,8 +13,8 @@ import (
func init() { plugin.Register("dnstap", setup) } func init() { plugin.Register("dnstap", setup) }
type config struct { type config struct {
proto string
target string target string
socket bool
full bool full bool
} }
...@@ -32,10 +32,10 @@ func parseConfig(d *caddy.Controller) (c config, err error) { ...@@ -32,10 +32,10 @@ func parseConfig(d *caddy.Controller) (c config, err error) {
return c, d.ArgErr() return c, d.ArgErr()
} }
c.target = servers[0] c.target = servers[0]
c.proto = "tcp"
} else { } else {
// default to UNIX socket
c.target = strings.TrimPrefix(c.target, "unix://") c.target = strings.TrimPrefix(c.target, "unix://")
c.socket = true c.proto = "unix"
} }
c.full = d.NextArg() && d.Val() == "full" c.full = d.NextArg() && d.Val() == "full"
...@@ -49,7 +49,7 @@ func setup(c *caddy.Controller) error { ...@@ -49,7 +49,7 @@ func setup(c *caddy.Controller) error {
return plugin.Error("dnstap", err) return plugin.Error("dnstap", err)
} }
dio := dnstapio.New(conf.target, conf.socket) dio := dnstapio.New(conf.proto, conf.target)
dnstap := Dnstap{io: dio, IncludeRawMessage: conf.full} dnstap := Dnstap{io: dio, IncludeRawMessage: conf.full}
c.OnStartup(func() error { c.OnStartup(func() error {
......
...@@ -8,16 +8,16 @@ import ( ...@@ -8,16 +8,16 @@ import (
func TestConfig(t *testing.T) { func TestConfig(t *testing.T) {
tests := []struct { tests := []struct {
file string file string
path string path string
full bool full bool
socket bool proto string
fail bool fail bool
}{ }{
{"dnstap dnstap.sock full", "dnstap.sock", true, true, false}, {"dnstap dnstap.sock full", "dnstap.sock", true, "unix", false},
{"dnstap unix://dnstap.sock", "dnstap.sock", false, true, false}, {"dnstap unix://dnstap.sock", "dnstap.sock", false, "unix", false},
{"dnstap tcp://127.0.0.1:6000", "127.0.0.1:6000", false, false, false}, {"dnstap tcp://127.0.0.1:6000", "127.0.0.1:6000", false, "tcp", false},
{"dnstap", "fail", false, true, true}, {"dnstap", "fail", false, "tcp", true},
} }
for _, c := range tests { for _, c := range tests {
cad := caddy.NewTestController("dns", c.file) cad := caddy.NewTestController("dns", c.file)
...@@ -26,7 +26,7 @@ func TestConfig(t *testing.T) { ...@@ -26,7 +26,7 @@ func TestConfig(t *testing.T) {
if err == nil { if err == nil {
t.Errorf("%s: %s", c.file, err) t.Errorf("%s: %s", c.file, err)
} }
} else if err != nil || conf.target != c.path || conf.full != c.full || conf.socket != c.socket { } else if err != nil || conf.target != c.path || conf.full != c.full || conf.proto != c.proto {
t.Errorf("Expected: %+v\nhave: %+v\nerror: %s", c, conf, err) t.Errorf("Expected: %+v\nhave: %+v\nerror: %s", c, conf, err)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment