"include/svn:/svn.code.sf.net/p/irrlicht/code/trunk@2950" did not exist on "f2e91f0e7c8026fbbe95679c93f68dc866fce95a"
Commit 1b7492be authored by varyoo's avatar varyoo Committed by Miek Gieben

WIP: middleware/dnstap (#711)

middleware/dnstap add
parent f33b0268
......@@ -373,6 +373,18 @@
revision = "1195e3a8ee1a529d53eed7c624527a68555ddf1f"
version = "v1.5.1"
[[projects]]
branch = "master"
name = "github.com/farsightsec/golang-framestream"
packages = ["."]
revision = "dec85654e8b8cf6712870afb14ee53d1c98cd5e2"
[[projects]]
branch = "master"
name = "github.com/dnstap/golang-dnstap"
packages = ["."]
revision = "c32b266c26b040b00f522a0183527a9eb2a1cf8b"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
......
......@@ -11,6 +11,7 @@ import (
_ "github.com/coredns/coredns/middleware/cache"
_ "github.com/coredns/coredns/middleware/chaos"
_ "github.com/coredns/coredns/middleware/dnssec"
_ "github.com/coredns/coredns/middleware/dnstap"
_ "github.com/coredns/coredns/middleware/erratic"
_ "github.com/coredns/coredns/middleware/errors"
_ "github.com/coredns/coredns/middleware/etcd"
......
......@@ -21,6 +21,7 @@ var directives = []string{
"prometheus",
"errors",
"log",
"dnstap",
"chaos",
"cache",
"rewrite",
......
......@@ -29,6 +29,7 @@
70:prometheus:metrics
80:errors:errors
90:log:log
95:dnstap:dnstap
100:chaos:chaos
110:cache:cache
120:rewrite:rewrite
......
# Dnstap
## Syntax
`dnstap SOCKET [full]`
* **SOCKET** is the socket path supplied to the dnstap command line tool.
* `full` to include the wire-format dns message.
## Dnstap command line tool
```sh
go get github.com/dnstap/golang-dnstap
cd $GOPATH/src/github.com/dnstap/golang-dnstap/dnstap
go build
./dnstap -u /tmp/dnstap.sock
./dnstap -u /tmp/dnstap.sock -y
```
There is a buffer, expect at least 13 requests before the server sends its dnstap messages to the socket.
package dnstap
import (
"fmt"
"golang.org/x/net/context"
"io"
"github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/dnstap/msg"
"github.com/coredns/coredns/middleware/dnstap/taprw"
tap "github.com/dnstap/golang-dnstap"
"github.com/miekg/dns"
)
type Dnstap struct {
Next middleware.Handler
Out io.Writer
Pack bool
}
func tapMessageTo(w io.Writer, m *tap.Message) error {
frame, err := msg.Marshal(m)
if err != nil {
return fmt.Errorf("marshal: %s", err)
}
_, err = w.Write(frame)
return err
}
func (h Dnstap) TapMessage(m *tap.Message) error {
return tapMessageTo(h.Out, m)
}
func (h Dnstap) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
rw := taprw.ResponseWriter{ResponseWriter: w, Taper: &h, Query: r, Pack: h.Pack}
rw.QueryEpoch()
code, err := middleware.NextOrFailure(h.Name(), h.Next, ctx, &rw, r)
if err != nil {
// ignore dnstap errors
return code, err
}
if err := rw.DnstapError(); err != nil {
return code, middleware.Error("dnstap", err)
}
return code, nil
}
func (h Dnstap) Name() string { return "dnstap" }
package dnstap
import (
"errors"
"fmt"
"testing"
"github.com/coredns/coredns/middleware/dnstap/test"
mwtest "github.com/coredns/coredns/middleware/test"
tap "github.com/dnstap/golang-dnstap"
"github.com/golang/protobuf/proto"
"github.com/miekg/dns"
"golang.org/x/net/context"
)
func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) {
w := writer{}
w.queue = append(w.queue, tapq, tapr)
h := Dnstap{
Next: mwtest.HandlerFunc(func(_ context.Context,
w dns.ResponseWriter, _ *dns.Msg) (int, error) {
return 0, w.WriteMsg(r)
}),
Out: &w,
Pack: false,
}
_, err := h.ServeDNS(context.TODO(), &mwtest.ResponseWriter{}, q)
if err != nil {
t.Fatal(err)
}
}
type writer struct {
queue []*tap.Message
}
func (w *writer) Write(b []byte) (int, error) {
e := tap.Dnstap{}
if err := proto.Unmarshal(b, &e); err != nil {
return 0, err
}
if len(w.queue) == 0 {
return 0, errors.New("message not expected")
}
if !test.MsgEqual(w.queue[0], e.Message) {
return 0, fmt.Errorf("want: %v, have: %v", w.queue[0], e.Message)
}
w.queue = w.queue[1:]
return len(b), nil
}
func TestDnstap(t *testing.T) {
q := mwtest.Case{Qname: "example.org", Qtype: dns.TypeA}.Msg()
r := mwtest.Case{
Qname: "example.org.", Qtype: dns.TypeA,
Answer: []dns.RR{
mwtest.A("example.org. 3600 IN A 10.0.0.1"),
},
}.Msg()
tapq := test.TestingData().ToClientQuery()
tapr := test.TestingData().ToClientResponse()
testCase(t, tapq, tapr, q, r)
}
// Package msg helps to build a dnstap Message.
package msg
import (
"errors"
"net"
"time"
"github.com/coredns/coredns/request"
tap "github.com/dnstap/golang-dnstap"
"github.com/miekg/dns"
)
// Data helps to build a dnstap Message.
// It can be transformed into the actual Message using this package.
type Data struct {
Type tap.Message_Type
Packed []byte
SocketProto tap.SocketProtocol
SocketFam tap.SocketFamily
Address []byte
Port uint32
TimeSec uint64
}
func (d *Data) FromRequest(r request.Request) error {
switch addr := r.W.RemoteAddr().(type) {
case *net.TCPAddr:
d.Address = addr.IP
d.Port = uint32(addr.Port)
d.SocketProto = tap.SocketProtocol_TCP
case *net.UDPAddr:
d.Address = addr.IP
d.Port = uint32(addr.Port)
d.SocketProto = tap.SocketProtocol_UDP
default:
return errors.New("unknown remote address type")
}
if a := net.IP(d.Address); a.To4() != nil {
d.SocketFam = tap.SocketFamily_INET
} else {
d.SocketFam = tap.SocketFamily_INET6
}
return nil
}
func (d *Data) Pack(m *dns.Msg) error {
packed, err := m.Pack()
if err != nil {
return err
}
d.Packed = packed
return nil
}
func (d *Data) Epoch() {
d.TimeSec = uint64(time.Now().Unix())
}
// Transform the data into a client response message.
func (d *Data) ToClientResponse() *tap.Message {
d.Type = tap.Message_CLIENT_RESPONSE
return &tap.Message{
Type: &d.Type,
SocketFamily: &d.SocketFam,
SocketProtocol: &d.SocketProto,
ResponseTimeSec: &d.TimeSec,
ResponseMessage: d.Packed,
QueryAddress: d.Address,
QueryPort: &d.Port,
}
}
// Transform the data into a client query message.
func (d *Data) ToClientQuery() *tap.Message {
d.Type = tap.Message_CLIENT_QUERY
return &tap.Message{
Type: &d.Type,
SocketFamily: &d.SocketFam,
SocketProtocol: &d.SocketProto,
QueryTimeSec: &d.TimeSec,
QueryMessage: d.Packed,
QueryAddress: d.Address,
QueryPort: &d.Port,
}
}
package msg
import (
"net"
"reflect"
"testing"
"github.com/coredns/coredns/middleware/test"
"github.com/coredns/coredns/request"
tap "github.com/dnstap/golang-dnstap"
"github.com/miekg/dns"
)
func testRequest(t *testing.T, expected Data, r request.Request) {
d := Data{}
if err := d.FromRequest(r); err != nil {
t.Fail()
return
}
if d.SocketProto != expected.SocketProto ||
d.SocketFam != expected.SocketFam ||
!reflect.DeepEqual(d.Address, expected.Address) ||
d.Port != expected.Port {
t.Fatalf("expected: %v, have: %v", expected, d)
return
}
}
func TestRequest(t *testing.T) {
testRequest(t, Data{
SocketProto: tap.SocketProtocol_UDP,
SocketFam: tap.SocketFamily_INET,
Address: net.ParseIP("10.240.0.1"),
Port: 40212,
}, testingRequest())
}
func testingRequest() request.Request {
m := new(dns.Msg)
m.SetQuestion("example.com.", dns.TypeA)
m.SetEdns0(4097, true)
return request.Request{W: &test.ResponseWriter{}, Req: m}
}
package msg
import (
"fmt"
lib "github.com/dnstap/golang-dnstap"
"github.com/golang/protobuf/proto"
)
func wrap(m *lib.Message) *lib.Dnstap {
t := lib.Dnstap_MESSAGE
return &lib.Dnstap{
Type: &t,
Message: m,
}
}
func Marshal(m *lib.Message) (data []byte, err error) {
data, err = proto.Marshal(wrap(m))
if err != nil {
err = fmt.Errorf("proto: %s", err)
return
}
return
}
package out
import (
"fmt"
"net"
fs "github.com/farsightsec/golang-framestream"
)
// Socket is a Frame Streams encoder over a UNIX socket.
type Socket struct {
path string
enc *fs.Encoder
conn net.Conn
err error
}
func openSocket(s *Socket) error {
conn, err := net.Dial("unix", s.path)
if err != nil {
return err
}
s.conn = conn
enc, err := fs.NewEncoder(conn, &fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
})
if err != nil {
return err
}
s.enc = enc
s.err = nil
return nil
}
// NewSocket will always return a new Socket.
// err if nothing is listening to it, it will attempt to reconnect on the next Write.
func NewSocket(path string) (s *Socket, err error) {
s = &Socket{path: path}
if err = openSocket(s); err != nil {
err = fmt.Errorf("open socket: %s", err)
s.err = err
return
}
return
}
// Write a single Frame Streams frame.
func (s *Socket) Write(frame []byte) (int, error) {
if s.err != nil {
// is the dnstap tool listening?
if err := openSocket(s); err != nil {
return 0, fmt.Errorf("open socket: %s", err)
}
}
n, err := s.enc.Write(frame)
if err != nil {
// the dnstap command line tool is down
s.conn.Close()
s.err = err
return 0, err
}
return n, nil
}
// Close the socket and flush the remaining frames.
func (s *Socket) Close() error {
if s.err != nil {
// nothing to close
return nil
}
defer s.conn.Close()
if err := s.enc.Flush(); err != nil {
return fmt.Errorf("flush: %s", err)
}
if err := s.enc.Close(); err != nil {
return err
}
return nil
}
package out
import (
"net"
"testing"
fs "github.com/farsightsec/golang-framestream"
)
func acceptOne(t *testing.T, l net.Listener) {
server, err := l.Accept()
if err != nil {
t.Fatalf("server accept: %s", err)
return
}
dec, err := fs.NewDecoder(server, &fs.DecoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
})
if err != nil {
t.Fatalf("server decoder: %s", err)
return
}
if _, err := dec.Decode(); err != nil {
t.Errorf("server decode: %s", err)
}
if err := server.Close(); err != nil {
t.Error(err)
}
}
func sendOne(socket *Socket) error {
if _, err := socket.Write([]byte("frame")); err != nil {
return err
}
if err := socket.enc.Flush(); err != nil {
// Would happen during Write in real life.
socket.conn.Close()
socket.err = err
return err
}
return nil
}
func TestSocket(t *testing.T) {
socket, err := NewSocket("dnstap.sock")
if err == nil {
t.Fatal("new socket: not listening but no error")
return
}
if err := sendOne(socket); err == nil {
t.Fatal("not listening but no error")
return
}
l, err := net.Listen("unix", "dnstap.sock")
if err != nil {
t.Fatal(err)
return
}
wait := make(chan bool)
go func() {
acceptOne(t, l)
wait <- true
}()
if err := sendOne(socket); err != nil {
t.Fatalf("send one: %s", err)
return
}
<-wait
if err := sendOne(socket); err == nil {
panic("must fail")
}
go func() {
acceptOne(t, l)
wait <- true
}()
if err := sendOne(socket); err != nil {
t.Fatalf("send one: %s", err)
return
}
<-wait
if err := l.Close(); err != nil {
t.Error(err)
}
}
package dnstap
import (
"fmt"
"log"
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/dnstap/out"
"github.com/mholt/caddy"
"github.com/mholt/caddy/caddyfile"
)
func init() {
caddy.RegisterPlugin("dnstap", caddy.Plugin{
ServerType: "dns",
Action: wrapSetup,
})
}
func wrapSetup(c *caddy.Controller) error {
if err := setup(c); err != nil {
return middleware.Error("dnstap", err)
}
return nil
}
func parseConfig(c *caddyfile.Dispenser) (path string, full bool, err error) {
c.Next() // directive name
if !c.Args(&path) {
err = c.ArgErr()
return
}
full = c.NextArg() && c.Val() == "full"
return
}
func setup(c *caddy.Controller) error {
path, full, err := parseConfig(&c.Dispenser)
if err != nil {
return err
}
dnstap := Dnstap{Pack: full}
o, err := out.NewSocket(path)
if err != nil {
log.Printf("[WARN] Can't connect to %s at the moment", path)
}
dnstap.Out = o
c.OnShutdown(func() error {
if err := o.Close(); err != nil {
return fmt.Errorf("output: %s", err)
}
return nil
})
dnsserver.GetConfig(c).AddMiddleware(
func(next middleware.Handler) middleware.Handler {
dnstap.Next = next
return dnstap
})
return nil
}
package dnstap
import (
"github.com/mholt/caddy"
"testing"
)
func TestConfig(t *testing.T) {
file := "dnstap dnstap.sock full"
c := caddy.NewTestController("dns", file)
if path, full, err := parseConfig(&c.Dispenser); path != "dnstap.sock" || !full {
t.Fatalf("%s: %s", file, err)
}
file = "dnstap dnstap.sock"
c = caddy.NewTestController("dns", file)
if path, full, err := parseConfig(&c.Dispenser); path != "dnstap.sock" || full {
t.Fatalf("%s: %s", file, err)
}
}
// Package taprw takes a query and intercepts the response.
// It will log both after the response is written.
package taprw
import (
"fmt"
"github.com/coredns/coredns/middleware/dnstap/msg"
"github.com/coredns/coredns/request"
tap "github.com/dnstap/golang-dnstap"
"github.com/miekg/dns"
)
type Taper interface {
TapMessage(m *tap.Message) error
}
// Single request use.
type ResponseWriter struct {
queryData msg.Data
Query *dns.Msg
dns.ResponseWriter
Taper
Pack bool
err error
}
// Check if a dnstap error occured.
// Set during ResponseWriter.Write.
func (w ResponseWriter) DnstapError() error {
return w.err
}
// To be called as soon as possible.
func (w *ResponseWriter) QueryEpoch() {
w.queryData.Epoch()
}
// Write back the response to the client and THEN work on logging the request
// and response to dnstap.
// Dnstap errors to be checked by DnstapError.
func (w *ResponseWriter) WriteMsg(resp *dns.Msg) error {
writeErr := w.ResponseWriter.WriteMsg(resp)
if err := tapQuery(w); err != nil {
w.err = fmt.Errorf("client query: %s", err)
// don't forget to call DnstapError later
}
if writeErr == nil {
if err := tapResponse(w, resp); err != nil {
w.err = fmt.Errorf("client response: %s", err)
}
}
return writeErr
}
func tapQuery(w *ResponseWriter) error {
req := request.Request{W: w.ResponseWriter, Req: w.Query}
if err := w.queryData.FromRequest(req); err != nil {
return err
}
if w.Pack {
if err := w.queryData.Pack(w.Query); err != nil {
return fmt.Errorf("pack: %s", err)
}
}
return w.Taper.TapMessage(w.queryData.ToClientQuery())
}
func tapResponse(w *ResponseWriter, resp *dns.Msg) error {
d := &msg.Data{}
d.Epoch()
req := request.Request{W: w, Req: resp}
if err := d.FromRequest(req); err != nil {
return err
}
if w.Pack {
if err := d.Pack(resp); err != nil {
return fmt.Errorf("pack: %s", err)
}
}
return w.Taper.TapMessage(d.ToClientResponse())
}
package taprw
import (
"errors"
"testing"
"github.com/coredns/coredns/middleware/dnstap/test"
mwtest "github.com/coredns/coredns/middleware/test"
tap "github.com/dnstap/golang-dnstap"
"github.com/miekg/dns"
)
type TapFailer struct {
}
func (TapFailer) TapMessage(*tap.Message) error {
return errors.New("failed")
}
func TestDnstapError(t *testing.T) {
rw := ResponseWriter{
Query: new(dns.Msg),
ResponseWriter: &mwtest.ResponseWriter{},
Taper: TapFailer{},
}
if err := rw.WriteMsg(new(dns.Msg)); err != nil {
t.Errorf("dnstap error during Write: %s", err)
}
if rw.DnstapError() == nil {
t.Fatal("no dnstap error")
}
}
func testingMsg() (m *dns.Msg) {
m = new(dns.Msg)
m.SetQuestion("example.com.", dns.TypeA)
m.SetEdns0(4097, true)
return
}
func TestClientResponse(t *testing.T) {
trapper := test.TrapTaper{}
rw := ResponseWriter{
Pack: true,
Taper: &trapper,
ResponseWriter: &mwtest.ResponseWriter{},
}
d := test.TestingData()
m := testingMsg()
// will the wire-format msg be reported?
bin, err := m.Pack()
if err != nil {
t.Fatal(err)
return
}
d.Packed = bin
if err := tapResponse(&rw, m); err != nil {
t.Fatal(err)
return
}
want := d.ToClientResponse()
if l := len(trapper.Trap); l != 1 {
t.Fatalf("%d msg trapped", l)
return
}
have := trapper.Trap[0]
if !test.MsgEqual(want, have) {
t.Fatalf("want: %v\nhave: %v", want, have)
}
}
func TestClientQuery(t *testing.T) {
trapper := test.TrapTaper{}
rw := ResponseWriter{
Pack: false, // no binary this time
Taper: &trapper,
ResponseWriter: &mwtest.ResponseWriter{},
Query: testingMsg(),
}
if err := tapQuery(&rw); err != nil {
t.Fatal(err)
return
}
want := test.TestingData().ToClientQuery()
if l := len(trapper.Trap); l != 1 {
t.Fatalf("%d msg trapped", l)
return
}
have := trapper.Trap[0]
if !test.MsgEqual(want, have) {
t.Fatalf("want: %v\nhave: %v", want, have)
}
}
package test
import (
"net"
"reflect"
"github.com/coredns/coredns/middleware/dnstap/msg"
tap "github.com/dnstap/golang-dnstap"
)
func TestingData() (d *msg.Data) {
d = &msg.Data{
Type: tap.Message_CLIENT_RESPONSE,
SocketFam: tap.SocketFamily_INET,
SocketProto: tap.SocketProtocol_UDP,
Address: net.ParseIP("10.240.0.1"),
Port: 40212,
}
return
}
type comp struct {
Type *tap.Message_Type
SF *tap.SocketFamily
SP *tap.SocketProtocol
QA []byte
RA []byte
QP *uint32
RP *uint32
QTSec bool
RTSec bool
RM []byte
QM []byte
}
func toComp(m *tap.Message) comp {
return comp{
Type: m.Type,
SF: m.SocketFamily,
SP: m.SocketProtocol,
QA: m.QueryAddress,
RA: m.ResponseAddress,
QP: m.QueryPort,
RP: m.ResponsePort,
QTSec: m.QueryTimeSec != nil,
RTSec: m.ResponseTimeSec != nil,
RM: m.ResponseMessage,
QM: m.QueryMessage,
}
}
func MsgEqual(a, b *tap.Message) bool {
return reflect.DeepEqual(toComp(a), toComp(b))
}
type TrapTaper struct {
Trap []*tap.Message
}
func (t *TrapTaper) TapMessage(m *tap.Message) error {
t.Trap = append(t.Trap, m)
return nil
}
Copyright (c) 2013-2014 by Farsight Security, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
/*
* Copyright (c) 2013-2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"io"
"log"
"os"
"github.com/farsightsec/golang-framestream"
)
type FrameStreamInput struct {
wait chan bool
decoder *framestream.Decoder
}
func NewFrameStreamInput(r io.ReadWriter, bi bool) (input *FrameStreamInput, err error) {
input = new(FrameStreamInput)
decoderOptions := framestream.DecoderOptions{
ContentType: FSContentType,
Bidirectional: bi,
}
input.decoder, err = framestream.NewDecoder(r, &decoderOptions)
if err != nil {
return
}
input.wait = make(chan bool)
return
}
func NewFrameStreamInputFromFilename(fname string) (input *FrameStreamInput, err error) {
file, err := os.Open(fname)
if err != nil {
return nil, err
}
input, err = NewFrameStreamInput(file, false)
return
}
func (input *FrameStreamInput) ReadInto(output chan []byte) {
for {
buf, err := input.decoder.Decode()
if err != nil {
if err != io.EOF {
log.Printf("framestream.Decoder.Decode() failed: %s\n", err)
}
break
}
newbuf := make([]byte, len(buf))
copy(newbuf, buf)
output <- newbuf
}
close(input.wait)
}
func (input *FrameStreamInput) Wait() {
<-input.wait
}
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"io"
"log"
"os"
"github.com/farsightsec/golang-framestream"
)
type FrameStreamOutput struct {
outputChannel chan []byte
wait chan bool
enc *framestream.Encoder
}
func NewFrameStreamOutput(w io.Writer) (o *FrameStreamOutput, err error) {
o = new(FrameStreamOutput)
o.outputChannel = make(chan []byte, outputChannelSize)
o.enc, err = framestream.NewEncoder(w, &framestream.EncoderOptions{ContentType: FSContentType})
if err != nil {
return
}
o.wait = make(chan bool)
return
}
func NewFrameStreamOutputFromFilename(fname string) (o *FrameStreamOutput, err error) {
if fname == "" || fname == "-" {
return NewFrameStreamOutput(os.Stdout)
}
w, err := os.Create(fname)
if err != nil {
return
}
return NewFrameStreamOutput(w)
}
func (o *FrameStreamOutput) GetOutputChannel() chan []byte {
return o.outputChannel
}
func (o *FrameStreamOutput) RunOutputLoop() {
for frame := range o.outputChannel {
if _, err := o.enc.Write(frame); err != nil {
log.Fatalf("framestream.Encoder.Write() failed: %s\n", err)
break
}
}
close(o.wait)
}
func (o *FrameStreamOutput) Close() {
close(o.outputChannel)
<-o.wait
o.enc.Flush()
o.enc.Close()
}
/*
* Copyright (c) 2013-2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"log"
"net"
"os"
)
type FrameStreamSockInput struct {
wait chan bool
listener net.Listener
}
func NewFrameStreamSockInput(listener net.Listener) (input *FrameStreamSockInput) {
input = new(FrameStreamSockInput)
input.listener = listener
return
}
func NewFrameStreamSockInputFromPath(socketPath string) (input *FrameStreamSockInput, err error) {
os.Remove(socketPath)
listener, err := net.Listen("unix", socketPath)
if err != nil {
return
}
return NewFrameStreamSockInput(listener), nil
}
func (input *FrameStreamSockInput) ReadInto(output chan []byte) {
for {
conn, err := input.listener.Accept()
if err != nil {
log.Printf("net.Listener.Accept() failed: %s\n", err)
continue
}
i, err := NewFrameStreamInput(conn, true)
if err != nil {
log.Printf("dnstap.NewFrameStreamInput() failed: %s\n", err)
continue
}
log.Printf("dnstap.FrameStreamSockInput: accepted a socket connection\n")
go i.ReadInto(output)
}
}
func (input *FrameStreamSockInput) Wait() {
select {}
}
This diff is collapsed.
/*
* Copyright (c) 2013-2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"bytes"
"fmt"
"net"
"strconv"
"time"
"github.com/miekg/dns"
)
const quietTimeFormat = "15:04:05"
func textConvertTime(s *bytes.Buffer, secs *uint64, nsecs *uint32) {
if secs != nil {
s.WriteString(time.Unix(int64(*secs), 0).Format(quietTimeFormat))
} else {
s.WriteString("??:??:??")
}
if nsecs != nil {
s.WriteString(fmt.Sprintf(".%06d", *nsecs/1000))
} else {
s.WriteString(".??????")
}
}
func textConvertIP(s *bytes.Buffer, ip []byte) {
if ip != nil {
s.WriteString(net.IP(ip).String())
} else {
s.WriteString("MISSING_ADDRESS")
}
}
func textConvertMessage(m *Message, s *bytes.Buffer) {
isQuery := false
printQueryAddress := false
switch *m.Type {
case Message_CLIENT_QUERY,
Message_RESOLVER_QUERY,
Message_AUTH_QUERY,
Message_FORWARDER_QUERY,
Message_TOOL_QUERY:
isQuery = true
case Message_CLIENT_RESPONSE,
Message_RESOLVER_RESPONSE,
Message_AUTH_RESPONSE,
Message_FORWARDER_RESPONSE,
Message_TOOL_RESPONSE:
isQuery = false
default:
s.WriteString("[unhandled Message.Type]\n")
return
}
if isQuery {
textConvertTime(s, m.QueryTimeSec, m.QueryTimeNsec)
} else {
textConvertTime(s, m.ResponseTimeSec, m.ResponseTimeNsec)
}
s.WriteString(" ")
switch *m.Type {
case Message_CLIENT_QUERY,
Message_CLIENT_RESPONSE:
{
s.WriteString("C")
}
case Message_RESOLVER_QUERY,
Message_RESOLVER_RESPONSE:
{
s.WriteString("R")
}
case Message_AUTH_QUERY,
Message_AUTH_RESPONSE:
{
s.WriteString("A")
}
case Message_FORWARDER_QUERY,
Message_FORWARDER_RESPONSE:
{
s.WriteString("F")
}
case Message_STUB_QUERY,
Message_STUB_RESPONSE:
{
s.WriteString("S")
}
case Message_TOOL_QUERY,
Message_TOOL_RESPONSE:
{
s.WriteString("T")
}
}
if isQuery {
s.WriteString("Q ")
} else {
s.WriteString("R ")
}
switch *m.Type {
case Message_CLIENT_QUERY,
Message_CLIENT_RESPONSE,
Message_AUTH_QUERY,
Message_AUTH_RESPONSE:
printQueryAddress = true
}
if printQueryAddress {
textConvertIP(s, m.QueryAddress)
} else {
textConvertIP(s, m.ResponseAddress)
}
s.WriteString(" ")
if m.SocketProtocol != nil {
s.WriteString(m.SocketProtocol.String())
}
s.WriteString(" ")
var err error
msg := new(dns.Msg)
if isQuery {
s.WriteString(strconv.Itoa(len(m.QueryMessage)))
s.WriteString("b ")
err = msg.Unpack(m.QueryMessage)
} else {
s.WriteString(strconv.Itoa(len(m.ResponseMessage)))
s.WriteString("b ")
err = msg.Unpack(m.ResponseMessage)
}
if err != nil {
s.WriteString("X ")
} else {
s.WriteString("\"" + msg.Question[0].Name + "\" ")
s.WriteString(dns.Class(msg.Question[0].Qclass).String() + " ")
s.WriteString(dns.Type(msg.Question[0].Qtype).String())
}
s.WriteString("\n")
}
func TextFormat(dt *Dnstap) (out []byte, ok bool) {
var s bytes.Buffer
if *dt.Type == Dnstap_MESSAGE {
textConvertMessage(dt.Message, &s)
return s.Bytes(), true
}
return nil, false
}
dnstap: flexible, structured event replication format for DNS servers
---------------------------------------------------------------------
dnstap implements an encoding format for DNS server events. It uses a
lightweight framing on top of event payloads encoded using Protocol Buffers and
is transport neutral.
dnstap can represent internal state inside a DNS server that is difficult to
obtain using techniques based on traditional packet capture or unstructured
textual format logging.
This repository contains a command-line tool named "dnstap" developed in the
Go programming language. It can be installed with the following command:
go get -u github.com/dnstap/golang-dnstap/dnstap
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"bufio"
"io"
"log"
"os"
"github.com/golang/protobuf/proto"
)
type TextFormatFunc func(*Dnstap) ([]byte, bool)
type TextOutput struct {
format TextFormatFunc
outputChannel chan []byte
wait chan bool
writer *bufio.Writer
}
func NewTextOutput(writer io.Writer, format TextFormatFunc) (o *TextOutput) {
o = new(TextOutput)
o.format = format
o.outputChannel = make(chan []byte, outputChannelSize)
o.writer = bufio.NewWriter(writer)
o.wait = make(chan bool)
return
}
func NewTextOutputFromFilename(fname string, format TextFormatFunc) (o *TextOutput, err error) {
if fname == "" || fname == "-" {
return NewTextOutput(os.Stdout, format), nil
}
writer, err := os.Create(fname)
if err != nil {
return
}
return NewTextOutput(writer, format), nil
}
func (o *TextOutput) GetOutputChannel() chan []byte {
return o.outputChannel
}
func (o *TextOutput) RunOutputLoop() {
dt := &Dnstap{}
for frame := range o.outputChannel {
if err := proto.Unmarshal(frame, dt); err != nil {
log.Fatalf("dnstap.TextOutput: proto.Unmarshal() failed: %s\n", err)
break
}
buf, ok := o.format(dt)
if !ok {
log.Fatalf("dnstap.TextOutput: text format function failed\n")
break
}
if _, err := o.writer.Write(buf); err != nil {
log.Fatalf("dnstap.TextOutput: write failed: %s\n", err)
break
}
o.writer.Flush()
}
close(o.wait)
}
func (o *TextOutput) Close() {
close(o.outputChannel)
<-o.wait
o.writer.Flush()
}
/*
* Copyright (c) 2013-2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"bytes"
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/miekg/dns"
)
const yamlTimeFormat = "2006-01-02 15:04:05.999999999"
func yamlConvertMessage(m *Message, s *bytes.Buffer) {
s.WriteString(fmt.Sprint(" type: ", m.Type, "\n"))
if m.QueryTimeSec != nil && m.QueryTimeNsec != nil {
t := time.Unix(int64(*m.QueryTimeSec), int64(*m.QueryTimeNsec)).UTC()
s.WriteString(fmt.Sprint(" query_time: !!timestamp ", t.Format(yamlTimeFormat), "\n"))
}
if m.ResponseTimeSec != nil && m.ResponseTimeNsec != nil {
t := time.Unix(int64(*m.ResponseTimeSec), int64(*m.ResponseTimeNsec)).UTC()
s.WriteString(fmt.Sprint(" response_time: !!timestamp ", t.Format(yamlTimeFormat), "\n"))
}
if m.SocketFamily != nil {
s.WriteString(fmt.Sprint(" socket_family: ", m.SocketFamily, "\n"))
}
if m.SocketProtocol != nil {
s.WriteString(fmt.Sprint(" socket_protocol: ", m.SocketProtocol, "\n"))
}
if m.QueryAddress != nil {
s.WriteString(fmt.Sprint(" query_address: ", net.IP(m.QueryAddress), "\n"))
}
if m.ResponseAddress != nil {
s.WriteString(fmt.Sprint(" response_address: ", net.IP(m.ResponseAddress), "\n"))
}
if m.QueryPort != nil {
s.WriteString(fmt.Sprint(" query_port: ", *m.QueryPort, "\n"))
}
if m.ResponsePort != nil {
s.WriteString(fmt.Sprint(" response_port: ", *m.ResponsePort, "\n"))
}
if m.QueryZone != nil {
name, _, err := dns.UnpackDomainName(m.QueryZone, 0)
if err != nil {
s.WriteString(" # query_zone: parse failed\n")
} else {
s.WriteString(fmt.Sprint(" query_zone: ", strconv.Quote(name), "\n"))
}
}
if m.QueryMessage != nil {
msg := new(dns.Msg)
err := msg.Unpack(m.QueryMessage)
if err != nil {
s.WriteString(" # query_message: parse failed\n")
} else {
s.WriteString(" query_message: |\n")
s.WriteString(" " + strings.Replace(strings.TrimSpace(msg.String()), "\n", "\n ", -1) + "\n")
}
}
if m.ResponseMessage != nil {
msg := new(dns.Msg)
err := msg.Unpack(m.ResponseMessage)
if err != nil {
s.WriteString(fmt.Sprint(" # response_message: parse failed: ", err, "\n"))
} else {
s.WriteString(" response_message: |\n")
s.WriteString(" " + strings.Replace(strings.TrimSpace(msg.String()), "\n", "\n ", -1) + "\n")
}
}
s.WriteString("---\n")
}
func YamlFormat(dt *Dnstap) (out []byte, ok bool) {
var s bytes.Buffer
s.WriteString(fmt.Sprint("type: ", dt.Type, "\n"))
if dt.Identity != nil {
s.WriteString(fmt.Sprint("identity: ", strconv.Quote(string(dt.Identity)), "\n"))
}
if dt.Version != nil {
s.WriteString(fmt.Sprint("version: ", strconv.Quote(string(dt.Version)), "\n"))
}
if *dt.Type == Dnstap_MESSAGE {
s.WriteString("message:\n")
yamlConvertMessage(dt.Message, &s)
}
return s.Bytes(), true
}
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
const outputChannelSize = 32
var FSContentType = []byte("protobuf:dnstap.Dnstap")
type Input interface {
ReadInto(chan []byte)
Wait()
}
type Output interface {
GetOutputChannel() chan []byte
RunOutputLoop()
Close()
}
This diff is collapsed.
.deps/
.dirstamp
.libs/
*.pb-c.c
*.pb-c.h
*.pb.cc
*.pb.h
*.pb.go
*_pb2.py
*_pb2.pyc
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.
dnstap: flexible, structured event replication format for DNS software
----------------------------------------------------------------------
This directory contains only the protobuf schemas for dnstap, and is the root of
a repository named "dnstap.pb".
This diff is collapsed.
/*
* Copyright (c) 2013-2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"runtime"
"syscall"
"github.com/dnstap/golang-dnstap"
)
var (
flagReadFile = flag.String("r", "", "read dnstap payloads from file")
flagReadSock = flag.String("u", "", "read dnstap payloads from unix socket")
flagWriteFile = flag.String("w", "-", "write output to file")
flagQuietText = flag.Bool("q", false, "use quiet text output")
flagYamlText = flag.Bool("y", false, "use verbose YAML output")
)
func usage() {
fmt.Fprintf(os.Stderr, "Usage: %s [OPTION]...\n", os.Args[0])
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, `
Quiet text output format mnemonics:
AQ: AUTH_QUERY
AR: AUTH_RESPONSE
RQ: RESOLVER_QUERY
RR: RESOLVER_RESPONSE
CQ: CLIENT_QUERY
CR: CLIENT_RESPONSE
FQ: FORWARDER_QUERY
FR: FORWARDER_RESPONSE
SQ: STUB_QUERY
SR: STUB_RESPONSE
TQ: TOOL_QUERY
TR: TOOL_RESPONSE
`)
}
func outputOpener(fname string, text, yaml bool) func() dnstap.Output {
return func() dnstap.Output {
var o dnstap.Output
var err error
if text {
o, err = dnstap.NewTextOutputFromFilename(fname, dnstap.TextFormat)
} else if yaml {
o, err = dnstap.NewTextOutputFromFilename(fname, dnstap.YamlFormat)
} else {
o, err = dnstap.NewFrameStreamOutputFromFilename(fname)
}
if err != nil {
fmt.Fprintf(os.Stderr, "dnstap: Failed to open output file: %s\n", err)
os.Exit(1)
}
go o.RunOutputLoop()
return o
}
}
func outputLoop(opener func() dnstap.Output, data <-chan []byte, done chan<- struct{}) {
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt, syscall.SIGHUP)
o := opener()
defer func() {
o.Close()
close(done)
os.Exit(0)
}()
for {
select {
case b, ok := <-data:
if !ok {
return
}
o.GetOutputChannel() <- b
case sig := <-sigch:
if sig == syscall.SIGHUP {
o.Close()
o = opener()
continue
}
return
}
}
}
func main() {
var err error
var i dnstap.Input
runtime.GOMAXPROCS(runtime.NumCPU())
log.SetFlags(0)
flag.Usage = usage
// Handle command-line arguments.
flag.Parse()
if *flagReadFile == "" && *flagReadSock == "" {
fmt.Fprintf(os.Stderr, "dnstap: Error: no inputs specified.\n")
os.Exit(1)
}
if *flagWriteFile == "-" {
if *flagQuietText == false && *flagYamlText == false {
*flagQuietText = true
}
}
if *flagReadFile != "" && *flagReadSock != "" {
fmt.Fprintf(os.Stderr, "dnstap: Error: specify exactly one of -r or -u.\n")
os.Exit(1)
}
// Start the output loop.
output := make(chan []byte, 1)
opener := outputOpener(*flagWriteFile, *flagQuietText, *flagYamlText)
outDone := make(chan struct{})
go outputLoop(opener, output, outDone)
// Open the input and start the input loop.
if *flagReadFile != "" {
i, err = dnstap.NewFrameStreamInputFromFilename(*flagReadFile)
if err != nil {
fmt.Fprintf(os.Stderr, "dnstap: Failed to open input file: %s\n", err)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "dnstap: opened input file %s\n", *flagReadFile)
} else if *flagReadSock != "" {
i, err = dnstap.NewFrameStreamSockInputFromPath(*flagReadSock)
if err != nil {
fmt.Fprintf(os.Stderr, "dnstap: Failed to open input socket: %s\n", err)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "dnstap: opened input socket %s\n", *flagReadSock)
}
i.ReadInto(output)
// Wait for input loop to finish.
i.Wait()
close(output)
<-outDone
}
Copyright (c) 2014 by Farsight Security, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
package framestream
import (
"bufio"
"bytes"
"encoding/binary"
"io"
)
const CONTROL_ACCEPT = 0x01
const CONTROL_START = 0x02
const CONTROL_STOP = 0x03
const CONTROL_READY = 0x04
const CONTROL_FINISH = 0x05
const CONTROL_FIELD_CONTENT_TYPE = 0x01
type ControlFrame struct {
ControlType uint32
ContentTypes [][]byte
}
var ControlStart = ControlFrame{ControlType: CONTROL_START}
var ControlStop = ControlFrame{ControlType: CONTROL_STOP}
var ControlReady = ControlFrame{ControlType: CONTROL_READY}
var ControlAccept = ControlFrame{ControlType: CONTROL_ACCEPT}
var ControlFinish = ControlFrame{ControlType: CONTROL_FINISH}
func (c *ControlFrame) Encode(w io.Writer) (err error) {
var buf bytes.Buffer
err = binary.Write(&buf, binary.BigEndian, c.ControlType)
if err != nil {
return
}
for _, ctype := range c.ContentTypes {
err = binary.Write(&buf, binary.BigEndian, uint32(CONTROL_FIELD_CONTENT_TYPE))
if err != nil {
return
}
err = binary.Write(&buf, binary.BigEndian, uint32(len(ctype)))
if err != nil {
return
}
_, err = buf.Write(ctype)
if err != nil {
return
}
}
err = binary.Write(w, binary.BigEndian, uint32(0))
if err != nil {
return
}
err = binary.Write(w, binary.BigEndian, uint32(buf.Len()))
if err != nil {
return
}
_, err = buf.WriteTo(w)
return
}
func (c *ControlFrame) EncodeFlush(w *bufio.Writer) error {
if err := c.Encode(w); err != nil {
return err
}
return w.Flush()
}
func (c *ControlFrame) Decode(r io.Reader) (err error) {
var cflen uint32
err = binary.Read(r, binary.BigEndian, &cflen)
if err != nil {
return
}
err = binary.Read(r, binary.BigEndian, &c.ControlType)
if err != nil {
return
}
cflen -= 4
if cflen > 0 {
cfields := make([]byte, int(cflen))
_, err = io.ReadFull(r, cfields)
if err != nil {
return
}
for len(cfields) > 8 {
cftype := binary.BigEndian.Uint32(cfields[:4])
cfields = cfields[4:]
if cftype != CONTROL_FIELD_CONTENT_TYPE {
return ErrDecode
}
cflen := int(binary.BigEndian.Uint32(cfields[:4]))
cfields = cfields[4:]
if cflen > len(cfields) {
return ErrDecode
}
c.ContentTypes = append(c.ContentTypes, cfields[:cflen])
cfields = cfields[cflen:]
}
if len(cfields) > 0 {
return ErrDecode
}
}
return
}
func (c *ControlFrame) DecodeEscape(r io.Reader) error {
var zero uint32
err := binary.Read(r, binary.BigEndian, &zero)
if err != nil {
return err
}
if zero != 0 {
return ErrDecode
}
return c.Decode(r)
}
func (c *ControlFrame) DecodeTypeEscape(r io.Reader, ctype uint32) error {
err := c.DecodeEscape(r)
if err != nil {
return err
}
if ctype != c.ControlType {
return ErrDecode
}
return nil
}
func (c *ControlFrame) MatchContentType(ctype []byte) bool {
if ctype == nil {
return true
}
for _, cfctype := range c.ContentTypes {
if bytes.Compare(ctype, cfctype) == 0 {
return true
}
}
return false
}
func (c *ControlFrame) SetContentType(ctype []byte) {
if ctype != nil {
c.ContentTypes = [][]byte{ctype}
}
}
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package framestream
import (
"bufio"
"encoding/binary"
"io"
)
type DecoderOptions struct {
MaxPayloadSize uint32
ContentType []byte
Bidirectional bool
}
type Decoder struct {
buf []byte
opt DecoderOptions
reader *bufio.Reader
writer *bufio.Writer
stopped bool
}
func NewDecoder(r io.Reader, opt *DecoderOptions) (dec *Decoder, err error) {
if opt == nil {
opt = &DecoderOptions{}
}
if opt.MaxPayloadSize == 0 {
opt.MaxPayloadSize = DEFAULT_MAX_PAYLOAD_SIZE
}
dec = &Decoder{
buf: make([]byte, opt.MaxPayloadSize),
opt: *opt,
reader: bufio.NewReader(r),
writer: nil,
}
var cf ControlFrame
if opt.Bidirectional {
w, ok := r.(io.Writer)
if !ok {
return dec, ErrType
}
dec.writer = bufio.NewWriter(w)
// Read the ready control frame.
err = cf.DecodeTypeEscape(dec.reader, CONTROL_READY)
if err != nil {
return
}
// Check content type.
if !cf.MatchContentType(dec.opt.ContentType) {
return dec, ErrContentTypeMismatch
}
// Send the accept control frame.
accept := ControlAccept
accept.SetContentType(dec.opt.ContentType)
err = accept.EncodeFlush(dec.writer)
if err != nil {
return
}
}
// Read the start control frame.
err = cf.DecodeTypeEscape(dec.reader, CONTROL_START)
if err != nil {
return
}
// Check content type.
if !cf.MatchContentType(dec.opt.ContentType) {
return dec, ErrContentTypeMismatch
}
return
}
func (dec *Decoder) readFrame(frameLen uint32) (err error) {
// Enforce limits on frame size.
if frameLen > dec.opt.MaxPayloadSize {
err = ErrDataFrameTooLarge
return
}
// Read the frame.
n, err := io.ReadFull(dec.reader, dec.buf[0:frameLen])
if err != nil || uint32(n) != frameLen {
return
}
return
}
func (dec *Decoder) Decode() (frameData []byte, err error) {
if dec.stopped {
err = EOF
return
}
// Read the frame length.
var frameLen uint32
err = binary.Read(dec.reader, binary.BigEndian, &frameLen)
if err != nil {
return
}
if frameLen == 0 {
// This is a control frame.
var cf ControlFrame
err = cf.Decode(dec.reader)
if err != nil {
return
}
if cf.ControlType == CONTROL_STOP {
dec.stopped = true
if dec.opt.Bidirectional {
ff := &ControlFrame{ControlType: CONTROL_FINISH}
err = ff.EncodeFlush(dec.writer)
if err != nil {
return
}
}
return nil, EOF
}
if err != nil {
return nil, err
}
} else {
// This is a data frame.
err = dec.readFrame(frameLen)
if err != nil {
return
}
frameData = dec.buf[0:frameLen]
}
return
}
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package framestream
import (
"bufio"
"encoding/binary"
"io"
)
type EncoderOptions struct {
ContentType []byte
Bidirectional bool
}
type Encoder struct {
writer *bufio.Writer
reader *bufio.Reader
opt EncoderOptions
buf []byte
}
func NewEncoder(w io.Writer, opt *EncoderOptions) (enc *Encoder, err error) {
if opt == nil {
opt = &EncoderOptions{}
}
enc = &Encoder{
writer: bufio.NewWriter(w),
opt: *opt,
}
if opt.Bidirectional {
r, ok := w.(io.Reader)
if !ok {
return nil, ErrType
}
enc.reader = bufio.NewReader(r)
ready := ControlReady
ready.SetContentType(opt.ContentType)
if err = ready.EncodeFlush(enc.writer); err != nil {
return
}
var accept ControlFrame
if err = accept.DecodeTypeEscape(enc.reader, CONTROL_ACCEPT); err != nil {
return
}
if !accept.MatchContentType(opt.ContentType) {
return nil, ErrContentTypeMismatch
}
}
// Write the start control frame.
start := ControlStart
start.SetContentType(opt.ContentType)
err = start.Encode(enc.writer)
if err != nil {
return
}
return
}
func (enc *Encoder) Close() (err error) {
err = ControlStop.EncodeFlush(enc.writer)
if err != nil || !enc.opt.Bidirectional {
return
}
var finish ControlFrame
return finish.DecodeTypeEscape(enc.reader, CONTROL_FINISH)
}
func (enc *Encoder) Write(frame []byte) (n int, err error) {
err = binary.Write(enc.writer, binary.BigEndian, uint32(len(frame)))
if err != nil {
return
}
return enc.writer.Write(frame)
}
func (enc *Encoder) Flush() error {
return enc.writer.Flush()
}
This diff is collapsed.
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package framestream
import (
"errors"
"io"
)
const DEFAULT_MAX_PAYLOAD_SIZE = 1048576
const MAX_CONTROL_FRAME_SIZE = 512
var EOF = io.EOF
var ErrContentTypeMismatch = errors.New("content type mismatch")
var ErrDataFrameTooLarge = errors.New("data frame too large")
var ErrShortRead = errors.New("short read")
var ErrDecode = errors.New("decoding error")
var ErrType = errors.New("invalid type")
/*
* Copyright (c) 2014 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"log"
"os"
"github.com/farsightsec/golang-framestream"
)
func main() {
// Arguments.
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s <INPUT FILE>\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Dumps a FrameStreams formatted input file.\n\n")
os.Exit(1)
}
fname := os.Args[1]
// Open the input file.
file, err := os.Open(fname)
if err != nil {
log.Fatal(err)
}
// Create the decoder.
fs, err := framestream.NewDecoder(file, nil)
if err != nil {
log.Fatal(err)
}
// Print the frames.
fmt.Printf("Control frame [START] (%v bytes): %x\n", len(fs.ControlStart), fs.ControlStart)
for {
frame, err := fs.Decode()
if err == framestream.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Data frame (%v bytes): %x\n", len(frame), frame)
}
if fs.ControlStop != nil {
fmt.Printf("Control frame [STOP] (%v bytes): %x\n", len(fs.ControlStop), fs.ControlStop)
}
}
package framestream_test
import (
"bytes"
"net"
"testing"
framestream "github.com/farsightsec/golang-framestream"
)
func testDecoder(t *testing.T, dec *framestream.Decoder, nframes int) {
i := 1
for {
tf, err := dec.Decode()
if err != nil {
if i < nframes+1 {
t.Fatalf("testDecoder(%d): %v", i, err)
}
if err != framestream.EOF {
t.Fatalf("unexpected error: %v != EOF", err)
}
return
}
if i > nframes {
t.Errorf("extra frame received: %d", i)
}
f := make([]byte, i)
if bytes.Compare(tf, f) != 0 {
t.Errorf("testDecoder: received %v != %v", tf, f)
}
i++
}
}
func TestUnidirectional(t *testing.T) {
buf := new(bytes.Buffer)
enc, err := framestream.NewEncoder(buf, nil)
if err != nil {
t.Fatal(err)
}
for i := 1; i < 10; i++ {
b := make([]byte, i)
if _, err = enc.Write(b); err != nil {
t.Error(err)
}
}
enc.Close()
dec, err := framestream.NewDecoder(buf, nil)
if err != nil {
t.Fatal(err)
}
testDecoder(t, dec, 9)
}
func TestBidirectional(t *testing.T) {
client, server := net.Pipe()
go func() {
dec, err := framestream.NewDecoder(server,
&framestream.DecoderOptions{
Bidirectional: true,
})
if err != nil {
t.Fatal(err)
}
testDecoder(t, dec, 9)
}()
enc, err := framestream.NewEncoder(client,
&framestream.EncoderOptions{
Bidirectional: true,
})
if err != nil {
t.Fatal(err)
}
for i := 1; i < 10; i++ {
b := make([]byte, i)
if _, err := enc.Write(b); err != nil {
t.Error(err)
}
}
enc.Close()
}
func TestContentTypeMismatch(t *testing.T) {
buf := new(bytes.Buffer)
enc, err := framestream.NewEncoder(buf,
&framestream.EncoderOptions{
ContentType: []byte("test"),
})
if err != nil {
t.Fatal(err)
}
enc.Write([]byte("hello, world"))
enc.Close()
_, err = framestream.NewDecoder(buf,
&framestream.DecoderOptions{
ContentType: []byte("wrong"),
})
if err != framestream.ErrContentTypeMismatch {
t.Error("expected %v, received %v",
framestream.ErrContentTypeMismatch,
err)
}
}
func TestOversizeFrame(t *testing.T) {
buf := new(bytes.Buffer)
enc, err := framestream.NewEncoder(buf, nil)
if err != nil {
t.Fatal(err)
}
enc.Write(make([]byte, 15))
enc.Close()
dec, err := framestream.NewDecoder(buf,
&framestream.DecoderOptions{
MaxPayloadSize: 10,
})
if err != nil {
t.Fatal(err)
}
_, err = dec.Decode()
if err != framestream.ErrDataFrameTooLarge {
t.Error("data frame too large, received %v", err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment