Commit 45ef657d authored by Ruslan Drozhdzh's avatar Ruslan Drozhdzh Committed by Miek Gieben

Increase performance of Dnstap plugin (#1280)

- added dnstapEncoder object which incapsulates marshalling of dnstap
   messages to protobuf and writing data to connection

 - dnstapEncoder writes data directly to connection object. It doesn't
   use the framestream's "write" method, because it writes data to
   intermediate buffer (bufio.Writer) which leads to unnecessary
   data copying and drops the performance

 - dnstapEncoder reuses a preallocated buffer for marshalling dnstap
   messages. Many messages are added to the same buffer. They are
   separated with a "frame length" 4-byte values, so the buffer content
   is writen to connection object in the format compatible with
   framestream library

 - added test which guarantees that dnstapEncoder output is the same
   as framestream Encoder output

 - the performance increase is about 50% in (dio *dnstapIO) serve() method
   of dnstap plugin. The overall coredns performance increase is about 10%
   in the following configuration:

   .:1053 {
       erratic {
           drop 0
           truncate 0
           delay 0
       }
       dnstap tcp://127.0.0.1:6000 full
       errors stdout
   }

   tested with dnsperf tool
parent 98632cd4
package dnstapio
import (
"encoding/binary"
"fmt"
"io"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
const (
frameLenSize = 4
protobufSize = 1024 * 1024
)
type dnstapEncoder struct {
fse *fs.Encoder
opts *fs.EncoderOptions
writer io.Writer
buffer *proto.Buffer
}
func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder {
return &dnstapEncoder{
opts: o,
buffer: proto.NewBuffer(make([]byte, 0, protobufSize)),
}
}
func (enc *dnstapEncoder) resetWriter(w io.Writer) error {
fse, err := fs.NewEncoder(w, enc.opts)
if err != nil {
return err
}
if err = fse.Flush(); err != nil {
return err
}
enc.fse = fse
enc.writer = w
return nil
}
func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error {
if len(enc.buffer.Bytes()) >= protobufSize {
if err := enc.flushBuffer(); err != nil {
return err
}
}
bufLen := len(enc.buffer.Bytes())
// add placeholder for frame length
if err := enc.buffer.EncodeFixed32(0); err != nil {
enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
return err
}
if err := enc.buffer.Marshal(msg); err != nil {
enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
return err
}
enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:])
return nil
}
func (enc *dnstapEncoder) flushBuffer() error {
if enc.fse == nil || enc.writer == nil {
return fmt.Errorf("no writer")
}
buf := enc.buffer.Bytes()
written := 0
for written < len(buf) {
n, err := enc.writer.Write(buf[written:])
written += n
if err != nil {
return err
}
}
enc.buffer.Reset()
return nil
}
func (enc *dnstapEncoder) encodeFrameLen(buf []byte) {
binary.BigEndian.PutUint32(buf, uint32(len(buf)-4))
}
func (enc *dnstapEncoder) close() error {
if enc.fse != nil {
return enc.fse.Close()
}
return nil
}
package dnstapio
import (
"bytes"
"testing"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
func dnstapMsg() *tap.Dnstap {
t := tap.Dnstap_MESSAGE
mt := tap.Message_CLIENT_RESPONSE
msg := &tap.Message{Type: &mt}
return &tap.Dnstap{Type: &t, Message: msg}
}
func TestEncoderCompatibility(t *testing.T) {
opts := &fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.DnstapTest"),
Bidirectional: false,
}
msg := dnstapMsg()
//framestream encoder
fsW := new(bytes.Buffer)
fsEnc, err := fs.NewEncoder(fsW, opts)
if err != nil {
t.Fatal(err)
}
data, err := proto.Marshal(msg)
if err != nil {
t.Fatal(err)
}
fsEnc.Write(data)
fsEnc.Close()
//dnstap encoder
dnstapW := new(bytes.Buffer)
dnstapEnc := newDnstapEncoder(opts)
if err := dnstapEnc.resetWriter(dnstapW); err != nil {
t.Fatal(err)
}
dnstapEnc.writeMsg(msg)
dnstapEnc.flushBuffer()
dnstapEnc.close()
//compare results
if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) {
t.Fatal("dnstapEncoder is not compatible with framestream Encoder")
}
}
...@@ -3,25 +3,27 @@ package dnstapio ...@@ -3,25 +3,27 @@ package dnstapio
import ( import (
"log" "log"
"net" "net"
"sync/atomic"
"time" "time"
tap "github.com/dnstap/golang-dnstap" tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream" fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
) )
const ( const (
tcpTimeout = 4 * time.Second tcpWriteBufSize = 1024 * 1024
flushTimeout = 1 * time.Second tcpTimeout = 4 * time.Second
queueSize = 10000 flushTimeout = 1 * time.Second
queueSize = 10000
) )
type dnstapIO struct { type dnstapIO struct {
endpoint string endpoint string
socket bool socket bool
conn net.Conn conn net.Conn
enc *fs.Encoder enc *dnstapEncoder
queue chan tap.Dnstap queue chan tap.Dnstap
dropped uint32
} }
// New returns a new and initialized DnstapIO. // New returns a new and initialized DnstapIO.
...@@ -29,7 +31,11 @@ func New(endpoint string, socket bool) DnstapIO { ...@@ -29,7 +31,11 @@ func New(endpoint string, socket bool) DnstapIO {
return &dnstapIO{ return &dnstapIO{
endpoint: endpoint, endpoint: endpoint,
socket: socket, socket: socket,
queue: make(chan tap.Dnstap, queueSize), enc: newDnstapEncoder(&fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
}),
queue: make(chan tap.Dnstap, queueSize),
} }
} }
...@@ -43,18 +49,20 @@ type DnstapIO interface { ...@@ -43,18 +49,20 @@ type DnstapIO interface {
func (dio *dnstapIO) newConnect() error { func (dio *dnstapIO) newConnect() error {
var err error var err error
if dio.socket { if dio.socket {
dio.conn, err = net.Dial("unix", dio.endpoint) if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil {
return err
}
} else { } else {
dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout) if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil {
} return err
if err != nil { }
return err if tcpConn, ok := dio.conn.(*net.TCPConn); ok {
tcpConn.SetWriteBuffer(tcpWriteBufSize)
tcpConn.SetNoDelay(false)
}
} }
dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"), if err = dio.enc.resetWriter(dio.conn); err != nil {
Bidirectional: true,
})
if err != nil {
return err return err
} }
return nil return nil
...@@ -73,15 +81,16 @@ func (dio *dnstapIO) Dnstap(payload tap.Dnstap) { ...@@ -73,15 +81,16 @@ func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
select { select {
case dio.queue <- payload: case dio.queue <- payload:
default: default:
log.Printf("[ERROR] Dnstap payload dropped") atomic.AddUint32(&dio.dropped, 1)
} }
} }
func (dio *dnstapIO) closeConnection() { func (dio *dnstapIO) closeConnection() {
dio.enc.Close() dio.enc.close()
dio.conn.Close() if dio.conn != nil {
dio.enc = nil dio.conn.Close()
dio.conn = nil dio.conn = nil
}
} }
// Close waits until the I/O routine is finished to return. // Close waits until the I/O routine is finished to return.
...@@ -89,32 +98,28 @@ func (dio *dnstapIO) Close() { ...@@ -89,32 +98,28 @@ func (dio *dnstapIO) Close() {
close(dio.queue) close(dio.queue)
} }
func (dio *dnstapIO) write(payload *tap.Dnstap) { func (dio *dnstapIO) flushBuffer() {
if dio.enc == nil { if dio.conn == nil {
if err := dio.newConnect(); err != nil { if err := dio.newConnect(); err != nil {
return return
} }
log.Printf("[INFO] Reconnected to dnstap")
} }
var err error
if payload != nil { if err := dio.enc.flushBuffer(); err != nil {
frame, e := proto.Marshal(payload) log.Printf("[WARN] Connection lost: %s", err)
if err != nil { dio.closeConnection()
log.Printf("[ERROR] Invalid dnstap payload dropped: %s", e) if err := dio.newConnect(); err != nil {
return log.Printf("[ERROR] Cannot connect to dnstap: %s", err)
} else {
log.Printf("[INFO] Reconnected to dnstap")
} }
_, err = dio.enc.Write(frame)
} else {
err = dio.enc.Flush()
} }
if err == nil { }
return
} func (dio *dnstapIO) write(payload *tap.Dnstap) {
log.Printf("[WARN] Connection lost: %s", err) if err := dio.enc.writeMsg(payload); err != nil {
dio.closeConnection() atomic.AddUint32(&dio.dropped, 1)
if err := dio.newConnect(); err != nil {
log.Printf("[ERROR] Cannot write dnstap payload: %s", err)
} else {
log.Printf("[INFO] Reconnect to dnstap done")
} }
} }
...@@ -124,12 +129,16 @@ func (dio *dnstapIO) serve() { ...@@ -124,12 +129,16 @@ func (dio *dnstapIO) serve() {
select { select {
case payload, ok := <-dio.queue: case payload, ok := <-dio.queue:
if !ok { if !ok {
dio.flushBuffer()
dio.closeConnection() dio.closeConnection()
return return
} }
dio.write(&payload) dio.write(&payload)
case <-timeout: case <-timeout:
dio.write(nil) if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 {
log.Printf("[WARN] Dropped dnstap messages: %d", dropped)
}
dio.flushBuffer()
timeout = time.After(flushTimeout) timeout = time.After(flushTimeout)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment