Commit 9018451d authored by Yong Tang, committed by Miek Gieben

Update Gopkg.toml to remove the constraint on zipkin-go-opentracing (#1231)

* Update vendor directory for latest changes.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Update Gopkg.toml to remove the constraint on zipkin-go-opentracing

The issue with zipkin-go-opentracing has been fixed; see #1193
for details.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
parent 7c7a233b
......@@ -16,19 +16,20 @@
[[projects]]
name = "github.com/Shopify/sarama"
packages = ["."]
revision = "bbdbe644099b7fdc8327d5cc69c030945188b2e9"
version = "v1.13.0"
revision = "240fd146ce68bcafb034cc5dc977229ffbafa8ea"
version = "v1.14.0"
[[projects]]
branch = "master"
name = "github.com/apache/thrift"
packages = ["lib/go/thrift"]
revision = "4f77ab8e296d64c57e6ea1c6e3f0f152bc7d6a3a"
revision = "95d5fb3a1e38125b9eabcbe9cda1a6c7bbe3e93d"
[[projects]]
name = "github.com/asaskevich/govalidator"
packages = ["."]
revision = "73945b6115bfbbcc57d89b7316e28109364124e1"
version = "v7"
revision = "521b25f4b05fd26bec69d9dedeb8f9c9a83939a8"
version = "v8"
[[projects]]
branch = "master"
......@@ -160,7 +161,7 @@
branch = "master"
name = "github.com/go-openapi/swag"
packages = ["."]
revision = "f3f9494671f93fcff853e3c6e9e948b3eb71e590"
revision = "cf0bdb963811675a4d7e74901cefc7411a1df939"
[[projects]]
name = "github.com/gogo/protobuf"
......@@ -262,6 +263,7 @@
name = "github.com/openzipkin/zipkin-go-opentracing"
packages = [".","flag","thrift/gen-go/scribe","thrift/gen-go/zipkincore","types","wire"]
revision = "45e90b00710a4c34a1a7d8a78d90f9b010b0bd4d"
version = "v0.3.2"
[[projects]]
name = "github.com/pierrec/lz4"
......@@ -326,7 +328,7 @@
branch = "master"
name = "golang.org/x/sys"
packages = ["unix","windows"]
revision = "75813c647272dd855bda156405bf844a5414f5bf"
revision = "1e2299c37cc91a509f1b12369872d27be0ce98a6"
[[projects]]
branch = "master"
......@@ -378,6 +380,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "c7279ef091bb11a42d1421f51e53d761113ea23d9e9b993823605883da0f80ff"
inputs-digest = "be9300a30414c93aa44756868a7906a0a295b0910a662880741bcfac58b7b679"
solver-name = "gps-cdcl"
solver-version = 1
......@@ -13,12 +13,9 @@ ignored = [
"golang.org/x/net/trace",
]
[[constraint]]
name = "github.com/openzipkin/zipkin-go-opentracing"
revision = "45e90b00710a4c34a1a7d8a78d90f9b010b0bd4d"
[[override]]
name = "github.com/apache/thrift"
revision = "4f77ab8e296d64c57e6ea1c6e3f0f152bc7d6a3a"
branch = "master"
[[override]]
name = "github.com/ugorji/go"
......
......@@ -22,3 +22,5 @@ _cgo_export.*
_testmain.go
*.exe
coverage.txt
......@@ -31,4 +31,7 @@ script:
- make errcheck
- make fmt
after_success:
- bash <(curl -s https://codecov.io/bash)
sudo: false
# Changelog
#### Version 1.14.0 (2017-11-13)
New Features:
- Add support for the new Kafka 0.11 record-batch format, including the wire
protocol and the necessary behavioural changes in the producer and consumer.
Transactions and idempotency are not yet supported, but producing and
consuming should work with all the existing bells and whistles (batching,
compression, etc.) as well as the new custom headers; a usage sketch follows
this changelog excerpt. Thanks to Vlad Hanciuta of Arista Networks for this
work. Part of [#901](https://github.com/Shopify/sarama/issues/901).
Bug Fixes:
- Fix encoding of ProduceResponse versions in test
([#970](https://github.com/Shopify/sarama/pull/970)).
- Return partial replicas list when we have it
([#975](https://github.com/Shopify/sarama/pull/975)).
#### Version 1.13.0 (2017-10-04)
New Features:
......
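The header support called out in the 1.14.0 entry above surfaces through ProducerMessage.Headers and the V0_11_0_0 version gate that appear later in this diff. A minimal producer-side sketch, assuming a reachable broker at the placeholder address "localhost:9092" and a placeholder topic "my_topic":

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0      // headers need the v2 record-batch format
	cfg.Producer.Return.Successes = true // required by the sync producer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Headers are key-value pairs passed transparently through to consumers.
	msg := &sarama.ProducerMessage{
		Topic: "my_topic",
		Value: sarama.StringEncoder("hello"),
		Headers: []sarama.RecordHeader{
			{Key: []byte("trace-id"), Value: []byte("abc123")},
		},
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}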
default: fmt vet errcheck test
# Taken from https://github.com/codecov/example-go#caveat-multiple-files
test:
go test -v -timeout 60s -race ./...
echo "" > coverage.txt
for d in `go list ./... | grep -v vendor`; do \
go test -v -timeout 60s -race -coverprofile=profile.out -covermode=atomic $$d; \
if [ -f profile.out ]; then \
cat profile.out >> coverage.txt; \
rm profile.out; \
fi \
done
vet:
go vet ./...
......
......@@ -3,6 +3,7 @@ sarama
[![GoDoc](https://godoc.org/github.com/Shopify/sarama?status.png)](https://godoc.org/github.com/Shopify/sarama)
[![Build Status](https://travis-ci.org/Shopify/sarama.svg?branch=master)](https://travis-ci.org/Shopify/sarama)
[![Coverage](https://codecov.io/gh/Shopify/sarama/branch/master/graph/badge.svg)](https://codecov.io/gh/Shopify/sarama)
Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apache.org/) version 0.8 (and later).
......
package sarama
import (
"encoding/binary"
"fmt"
"sync"
"time"
......@@ -119,6 +120,10 @@ type ProducerMessage struct {
// StringEncoder and ByteEncoder.
Value Encoder
// The headers are key-value pairs that are transparently passed
// by Kafka between producers and consumers.
Headers []RecordHeader
// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and it is only to be used for
......@@ -146,8 +151,16 @@ type ProducerMessage struct {
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
func (m *ProducerMessage) byteSize() int {
size := producerMessageOverhead
func (m *ProducerMessage) byteSize(version int) int {
var size int
if version >= 2 {
size = maximumRecordOverhead
for _, h := range m.Headers {
size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
}
} else {
size = producerMessageOverhead
}
if m.Key != nil {
size += m.Key.Length()
}
......@@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}
if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
}
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageSizeTooLarge)
continue
}
......
......@@ -49,9 +49,9 @@ type Client interface {
RefreshMetadata(topics ...string) error
// GetOffset queries the cluster to get the most recent available offset at the
// given time on the topic/partition combination. Time should be OffsetOldest for
// the earliest available offset, OffsetNewest for the offset of the message that
// will be produced next, or a time.
// given time (in milliseconds) on the topic/partition combination.
// Time should be OffsetOldest for the earliest available offset,
// OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)
// Coordinator returns the coordinating broker for a consumer group. It will
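The reworded GetOffset contract above is easiest to see from the caller's side. A minimal sketch, assuming a reachable broker at the placeholder address "localhost:9092" and a placeholder topic "my_topic":

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// OffsetNewest yields the offset of the next message to be produced;
	// a millisecond timestamp can be passed instead to look offsets up by time.
	next, err := client.GetOffset("my_topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("next offset for my_topic/0: %d", next)
}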
......@@ -297,7 +297,7 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
}
if metadata.Err == ErrReplicaNotAvailable {
return nil, metadata.Err
return dupInt32Slice(metadata.Replicas), metadata.Err
}
return dupInt32Slice(metadata.Replicas), nil
}
......@@ -322,7 +322,7 @@ func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32,
}
if metadata.Err == ErrReplicaNotAvailable {
return nil, metadata.Err
return dupInt32Slice(metadata.Isr), metadata.Err
}
return dupInt32Slice(metadata.Isr), nil
}
......
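The two hunks above change Replicas and InSyncReplicas to hand back the partial replica list alongside ErrReplicaNotAvailable instead of discarding it. A hedged sketch of what a caller can now do with that (placeholder broker address and topic):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	replicas, err := client.Replicas("my_topic", 0)
	switch {
	case err == sarama.ErrReplicaNotAvailable:
		// Since this change the partial list accompanies the error.
		log.Printf("some replicas unavailable; partial list: %v", replicas)
	case err != nil:
		log.Fatal(err)
	default:
		log.Printf("replicas: %v", replicas)
	}
}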
......@@ -33,6 +33,169 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
}
}
func TestNetConfigValidates(t *testing.T) {
tests := []struct {
name string
cfg func(*Config) // resorting to using a function as a param because of internal composite structs
err string
}{
{
"OpenRequests",
func(cfg *Config) {
cfg.Net.MaxOpenRequests = 0
},
"Net.MaxOpenRequests must be > 0"},
{"DialTimeout",
func(cfg *Config) {
cfg.Net.DialTimeout = 0
},
"Net.DialTimeout must be > 0"},
{"ReadTimeout",
func(cfg *Config) {
cfg.Net.ReadTimeout = 0
},
"Net.ReadTimeout must be > 0"},
{"WriteTimeout",
func(cfg *Config) {
cfg.Net.WriteTimeout = 0
},
"Net.WriteTimeout must be > 0"},
{"KeepAlive",
func(cfg *Config) {
cfg.Net.KeepAlive = -1
},
"Net.KeepAlive must be >= 0"},
{"SASL.User",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = ""
},
"Net.SASL.User must not be empty when SASL is enabled"},
{"SASL.Password",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = "user"
cfg.Net.SASL.Password = ""
},
"Net.SASL.Password must not be empty when SASL is enabled"},
}
for i, test := range tests {
c := NewConfig()
test.cfg(c)
if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
}
}
}
func TestMetadataConfigValidates(t *testing.T) {
tests := []struct {
name string
cfg func(*Config) // resorting to using a function as a param because of internal composite structs
err string
}{
{
"Retry.Max",
func(cfg *Config) {
cfg.Metadata.Retry.Max = -1
},
"Metadata.Retry.Max must be >= 0"},
{"Retry.Backoff",
func(cfg *Config) {
cfg.Metadata.Retry.Backoff = -1
},
"Metadata.Retry.Backoff must be >= 0"},
{"RefreshFrequency",
func(cfg *Config) {
cfg.Metadata.RefreshFrequency = -1
},
"Metadata.RefreshFrequency must be >= 0"},
}
for i, test := range tests {
c := NewConfig()
test.cfg(c)
if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
}
}
}
func TestProducerConfigValidates(t *testing.T) {
tests := []struct {
name string
cfg func(*Config) // resorting to using a function as a param because of internal composite structs
err string
}{
{
"MaxMessageBytes",
func(cfg *Config) {
cfg.Producer.MaxMessageBytes = 0
},
"Producer.MaxMessageBytes must be > 0"},
{"RequiredAcks",
func(cfg *Config) {
cfg.Producer.RequiredAcks = -2
},
"Producer.RequiredAcks must be >= -1"},
{"Timeout",
func(cfg *Config) {
cfg.Producer.Timeout = 0
},
"Producer.Timeout must be > 0"},
{"Partitioner",
func(cfg *Config) {
cfg.Producer.Partitioner = nil
},
"Producer.Partitioner must not be nil"},
{"Flush.Bytes",
func(cfg *Config) {
cfg.Producer.Flush.Bytes = -1
},
"Producer.Flush.Bytes must be >= 0"},
{"Flush.Messages",
func(cfg *Config) {
cfg.Producer.Flush.Messages = -1
},
"Producer.Flush.Messages must be >= 0"},
{"Flush.Frequency",
func(cfg *Config) {
cfg.Producer.Flush.Frequency = -1
},
"Producer.Flush.Frequency must be >= 0"},
{"Flush.MaxMessages",
func(cfg *Config) {
cfg.Producer.Flush.MaxMessages = -1
},
"Producer.Flush.MaxMessages must be >= 0"},
{"Flush.MaxMessages with Producer.Flush.Messages",
func(cfg *Config) {
cfg.Producer.Flush.MaxMessages = 1
cfg.Producer.Flush.Messages = 2
},
"Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
{"Flush.Retry.Max",
func(cfg *Config) {
cfg.Producer.Retry.Max = -1
},
"Producer.Retry.Max must be >= 0"},
{"Flush.Retry.Backoff",
func(cfg *Config) {
cfg.Producer.Retry.Backoff = -1
},
"Producer.Retry.Backoff must be >= 0"},
}
for i, test := range tests {
c := NewConfig()
test.cfg(c)
if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
}
}
}
func TestLZ4ConfigValidation(t *testing.T) {
config := NewConfig()
config.Producer.Compression = CompressionLZ4
......
......@@ -14,8 +14,9 @@ type ConsumerMessage struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Headers []*RecordHeader // only set if kafka is version 0.11+
}
// ConsumerError is what is provided to the user when an error occurs.
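The new Headers field on ConsumerMessage is the consumer-side counterpart of the producer headers added elsewhere in this diff; per the comment above it is only populated on Kafka 0.11+ fetches. A rough sketch of reading it, with placeholder broker address and topic and minimal error handling:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // headers arrive only on v4 (0.11+) fetches

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		for _, h := range msg.Headers {
			log.Printf("offset %d header %s=%s", msg.Offset, h.Key, h.Value)
		}
	}
}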
......@@ -478,44 +479,12 @@ feederLoop:
close(child.errors)
}
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return nil, ErrIncompleteResponse
}
if block.Err != ErrNoError {
return nil, block.Err
}
if len(block.MsgSet.Messages) == 0 {
// We got no messages. If we got a trailing one then we need to ask for more data.
// Otherwise we just poll again and wait for one to be produced...
if block.MsgSet.PartialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageTooLarge)
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
}
}
return nil, nil
}
// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
incomplete := false
prelude := true
func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
var messages []*ConsumerMessage
for _, msgBlock := range block.MsgSet.Messages {
var incomplete bool
prelude := true
for _, msgBlock := range msgSet.Messages {
for _, msg := range msgBlock.Messages() {
offset := msg.Offset
if msg.Msg.Version >= 1 {
......@@ -542,7 +511,46 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
incomplete = true
}
}
}
if incomplete || len(messages) == 0 {
return nil, ErrIncompleteResponse
}
return messages, nil
}
func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*ConsumerMessage, error) {
var messages []*ConsumerMessage
var incomplete bool
prelude := true
batch := block.Records.recordBatch
for _, rec := range batch.Records {
offset := batch.FirstOffset + rec.OffsetDelta
if prelude && offset < child.offset {
continue
}
prelude = false
if offset >= child.offset {
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: rec.Key,
Value: rec.Value,
Offset: offset,
Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
Headers: rec.Headers,
})
child.offset = offset + 1
} else {
incomplete = true
}
if child.offset > block.LastStableOffset {
// We reached the end of closed transactions
break
}
}
if incomplete || len(messages) == 0 {
......@@ -551,6 +559,57 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
return messages, nil
}
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return nil, ErrIncompleteResponse
}
if block.Err != ErrNoError {
return nil, block.Err
}
nRecs, err := block.Records.numRecords()
if err != nil {
return nil, err
}
if nRecs == 0 {
partialTrailingMessage, err := block.Records.isPartial()
if err != nil {
return nil, err
}
// We got no messages. If we got a trailing one then we need to ask for more data.
// Otherwise we just poll again and wait for one to be produced...
if partialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageTooLarge)
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
}
}
return nil, nil
}
// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
if control, err := block.Records.isControl(); err != nil || control {
return nil, err
}
if response.Version < 4 {
return child.parseMessages(block.Records.msgSet)
}
return child.parseRecords(block)
}
// brokerConsumer
type brokerConsumer struct {
......@@ -740,6 +799,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request.Version = 3
request.MaxBytes = MaxResponseSize
}
if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
request.Isolation = ReadUncommitted // We don't support transactions yet.
}
for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
......
......@@ -379,86 +379,118 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4)
fetchResponse2 := &FetchResponse{}
fetchResponse2.AddError("my_topic", 0, ErrNoError)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})
legacyFetchResponse := &FetchResponse{}
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
newFetchResponse := &FetchResponse{Version: 4}
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
var offsetResponseVersion int16
cfg := NewConfig()
if fetchResponse1.Version >= 4 {
cfg.Version = V0_11_0_0
offsetResponseVersion = 1
}
master, err := NewConsumer([]string{broker0.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
broker0 := NewMockBroker(t, 0)
fetchResponse2 := &FetchResponse{}
fetchResponse2.Version = fetchResponse1.Version
fetchResponse2.AddError("my_topic", 0, ErrNoError)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
// When
consumer, err := master.ConsumePartition("my_topic", 0, 3)
if err != nil {
t.Fatal(err)
}
// When
consumer, err := master.ConsumePartition("my_topic", 0, 3)
if err != nil {
t.Fatal(err)
}
// Then: messages with offsets 1 and 2 are not returned even though they
// are present in the response.
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)
// Then: messages with offsets 1 and 2 are not returned even though they
// are present in the response.
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)
safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}
}
// It is fine if offsets of fetched messages are not sequential (as long as
// they are still strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11)
fetchResponse2 := &FetchResponse{}
fetchResponse2.AddError("my_topic", 0, ErrNoError)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})
legacyFetchResponse := &FetchResponse{}
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
newFetchResponse := &FetchResponse{Version: 4}
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
var offsetResponseVersion int16
cfg := NewConfig()
if fetchResponse1.Version >= 4 {
cfg.Version = V0_11_0_0
offsetResponseVersion = 1
}
master, err := NewConsumer([]string{broker0.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
broker0 := NewMockBroker(t, 0)
fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
fetchResponse2.AddError("my_topic", 0, ErrNoError)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
})
master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
// When
consumer, err := master.ConsumePartition("my_topic", 0, 3)
if err != nil {
t.Fatal(err)
}
// When
consumer, err := master.ConsumePartition("my_topic", 0, 3)
if err != nil {
t.Fatal(err)
}
// Then: messages with non-sequential offsets 5, 7, and 11 are all
// returned, even though the requested starting offset was 3.
assertMessageOffset(t, <-consumer.Messages(), 5)
assertMessageOffset(t, <-consumer.Messages(), 7)
assertMessageOffset(t, <-consumer.Messages(), 11)
// Then: messages with non-sequential offsets 5, 7, and 11 are all
// returned, even though the requested starting offset was 3.
assertMessageOffset(t, <-consumer.Messages(), 5)
assertMessageOffset(t, <-consumer.Messages(), 7)
assertMessageOffset(t, <-consumer.Messages(), 11)
safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}
}
// If leadership for a partition is changing then consumer resolves the new
......
......@@ -6,9 +6,19 @@ import (
"hash/crc32"
)
type crcPolynomial int8
const (
crcIEEE crcPolynomial = iota
crcCastagnoli
)
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
// crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
type crc32Field struct {
startOffset int
polynomial crcPolynomial
}
func (c *crc32Field) saveOffset(in int) {
......@@ -19,14 +29,24 @@ func (c *crc32Field) reserveLength() int {
return 4
}
func newCRC32Field(polynomial crcPolynomial) *crc32Field {
return &crc32Field{polynomial: polynomial}
}
func (c *crc32Field) run(curOffset int, buf []byte) error {
crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
crc, err := c.crc(curOffset, buf)
if err != nil {
return err
}
binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
return nil
}
func (c *crc32Field) check(curOffset int, buf []byte) error {
crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
crc, err := c.crc(curOffset, buf)
if err != nil {
return err
}
expected := binary.BigEndian.Uint32(buf[c.startOffset:])
if crc != expected {
......@@ -35,3 +55,15 @@ func (c *crc32Field) check(curOffset int, buf []byte) error {
return nil
}
func (c *crc32Field) crc(curOffset int, buf []byte) (uint32, error) {
var tab *crc32.Table
switch c.polynomial {
case crcIEEE:
tab = crc32.IEEETable
case crcCastagnoli:
tab = castagnoliTable
default:
return 0, PacketDecodingError{"invalid CRC type"}
}
return crc32.Checksum(buf[c.startOffset+4:curOffset], tab), nil
}
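The reworked crc32Field above lets one push encoder/decoder serve both checksum flavours: legacy message sets checksum with the IEEE polynomial, while the v2 record batch uses Castagnoli (CRC-32C). A standalone, stdlib-only sketch of the distinction:

package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	data := []byte("kafka record batch")

	// Legacy message sets checksum with the IEEE polynomial...
	ieee := crc32.ChecksumIEEE(data)

	// ...while v2 record batches use the Castagnoli polynomial (CRC-32C).
	castagnoli := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))

	fmt.Printf("IEEE: %08x  Castagnoli: %08x\n", ieee, castagnoli)
}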
......@@ -29,16 +29,27 @@ type FetchRequest struct {
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
blocks map[string]map[int32]*fetchRequestBlock
}
type IsolationLevel int8
const (
ReadUncommitted IsolationLevel = 0
ReadCommitted IsolationLevel = 1
)
func (r *FetchRequest) encode(pe packetEncoder) (err error) {
pe.putInt32(-1) // replica ID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
if r.Version == 3 {
if r.Version >= 3 {
pe.putInt32(r.MaxBytes)
}
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
......@@ -74,11 +85,18 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if r.MinBytes, err = pd.getInt32(); err != nil {
return err
}
if r.Version == 3 {
if r.Version >= 3 {
if r.MaxBytes, err = pd.getInt32(); err != nil {
return err
}
}
if r.Version >= 4 {
isolation, err := pd.getInt8()
if err != nil {
return err
}
r.Isolation = IsolationLevel(isolation)
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
......@@ -128,6 +146,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
case 3:
return V0_10_1_0
case 4:
return V0_11_0_0
default:
return minVersion
}
......
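For orientation, the version and isolation plumbing above composes roughly as follows when building a v4 fetch request by hand. Ordinarily the consumer fills these fields itself (see fetchNewMessages earlier in this diff); the topic, offset, and size values below are illustrative placeholders:

package main

import "github.com/Shopify/sarama"

func main() {
	req := &sarama.FetchRequest{
		MaxWaitTime: 250,
		MinBytes:    1,
		MaxBytes:    sarama.MaxResponseSize,
		Version:     4,                      // v4 maps to Kafka 0.11 in requiredVersion()
		Isolation:   sarama.ReadUncommitted, // transactions are not supported yet
	}
	// topic "my_topic", partition 0, fetch offset 0, 32 KiB partition budget
	req.AddBlock("my_topic", 0, 0, 32768)
	_ = req
}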
......@@ -17,6 +17,15 @@ var (
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}
fetchRequestOneBlockV4 = []byte{
0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xFF,
0x01,
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}
)
func TestFetchRequest(t *testing.T) {
......@@ -31,4 +40,9 @@ func TestFetchRequest(t *testing.T) {
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)
request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
}
......@@ -2,13 +2,39 @@ package sarama
import "time"
type AbortedTransaction struct {
ProducerID int64
FirstOffset int64
}
func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
if t.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if t.FirstOffset, err = pd.getInt64(); err != nil {
return err
}
return nil
}
func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
pe.putInt64(t.ProducerID)
pe.putInt64(t.FirstOffset)
return nil
}
type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
MsgSet MessageSet
LastStableOffset int64
AbortedTransactions []*AbortedTransaction
Records Records
}
func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) {
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
......@@ -20,27 +46,75 @@ func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) {
return err
}
msgSetSize, err := pd.getInt32()
if version >= 4 {
b.LastStableOffset, err = pd.getInt64()
if err != nil {
return err
}
numTransact, err := pd.getArrayLength()
if err != nil {
return err
}
if numTransact >= 0 {
b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
}
for i := 0; i < numTransact; i++ {
transact := new(AbortedTransaction)
if err = transact.decode(pd); err != nil {
return err
}
b.AbortedTransactions[i] = transact
}
}
recordsSize, err := pd.getInt32()
if err != nil {
return err
}
msgSetDecoder, err := pd.getSubset(int(msgSetSize))
recordsDecoder, err := pd.getSubset(int(recordsSize))
if err != nil {
return err
}
err = (&b.MsgSet).decode(msgSetDecoder)
var records Records
if version >= 4 {
records = newDefaultRecords(nil)
} else {
records = newLegacyRecords(nil)
}
if recordsSize > 0 {
if err = records.decode(recordsDecoder); err != nil {
return err
}
}
b.Records = records
return err
return nil
}
func (b *FetchResponseBlock) encode(pe packetEncoder) (err error) {
func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(b.Err))
pe.putInt64(b.HighWaterMarkOffset)
if version >= 4 {
pe.putInt64(b.LastStableOffset)
if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
for _, transact := range b.AbortedTransactions {
if err = transact.encode(pe); err != nil {
return err
}
}
}
pe.push(&lengthField{})
err = b.MsgSet.encode(pe)
err = b.Records.encode(pe)
if err != nil {
return err
}
......@@ -90,7 +164,7 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
}
block := new(FetchResponseBlock)
err = block.decode(pd)
err = block.decode(pd, version)
if err != nil {
return err
}
......@@ -124,7 +198,7 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
for id, block := range partitions {
pe.putInt32(id)
err = block.encode(pe)
err = block.encode(pe, r.Version)
if err != nil {
return err
}
......@@ -148,6 +222,10 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4:
return V0_11_0_0
default:
return minVersion
}
......@@ -182,7 +260,7 @@ func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
frb.Err = err
}
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
if r.Blocks == nil {
r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
}
......@@ -196,6 +274,11 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
frb = new(FetchResponseBlock)
partitions[partition] = frb
}
return frb
}
func encodeKV(key, value Encoder) ([]byte, []byte) {
var kb []byte
var vb []byte
if key != nil {
......@@ -204,7 +287,36 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
if value != nil {
vb, _ = value.Encode()
}
return kb, vb
}
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
msg := &Message{Key: kb, Value: vb}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
frb.MsgSet.Messages = append(frb.MsgSet.Messages, msgBlock)
set := frb.Records.msgSet
if set == nil {
set = &MessageSet{}
frb.Records = newLegacyRecords(set)
}
set.Messages = append(set.Messages, msgBlock)
}
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
batch := frb.Records.recordBatch
if batch == nil {
batch = &RecordBatch{Version: 2}
frb.Records = newDefaultRecords(batch)
}
batch.addRecord(rec)
}
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
frb := r.getOrCreateBlock(topic, partition)
frb.LastStableOffset = offset
}
......@@ -26,6 +26,43 @@ var (
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
oneRecordFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0x05, // Partition
0x00, 0x01, // Error
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
0x00, 0x00, 0x00, 0x52, // Records length
// recordBatch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x46,
0x00, 0x00, 0x00, 0x00,
0x02,
0xDB, 0x47, 0x14, 0xC9,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// record
0x28,
0x00,
0x0A,
0x00,
0x08, 0x01, 0x02, 0x03, 0x04,
0x06, 0x05, 0x06, 0x07,
0x02,
0x06, 0x08, 0x09, 0x0A,
0x04, 0x0B, 0x0C,
}
)
func TestEmptyFetchResponse(t *testing.T) {
......@@ -60,14 +97,22 @@ func TestOneMessageFetchResponse(t *testing.T) {
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
if block.MsgSet.PartialTrailingMessage {
partial, err := block.Records.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}
if len(block.MsgSet.Messages) != 1 {
n, err := block.Records.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
msgBlock := block.MsgSet.Messages[0]
msgBlock := block.Records.msgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
......@@ -82,3 +127,49 @@ func TestOneMessageFetchResponse(t *testing.T) {
t.Error("Decoding produced incorrect message value.")
}
}
func TestOneRecordFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}
if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}
block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial, err := block.Records.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing record where there wasn't one.")
}
n, err := block.Records.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
rec := block.Records.recordBatch.Records[0]
if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
t.Error("Decoding produced incorrect record key.")
}
if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
t.Error("Decoding produced incorrect record value.")
}
}
......@@ -27,3 +27,43 @@ func (l *lengthField) check(curOffset int, buf []byte) error {
return nil
}
type varintLengthField struct {
startOffset int
length int64
}
func (l *varintLengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getVarint()
return err
}
func (l *varintLengthField) saveOffset(in int) {
l.startOffset = in
}
func (l *varintLengthField) adjustLength(currOffset int) int {
oldFieldSize := l.reserveLength()
l.length = int64(currOffset - l.startOffset - oldFieldSize)
return l.reserveLength() - oldFieldSize
}
func (l *varintLengthField) reserveLength() int {
var tmp [binary.MaxVarintLen64]byte
return binary.PutVarint(tmp[:], l.length)
}
func (l *varintLengthField) run(curOffset int, buf []byte) error {
binary.PutVarint(buf[l.startOffset:], l.length)
return nil
}
func (l *varintLengthField) check(curOffset int, buf []byte) error {
if int64(curOffset-l.startOffset-l.reserveLength()) != l.length {
return PacketDecodingError{"length field invalid"}
}
return nil
}
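varintLengthField above reserves worst-case space with binary.PutVarint and later shrinks the reservation via adjustLength once the real length is known. The stdlib round-trip it builds on, as a standalone sketch with an arbitrary example length:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Reserve the worst case, then encode the actual length, mirroring
	// reserveLength()/run() above.
	var buf [binary.MaxVarintLen64]byte
	length := int64(70) // e.g. a 70-byte record body (an arbitrary example)

	n := binary.PutVarint(buf[:], length)
	decoded, read := binary.Varint(buf[:n])

	fmt.Printf("encoded in %d byte(s), decoded %d (consumed %d)\n", n, decoded, read)
}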
......@@ -37,7 +37,7 @@ type Message struct {
}
func (m *Message) encode(pe packetEncoder) error {
pe.push(&crc32Field{})
pe.push(newCRC32Field(crcIEEE))
pe.putInt8(m.Version)
......@@ -45,15 +45,9 @@ func (m *Message) encode(pe packetEncoder) error {
pe.putInt8(attributes)
if m.Version >= 1 {
timestamp := int64(-1)
if !m.Timestamp.Before(time.Unix(0, 0)) {
timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond)
} else if !m.Timestamp.IsZero() {
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)}
if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
return err
}
pe.putInt64(timestamp)
}
err := pe.putBytes(m.Key)
......@@ -112,7 +106,7 @@ func (m *Message) encode(pe packetEncoder) error {
}
func (m *Message) decode(pd packetDecoder) (err error) {
err = pd.push(&crc32Field{})
err = pd.push(newCRC32Field(crcIEEE))
if err != nil {
return err
}
......@@ -133,19 +127,9 @@ func (m *Message) decode(pd packetDecoder) (err error) {
m.Codec = CompressionCodec(attribute & compressionCodecMask)
if m.Version == 1 {
millis, err := pd.getInt64()
if err != nil {
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
return err
}
// negative timestamps are invalid, in these cases we should return
// a zero time
timestamp := time.Time{}
if millis >= 0 {
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}
m.Timestamp = timestamp
}
m.Key, err = pd.getBytes()
......
......@@ -122,6 +122,7 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
type MockOffsetResponse struct {
offsets map[string]map[int32]map[int64]int64
t TestReporter
version int16
}
func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
......@@ -131,6 +132,11 @@ func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
}
}
func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
mor.version = version
return mor
}
func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
partitions := mor.offsets[topic]
if partitions == nil {
......@@ -148,7 +154,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of
func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
offsetRequest := reqBody.(*OffsetRequest)
offsetResponse := &OffsetResponse{}
offsetResponse := &OffsetResponse{Version: mor.version}
for topic, partitions := range offsetRequest.blocks {
for partition, block := range partitions {
offset := mor.getOffset(topic, partition, block.time)
......@@ -402,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*ProduceRequest)
res := &ProduceResponse{}
for topic, partitions := range req.msgSets {
for topic, partitions := range req.records {
for partition := range partitions {
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
}
......
......@@ -9,11 +9,15 @@ type packetDecoder interface {
getInt16() (int16, error)
getInt32() (int32, error)
getInt64() (int64, error)
getVarint() (int64, error)
getArrayLength() (int, error)
// Collections
getBytes() ([]byte, error)
getVarintBytes() ([]byte, error)
getRawBytes(length int) ([]byte, error)
getString() (string, error)
getNullableString() (*string, error)
getInt32Array() ([]int32, error)
getInt64Array() ([]int64, error)
getStringArray() ([]string, error)
......@@ -43,3 +47,12 @@ type pushDecoder interface {
// of data from the saved offset, and verify it based on the data between the saved offset and curOffset.
check(curOffset int, buf []byte) error
}
// dynamicPushDecoder extends the pushDecoder interface for use cases where the length
// of the field itself is unknown until its value has been decoded (for instance varint
// encoded length fields).
// During push, the dynamicPushDecoder.decode() method will be called instead of reserveLength().
type dynamicPushDecoder interface {
pushDecoder
decoder
}
......@@ -11,12 +11,15 @@ type packetEncoder interface {
putInt16(in int16)
putInt32(in int32)
putInt64(in int64)
putVarint(in int64)
putArrayLength(in int) error
// Collections
putBytes(in []byte) error
putVarintBytes(in []byte) error
putRawBytes(in []byte) error
putString(in string) error
putNullableString(in *string) error
putStringArray(in []string) error
putInt32Array(in []int32) error
putInt64Array(in []int64) error
......@@ -48,3 +51,14 @@ type pushEncoder interface {
// of data to the saved offset, based on the data between the saved offset and curOffset.
run(curOffset int, buf []byte) error
}
// dynamicPushEncoder extends the pushEncoder interface for use cases where the length
// of the field itself is unknown until its value has been computed (for instance varint
// encoded length fields).
type dynamicPushEncoder interface {
pushEncoder
// Called during pop() to adjust the length of the field.
// It should return the difference in bytes between the last computed length and the current length.
adjustLength(currOffset int) int
}
package sarama
import (
"encoding/binary"
"fmt"
"math"
......@@ -8,6 +9,7 @@ import (
)
type prepEncoder struct {
stack []pushEncoder
length int
}
......@@ -29,6 +31,11 @@ func (pe *prepEncoder) putInt64(in int64) {
pe.length += 8
}
func (pe *prepEncoder) putVarint(in int64) {
var buf [binary.MaxVarintLen64]byte
pe.length += binary.PutVarint(buf[:], in)
}
func (pe *prepEncoder) putArrayLength(in int) error {
if in > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
......@@ -44,11 +51,16 @@ func (pe *prepEncoder) putBytes(in []byte) error {
if in == nil {
return nil
}
if len(in) > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
return pe.putRawBytes(in)
}
func (pe *prepEncoder) putVarintBytes(in []byte) error {
if in == nil {
pe.putVarint(-1)
return nil
}
pe.length += len(in)
return nil
pe.putVarint(int64(len(in)))
return pe.putRawBytes(in)
}
func (pe *prepEncoder) putRawBytes(in []byte) error {
......@@ -59,6 +71,14 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
return nil
}
func (pe *prepEncoder) putNullableString(in *string) error {
if in == nil {
pe.length += 2
return nil
}
return pe.putString(*in)
}
func (pe *prepEncoder) putString(in string) error {
pe.length += 2
if len(in) > math.MaxInt16 {
......@@ -108,10 +128,18 @@ func (pe *prepEncoder) offset() int {
// stackable
func (pe *prepEncoder) push(in pushEncoder) {
in.saveOffset(pe.length)
pe.length += in.reserveLength()
pe.stack = append(pe.stack, in)
}
func (pe *prepEncoder) pop() error {
in := pe.stack[len(pe.stack)-1]
pe.stack = pe.stack[:len(pe.stack)-1]
if dpe, ok := in.(dynamicPushEncoder); ok {
pe.length += dpe.adjustLength(pe.length)
}
return nil
}
......
......@@ -21,19 +21,56 @@ const (
)
type ProduceRequest struct {
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
msgSets map[string]map[int32]*MessageSet
TransactionalID *string
RequiredAcks RequiredAcks
Timeout int32
Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
records map[string]map[int32]Records
}
func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
topicCompressionRatioMetric metrics.Histogram) int64 {
var topicRecordCount int64
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better be safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histograms do not support decimal values, so multiply by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
}
return topicRecordCount
}
func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
topicCompressionRatioMetric metrics.Histogram) int64 {
if recordBatch.compressedRecords != nil {
compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
compressionRatioMetric.Update(compressionRatio)
topicCompressionRatioMetric.Update(compressionRatio)
}
return int64(len(recordBatch.Records))
}
func (r *ProduceRequest) encode(pe packetEncoder) error {
if r.Version >= 3 {
if err := pe.putNullableString(r.TransactionalID); err != nil {
return err
}
}
pe.putInt16(int16(r.RequiredAcks))
pe.putInt32(r.Timeout)
err := pe.putArrayLength(len(r.msgSets))
if err != nil {
return err
}
metricRegistry := pe.metricRegistry()
var batchSizeMetric metrics.Histogram
var compressionRatioMetric metrics.Histogram
......@@ -41,9 +78,14 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
}
totalRecordCount := int64(0)
for topic, partitions := range r.msgSets {
err := pe.putArrayLength(len(r.records))
if err != nil {
return err
}
for topic, partitions := range r.records {
err = pe.putString(topic)
if err != nil {
return err
......@@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
if metricRegistry != nil {
topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
}
for id, msgSet := range partitions {
for id, records := range partitions {
startOffset := pe.offset()
pe.putInt32(id)
pe.push(&lengthField{})
err = msgSet.encode(pe)
err = records.encode(pe)
if err != nil {
return err
}
......@@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
return err
}
if metricRegistry != nil {
for _, messageBlock := range msgSet.Messages {
// Is this a fake "message" wrapping real messages?
if messageBlock.Msg.Set != nil {
topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
} else {
// A single uncompressed message
topicRecordCount++
}
// Better be safe than sorry when computing the compression ratio
if messageBlock.Msg.compressedSize != 0 {
compressionRatio := float64(len(messageBlock.Msg.Value)) /
float64(messageBlock.Msg.compressedSize)
// Histograms do not support decimal values, so multiply by 100 for better precision
intCompressionRatio := int64(100 * compressionRatio)
compressionRatioMetric.Update(intCompressionRatio)
topicCompressionRatioMetric.Update(intCompressionRatio)
}
if r.Version >= 3 {
topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric)
} else {
topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric)
}
batchSize := int64(pe.offset() - startOffset)
batchSizeMetric.Update(batchSize)
......@@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
}
func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
r.Version = version
if version >= 3 {
id, err := pd.getNullableString()
if err != nil {
return err
}
r.TransactionalID = id
}
requiredAcks, err := pd.getInt16()
if err != nil {
return err
......@@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
if topicCount == 0 {
return nil
}
r.msgSets = make(map[string]map[int32]*MessageSet)
r.records = make(map[string]map[int32]Records)
for i := 0; i < topicCount; i++ {
topic, err := pd.getString()
if err != nil {
......@@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
if err != nil {
return err
}
r.msgSets[topic] = make(map[int32]*MessageSet)
r.records[topic] = make(map[int32]Records)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
messageSetSize, err := pd.getInt32()
size, err := pd.getInt32()
if err != nil {
return err
}
msgSetDecoder, err := pd.getSubset(int(messageSetSize))
recordsDecoder, err := pd.getSubset(int(size))
if err != nil {
return err
}
msgSet := &MessageSet{}
err = msgSet.decode(msgSetDecoder)
if err != nil {
var records Records
if version >= 3 {
records = newDefaultRecords(nil)
} else {
records = newLegacyRecords(nil)
}
if err := records.decode(recordsDecoder); err != nil {
return err
}
r.msgSets[topic][partition] = msgSet
r.records[topic][partition] = records
}
}
return nil
}
......@@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
default:
return minVersion
}
}
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
if r.records == nil {
r.records = make(map[string]map[int32]Records)
}
if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
if r.records[topic] == nil {
r.records[topic] = make(map[int32]Records)
}
}
set := r.msgSets[topic][partition]
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
r.ensureRecords(topic, partition)
set := r.records[topic][partition].msgSet
if set == nil {
set = new(MessageSet)
r.msgSets[topic][partition] = set
r.records[topic][partition] = newLegacyRecords(set)
}
set.addMessage(msg)
}
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
if r.msgSets == nil {
r.msgSets = make(map[string]map[int32]*MessageSet)
}
if r.msgSets[topic] == nil {
r.msgSets[topic] = make(map[int32]*MessageSet)
}
r.ensureRecords(topic, partition)
r.records[topic][partition] = newLegacyRecords(set)
}
r.msgSets[topic][partition] = set
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
r.ensureRecords(topic, partition)
r.records[topic][partition] = newDefaultRecords(batch)
}
......@@ -2,6 +2,7 @@ package sarama
import (
"testing"
"time"
)
var (
......@@ -32,6 +33,41 @@ var (
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
produceRequestOneRecord = []byte{
0xFF, 0xFF, // Transaction ID
0x01, 0x23, // Required Acks
0x00, 0x00, 0x04, 0x44, // Timeout
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0xAD, // Partition
0x00, 0x00, 0x00, 0x52, // Records length
// recordBatch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x46,
0x00, 0x00, 0x00, 0x00,
0x02,
0x54, 0x79, 0x61, 0xFD,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
// record
0x28,
0x00,
0x0A,
0x00,
0x08, 0x01, 0x02, 0x03, 0x04,
0x06, 0x05, 0x06, 0x07,
0x02,
0x06, 0x08, 0x09, 0x0A,
0x04, 0x0B, 0x0C,
}
)
func TestProduceRequest(t *testing.T) {
......@@ -44,4 +80,24 @@ func TestProduceRequest(t *testing.T) {
request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
testRequest(t, "one message", request, produceRequestOneMessage)
request.Version = 3
batch := &RecordBatch{
Version: 2,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{0x01, 0x02, 0x03, 0x04},
Value: []byte{0x05, 0x06, 0x07},
Headers: []*RecordHeader{{
Key: []byte{0x08, 0x09, 0x0A},
Value: []byte{0x0B, 0x0C},
}},
}},
}
request.AddBatch("topic", 0xAD, batch)
packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
batch.Records[0].length.startOffset = 0
testRequestDecode(t, "one record", request, packet)
}
package sarama
import "time"
import (
"fmt"
"time"
)
type ProduceResponseBlock struct {
Err KError
......@@ -32,6 +35,23 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
return nil
}
func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(b.Err))
pe.putInt64(b.Offset)
if version >= 2 {
timestamp := int64(-1)
if !b.Timestamp.Before(time.Unix(0, 0)) {
timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond)
} else if !b.Timestamp.IsZero() {
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)}
}
pe.putInt64(timestamp)
}
return nil
}
type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
Version int16
......@@ -103,8 +123,10 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
}
for id, prb := range partitions {
pe.putInt32(id)
pe.putInt16(int16(prb.Err))
pe.putInt64(prb.Offset)
err = prb.encode(pe, r.Version)
if err != nil {
return err
}
}
}
if r.Version >= 1 {
......@@ -127,6 +149,8 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
default:
return minVersion
}
......
package sarama
import "testing"
import (
"fmt"
"testing"
"time"
)
var (
produceResponseNoBlocks = []byte{
produceResponseNoBlocksV0 = []byte{
0x00, 0x00, 0x00, 0x00}
produceResponseManyBlocks = []byte{
0x00, 0x00, 0x00, 0x02,
produceResponseManyBlocksVersions = [][]byte{
{
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
}, {
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x00,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF,
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
}, {
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x02,
0x00, 0x02,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 1970 at 00:00:01,000 UTC (LogAppendTime was used)
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
}
)
func TestProduceResponse(t *testing.T) {
func TestProduceResponseDecode(t *testing.T) {
response := ProduceResponse{}
testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocks, 0)
testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0)
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
}
testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, 0)
if len(response.Blocks) != 2 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were 2")
}
if len(response.Blocks["foo"]) != 0 {
t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there were none")
}
if len(response.Blocks["bar"]) != 2 {
t.Error("Decoding produced", len(response.Blocks["bar"]), "partitions for 'bar' where there were two")
}
block := response.GetBlock("bar", 1)
if block == nil {
t.Error("Decoding did not produce a block for bar/1")
} else {
if block.Err != ErrNoError {
t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err))
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
t.Logf("Decoding produceResponseManyBlocks version %d", v)
testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))
if len(response.Blocks) != 1 {
t.Error("Decoding produced", len(response.Blocks), "topics where there was 1")
}
if block.Offset != 0xFF {
t.Error("Decoding failed for bar/1/Offset, got:", block.Offset)
if len(response.Blocks["foo"]) != 1 {
t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one")
}
}
block = response.GetBlock("bar", 2)
if block == nil {
t.Error("Decoding did not produce a block for bar/2")
} else {
if block.Err != ErrInvalidMessage {
t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err))
block := response.GetBlock("foo", 1)
if block == nil {
t.Error("Decoding did not produce a block for foo/1")
} else {
if block.Err != ErrInvalidMessage {
t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
}
if block.Offset != 255 {
t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
}
if v >= 2 {
if block.Timestamp != time.Unix(1, 0) {
t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
}
}
}
if block.Offset != 0 {
t.Error("Decoding failed for bar/2/Offset, got:", block.Offset)
if v >= 1 {
if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
}
}
}
}
func TestProduceResponseEncode(t *testing.T) {
response := ProduceResponse{}
response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
testEncodable(t, "empty", &response, produceResponseNoBlocksV0)
response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["foo"][1] = &ProduceResponseBlock{
Err: ErrInvalidMessage,
Offset: 255,
Timestamp: time.Unix(1, 0),
}
response.ThrottleTime = 100 * time.Millisecond
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
response.Version = int16(v)
testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks)
}
}
func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
response := ProduceResponse{}
response.Version = 2
response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
response.Blocks["t"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["t"][0] = &ProduceResponseBlock{
Err: ErrNoError,
Offset: 0,
// Use a timestamp before Unix time
Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
}
response.ThrottleTime = 100 * time.Millisecond
_, err := encode(&response, nil)
if err == nil {
t.Error("Expecting error, got nil")
}
if _, ok := err.(PacketEncodingError); !ok {
t.Error("Expecting PacketEncodingError, got:", err)
}
}
package sarama
import "time"
import (
"encoding/binary"
"time"
)
type partitionSet struct {
msgs []*ProducerMessage
setToSend *MessageSet
bufferBytes int
msgs []*ProducerMessage
recordsToSend Records
bufferBytes int
}
type produceSet struct {
......@@ -39,31 +42,64 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
}
}
timestamp := msg.Timestamp
if msg.Timestamp.IsZero() {
timestamp = time.Now()
}
partitions := ps.msgs[msg.Topic]
if partitions == nil {
partitions = make(map[int32]*partitionSet)
ps.msgs[msg.Topic] = partitions
}
var size int
set := partitions[msg.Partition]
if set == nil {
set = &partitionSet{setToSend: new(MessageSet)}
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
batch := &RecordBatch{
FirstTimestamp: timestamp,
Version: 2,
ProducerID: -1, /* No producer id */
Codec: ps.parent.conf.Producer.Compression,
}
set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
size = recordBatchOverhead
} else {
set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
}
partitions[msg.Partition] = set
}
set.msgs = append(set.msgs, msg)
msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
if msg.Timestamp.IsZero() {
msgToSend.Timestamp = time.Now()
} else {
msgToSend.Timestamp = msg.Timestamp
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
rec := &Record{
Key: key,
Value: val,
TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp),
}
size += len(key) + len(val)
if len(msg.Headers) > 0 {
rec.Headers = make([]*RecordHeader, len(msg.Headers))
for i, h := range msg.Headers {
rec.Headers[i] = &msg.Headers[i] // not &h: the range variable is reused on every iteration
size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
}
}
set.recordsToSend.recordBatch.addRecord(rec)
} else {
msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
msgToSend.Timestamp = timestamp
msgToSend.Version = 1
}
msgToSend.Version = 1
set.recordsToSend.msgSet.addMessage(msgToSend)
size = producerMessageOverhead + len(key) + len(val)
}
set.setToSend.addMessage(msgToSend)
size := producerMessageOverhead + len(key) + len(val)
set.bufferBytes += size
ps.bufferBytes += size
ps.bufferCount++
......@@ -79,17 +115,24 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 2
}
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
req.Version = 3
}
for topic, partitionSet := range ps.msgs {
for partition, set := range partitionSet {
if req.Version >= 3 {
req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
continue
}
if ps.parent.conf.Producer.Compression == CompressionNone {
req.AddSet(topic, partition, set.setToSend)
req.AddSet(topic, partition, set.recordsToSend.msgSet)
} else {
// When compression is enabled, the entire set for each partition is compressed
// and sent as the payload of a single fake "message" with the appropriate codec
// set and no key. When the server sees a message with a compression codec, it
// decompresses the payload and treats the result as its message set.
payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry)
payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
......@@ -98,11 +141,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
Codec: ps.parent.conf.Producer.Compression,
Key: nil,
Value: payload,
Set: set.setToSend, // Provide the underlying message set for accurate metrics
Set: set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
compMsg.Version = 1
compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp
compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp
}
req.AddMessage(topic, partition, compMsg)
}
......@@ -135,14 +178,19 @@ func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMe
}
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
version := 1
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
}
switch {
// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
return true
// Would we overflow the size-limit of a compressed message-batch for this partition?
case ps.parent.conf.Producer.Compression != CompressionNone &&
ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes:
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
return true
// Would we overflow simply in number of messages?
case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
......
......@@ -137,7 +137,7 @@ func TestProduceSetRequestBuilding(t *testing.T) {
t.Error("Timeout not set properly")
}
if len(req.msgSets) != 2 {
if len(req.records) != 2 {
t.Error("Wrong number of topics in request")
}
}
......@@ -166,7 +166,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
t.Error("Wrong request version")
}
for _, msgBlock := range req.msgSets["t1"][0].Messages {
for _, msgBlock := range req.records["t1"][0].msgSet.Messages {
msg := msgBlock.Msg
err := msg.decodeSet()
if err != nil {
......@@ -183,3 +183,40 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
}
}
}
func TestProduceSetV3RequestBuilding(t *testing.T) {
parent, ps := makeProduceSet()
parent.conf.Producer.RequiredAcks = WaitForAll
parent.conf.Producer.Timeout = 10 * time.Second
parent.conf.Version = V0_11_0_0
now := time.Now()
msg := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
Timestamp: now,
}
for i := 0; i < 10; i++ {
safeAddMessage(t, ps, msg)
msg.Timestamp = msg.Timestamp.Add(time.Second)
}
req := ps.buildRequest()
if req.Version != 3 {
t.Error("Wrong request version")
}
batch := req.records["t1"][0].recordBatch
if batch.FirstTimestamp != now {
t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
}
for i := 0; i < 10; i++ {
rec := batch.Records[i]
if rec.TimestampDelta != time.Duration(i)*time.Second {
t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
}
}
}
......@@ -7,8 +7,10 @@ import (
var errInvalidArrayLength = PacketDecodingError{"invalid array length"}
var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"}
var errInvalidByteSliceLengthType = PacketDecodingError{"invalid byteslice length type"}
var errInvalidStringLength = PacketDecodingError{"invalid string length"}
var errInvalidSubsetSize = PacketDecodingError{"invalid subset size"}
var errVarintOverflow = PacketDecodingError{"varint overflow"}
type realDecoder struct {
raw []byte
......@@ -58,12 +60,26 @@ func (rd *realDecoder) getInt64() (int64, error) {
return tmp, nil
}
func (rd *realDecoder) getVarint() (int64, error) {
tmp, n := binary.Varint(rd.raw[rd.off:])
if n == 0 {
rd.off = len(rd.raw)
return -1, ErrInsufficientData
}
if n < 0 {
rd.off -= n
return -1, errVarintOverflow
}
rd.off += n
return tmp, nil
}
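For reference, the n == 0 and n < 0 conventions checked above come straight from encoding/binary's zig-zag varint routines. A minimal standalone sketch, using only the standard library:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, -1) // zig-zag: -1 encodes as the single byte 0x01
	fmt.Println(buf[:n], n)        // [1] 1

	v, read := binary.Varint(buf[:n])
	fmt.Println(v, read) // -1 1
	// read == 0 would mean the buffer ended mid-varint (ErrInsufficientData above);
	// read < 0 would mean the value overflowed 64 bits (errVarintOverflow above).
}
```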
func (rd *realDecoder) getArrayLength() (int, error) {
if rd.remaining() < 4 {
rd.off = len(rd.raw)
return -1, ErrInsufficientData
}
tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
rd.off += 4
if tmp > rd.remaining() {
rd.off = len(rd.raw)
......@@ -78,28 +94,26 @@ func (rd *realDecoder) getArrayLength() (int, error) {
func (rd *realDecoder) getBytes() ([]byte, error) {
tmp, err := rd.getInt32()
if err != nil {
return nil, err
}
if tmp == -1 {
return nil, nil
}
n := int(tmp)
return rd.getRawBytes(int(tmp))
}
switch {
case n < -1:
return nil, errInvalidByteSliceLength
case n == -1:
func (rd *realDecoder) getVarintBytes() ([]byte, error) {
tmp, err := rd.getVarint()
if err != nil {
return nil, err
}
if tmp == -1 {
return nil, nil
case n == 0:
return make([]byte, 0), nil
case n > rd.remaining():
rd.off = len(rd.raw)
return nil, ErrInsufficientData
}
tmpStr := rd.raw[rd.off : rd.off+n]
rd.off += n
return tmpStr, nil
return rd.getRawBytes(int(tmp))
}
func (rd *realDecoder) getString() (string, error) {
......@@ -128,6 +142,15 @@ func (rd *realDecoder) getString() (string, error) {
return tmpStr, nil
}
func (rd *realDecoder) getNullableString() (*string, error) {
tmp, err := rd.getInt16()
if err != nil || tmp == -1 {
return nil, err
}
// rewind the length prefix so getString can consume it itself
rd.off -= 2
str, err := rd.getString()
return &str, err
}
func (rd *realDecoder) getInt32Array() ([]int32, error) {
if rd.remaining() < 4 {
rd.off = len(rd.raw)
......@@ -221,8 +244,16 @@ func (rd *realDecoder) remaining() int {
}
func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
buf, err := rd.getRawBytes(length)
if err != nil {
return nil, err
}
return &realDecoder{raw: buf}, nil
}
func (rd *realDecoder) getRawBytes(length int) ([]byte, error) {
if length < 0 {
return nil, errInvalidSubsetSize
return nil, errInvalidByteSliceLength
} else if length > rd.remaining() {
rd.off = len(rd.raw)
return nil, ErrInsufficientData
......@@ -230,7 +261,7 @@ func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
start := rd.off
rd.off += length
return &realDecoder{raw: rd.raw[start:rd.off]}, nil
return rd.raw[start:rd.off], nil
}
// stacks
......@@ -238,10 +269,17 @@ func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
func (rd *realDecoder) push(in pushDecoder) error {
in.saveOffset(rd.off)
reserve := in.reserveLength()
if rd.remaining() < reserve {
rd.off = len(rd.raw)
return ErrInsufficientData
var reserve int
if dpd, ok := in.(dynamicPushDecoder); ok {
if err := dpd.decode(rd); err != nil {
return err
}
} else {
reserve = in.reserveLength()
if rd.remaining() < reserve {
rd.off = len(rd.raw)
return ErrInsufficientData
}
}
rd.stack = append(rd.stack, in)
......
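(The dynamicPushDecoder branch exists because a varint length prefix, as used by the new Record framing, has no fixed width: the decoder cannot reserve reserveLength() bytes up front the way a crc32Field or lengthField can, so such a field decodes its length immediately before being pushed onto the stack.)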
......@@ -35,6 +35,10 @@ func (re *realEncoder) putInt64(in int64) {
re.off += 8
}
func (re *realEncoder) putVarint(in int64) {
re.off += binary.PutVarint(re.raw[re.off:], in)
}
func (re *realEncoder) putArrayLength(in int) error {
re.putInt32(int32(in))
return nil
......@@ -54,9 +58,16 @@ func (re *realEncoder) putBytes(in []byte) error {
return nil
}
re.putInt32(int32(len(in)))
copy(re.raw[re.off:], in)
re.off += len(in)
return nil
return re.putRawBytes(in)
}
func (re *realEncoder) putVarintBytes(in []byte) error {
if in == nil {
re.putVarint(-1)
return nil
}
re.putVarint(int64(len(in)))
return re.putRawBytes(in)
}
func (re *realEncoder) putString(in string) error {
......@@ -66,6 +77,14 @@ func (re *realEncoder) putString(in string) error {
return nil
}
func (re *realEncoder) putNullableString(in *string) error {
if in == nil {
re.putInt16(-1)
return nil
}
return re.putString(*in)
}
func (re *realEncoder) putStringArray(in []string) error {
err := re.putArrayLength(len(in))
if err != nil {
......
package sarama
import (
"encoding/binary"
"time"
)
const (
controlMask = 0x20
maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
)
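(For orientation: the bound counts the five per-record fields that are encoded as varints of at most 32 significant bits (length, offset delta, key length, value length and header count) at 5 bytes each, plus the 64-bit timestamp delta at 10 bytes and the single attributes byte: 5*5 + 10 + 1 = 36.)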
type RecordHeader struct {
Key []byte
Value []byte
}
func (h *RecordHeader) encode(pe packetEncoder) error {
if err := pe.putVarintBytes(h.Key); err != nil {
return err
}
return pe.putVarintBytes(h.Value)
}
func (h *RecordHeader) decode(pd packetDecoder) (err error) {
if h.Key, err = pd.getVarintBytes(); err != nil {
return err
}
if h.Value, err = pd.getVarintBytes(); err != nil {
return err
}
return nil
}
type Record struct {
Attributes int8
TimestampDelta time.Duration
OffsetDelta int64
Key []byte
Value []byte
Headers []*RecordHeader
length varintLengthField
}
func (r *Record) encode(pe packetEncoder) error {
pe.push(&r.length)
pe.putInt8(r.Attributes)
pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
pe.putVarint(r.OffsetDelta)
if err := pe.putVarintBytes(r.Key); err != nil {
return err
}
if err := pe.putVarintBytes(r.Value); err != nil {
return err
}
pe.putVarint(int64(len(r.Headers)))
for _, h := range r.Headers {
if err := h.encode(pe); err != nil {
return err
}
}
return pe.pop()
}
func (r *Record) decode(pd packetDecoder) (err error) {
if err = pd.push(&r.length); err != nil {
return err
}
if r.Attributes, err = pd.getInt8(); err != nil {
return err
}
timestamp, err := pd.getVarint()
if err != nil {
return err
}
r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
if r.OffsetDelta, err = pd.getVarint(); err != nil {
return err
}
if r.Key, err = pd.getVarintBytes(); err != nil {
return err
}
if r.Value, err = pd.getVarintBytes(); err != nil {
return err
}
numHeaders, err := pd.getVarint()
if err != nil {
return err
}
if numHeaders >= 0 {
r.Headers = make([]*RecordHeader, numHeaders)
}
for i := int64(0); i < numHeaders; i++ {
hdr := new(RecordHeader)
if err := hdr.decode(pd); err != nil {
return err
}
r.Headers[i] = hdr
}
return pd.pop()
}
package sarama
import (
"bytes"
"compress/gzip"
"fmt"
"io/ioutil"
"time"
"github.com/eapache/go-xerial-snappy"
"github.com/pierrec/lz4"
)
const recordBatchOverhead = 49
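(The 49 bytes are exactly the batch-header fields counted by the batch's own length field: partition leader epoch (4), version (1), CRC (4), attributes (2), last offset delta (4), first and max timestamps (8 each), producer ID (8), producer epoch (2), first sequence (4) and record count (4); the empty-batch test vector below accordingly encodes a Length of 49.)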
type recordsArray []*Record
func (e recordsArray) encode(pe packetEncoder) error {
for _, r := range e {
if err := r.encode(pe); err != nil {
return err
}
}
return nil
}
func (e recordsArray) decode(pd packetDecoder) error {
for i := range e {
rec := &Record{}
if err := rec.decode(pd); err != nil {
return err
}
e[i] = rec
}
return nil
}
type RecordBatch struct {
FirstOffset int64
PartitionLeaderEpoch int32
Version int8
Codec CompressionCodec
Control bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
ProducerID int64
ProducerEpoch int16
FirstSequence int32
Records []*Record
PartialTrailingRecord bool
compressedRecords []byte
recordsLen int // uncompressed records size
}
func (b *RecordBatch) encode(pe packetEncoder) error {
if b.Version != 2 {
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
}
pe.putInt64(b.FirstOffset)
pe.push(&lengthField{})
pe.putInt32(b.PartitionLeaderEpoch)
pe.putInt8(b.Version)
pe.push(newCRC32Field(crcCastagnoli))
pe.putInt16(b.computeAttributes())
pe.putInt32(b.LastOffsetDelta)
if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
return err
}
if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
return err
}
pe.putInt64(b.ProducerID)
pe.putInt16(b.ProducerEpoch)
pe.putInt32(b.FirstSequence)
if err := pe.putArrayLength(len(b.Records)); err != nil {
return err
}
if b.compressedRecords == nil {
if err := b.encodeRecords(pe); err != nil {
return err
}
}
if err := pe.putRawBytes(b.compressedRecords); err != nil {
return err
}
if err := pe.pop(); err != nil {
return err
}
return pe.pop()
}
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
if b.FirstOffset, err = pd.getInt64(); err != nil {
return err
}
batchLen, err := pd.getInt32()
if err != nil {
return err
}
if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
if b.Version, err = pd.getInt8(); err != nil {
return err
}
if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
return err
}
attributes, err := pd.getInt16()
if err != nil {
return err
}
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
}
if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
return err
}
if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
return err
}
if b.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if b.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}
if b.FirstSequence, err = pd.getInt32(); err != nil {
return err
}
numRecs, err := pd.getArrayLength()
if err != nil {
return err
}
if numRecs >= 0 {
b.Records = make([]*Record, numRecs)
}
bufSize := int(batchLen) - recordBatchOverhead
recBuffer, err := pd.getRawBytes(bufSize)
if err != nil {
return err
}
if err = pd.pop(); err != nil {
return err
}
switch b.Codec {
case CompressionNone:
case CompressionGZIP:
reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
if err != nil {
return err
}
if recBuffer, err = ioutil.ReadAll(reader); err != nil {
return err
}
case CompressionSnappy:
if recBuffer, err = snappy.Decode(recBuffer); err != nil {
return err
}
case CompressionLZ4:
reader := lz4.NewReader(bytes.NewReader(recBuffer))
if recBuffer, err = ioutil.ReadAll(reader); err != nil {
return err
}
default:
return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
}
b.recordsLen = len(recBuffer)
err = decode(recBuffer, recordsArray(b.Records))
if err == ErrInsufficientData {
b.PartialTrailingRecord = true
b.Records = nil
return nil
}
return err
}
func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
var raw []byte
if b.Codec != CompressionNone {
var err error
if raw, err = encode(recordsArray(b.Records), nil); err != nil {
return err
}
b.recordsLen = len(raw)
}
switch b.Codec {
case CompressionNone:
offset := pe.offset()
if err := recordsArray(b.Records).encode(pe); err != nil {
return err
}
b.recordsLen = pe.offset() - offset
case CompressionGZIP:
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
if _, err := writer.Write(raw); err != nil {
return err
}
if err := writer.Close(); err != nil {
return err
}
b.compressedRecords = buf.Bytes()
case CompressionSnappy:
b.compressedRecords = snappy.Encode(raw)
case CompressionLZ4:
var buf bytes.Buffer
writer := lz4.NewWriter(&buf)
if _, err := writer.Write(raw); err != nil {
return err
}
if err := writer.Close(); err != nil {
return err
}
b.compressedRecords = buf.Bytes()
default:
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
}
return nil
}
func (b *RecordBatch) computeAttributes() int16 {
attr := int16(b.Codec) & int16(compressionCodecMask)
if b.Control {
attr |= controlMask
}
return attr
}
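(In the v2 attributes word, the low compressionCodecMask bits select the codec and bit 5 (0x20, controlMask) marks a control batch; compare the test vectors below, where a control batch carries attributes 0x0020 and a gzipped batch 0x0001.)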
func (b *RecordBatch) addRecord(r *Record) {
b.Records = append(b.Records, r)
}
package sarama
import (
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
)
var recordBatchTestCases = []struct {
name string
batch RecordBatch
encoded []byte
oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
}{
{
name: "empty record",
batch: RecordBatch{
Version: 2,
FirstTimestamp: time.Unix(0, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{},
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 49, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
89, 95, 183, 221, // CRC
0, 0, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 0, // Number of Records
},
},
{
name: "control batch",
batch: RecordBatch{
Version: 2,
Control: true,
FirstTimestamp: time.Unix(0, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{},
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 49, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
81, 46, 67, 217, // CRC
0, 32, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 0, // Number of Records
},
},
{
name: "uncompressed record",
batch: RecordBatch{
Version: 2,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Key: []byte{8, 9, 10},
Value: []byte{11, 12},
}},
}},
recordsLen: 21,
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 70, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
84, 121, 97, 253, // CRC
0, 0, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 1, // Number of Records
40, // Record Length
0, // Attributes
10, // Timestamp Delta
0, // Offset Delta
8, // Key Length
1, 2, 3, 4,
6, // Value Length
5, 6, 7,
2, // Number of Headers
6, // Header Key Length
8, 9, 10, // Header Key
4, // Header Value Length
11, 12, // Header Value
},
},
{
name: "gzipped record",
batch: RecordBatch{
Version: 2,
Codec: CompressionGZIP,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Key: []byte{8, 9, 10},
Value: []byte{11, 12},
}},
}},
recordsLen: 21,
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 94, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
159, 236, 182, 189, // CRC
0, 1, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 1, // Number of Records
31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
},
oldGoEncoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 94, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
0, 216, 14, 210, // CRC
0, 1, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 1, // Number of Records
31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
},
},
{
name: "snappy compressed record",
batch: RecordBatch{
Version: 2,
Codec: CompressionSnappy,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Key: []byte{8, 9, 10},
Value: []byte{11, 12},
}},
}},
recordsLen: 21,
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 72, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
21, 0, 159, 97, // CRC
0, 2, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 1, // Number of Records
21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
},
},
{
name: "lz4 compressed record",
batch: RecordBatch{
Version: 2,
Codec: CompressionLZ4,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Value: []byte{5, 6, 7},
Headers: []*RecordHeader{{
Key: []byte{8, 9, 10},
Value: []byte{11, 12},
}},
}},
recordsLen: 21,
},
encoded: []byte{
0, 0, 0, 0, 0, 0, 0, 0, // First Offset
0, 0, 0, 89, // Length
0, 0, 0, 0, // Partition Leader Epoch
2, // Version
169, 74, 119, 197, // CRC
0, 3, // Attributes
0, 0, 0, 0, // Last Offset Delta
0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
0, 0, // Producer Epoch
0, 0, 0, 0, // First Sequence
0, 0, 0, 1, // Number of Records
4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
},
},
}
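(The varint fields in the uncompressed vector are a handy sanity check: zig-zag encoding doubles non-negative values, so a record length of 20 bytes appears as 40, the 5 ms timestamp delta as 10, the 4-byte key length as 8, the 3-byte value length as 6, and the single header as a count byte of 2.)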
func isOldGo(t *testing.T) bool {
v := strings.Split(runtime.Version()[2:], ".")
if len(v) < 2 {
t.Logf("Can't parse version: %s", runtime.Version())
return false
}
maj, err := strconv.Atoi(v[0])
if err != nil {
t.Logf("Can't parse version: %s", runtime.Version())
return false
}
min, err := strconv.Atoi(v[1])
if err != nil {
t.Logf("Can't parse version: %s", runtime.Version())
return false
}
return maj < 1 || (maj == 1 && min < 8)
}
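(The version split is needed because Go 1.8 changed compress/gzip to emit an all-zero MTIME field when Header.ModTime is the zero value, where earlier releases wrote a nonsensical timestamp; the gzip header bytes, and with them the batch CRC, therefore differ across Go versions.)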
func TestRecordBatchEncoding(t *testing.T) {
for _, tc := range recordBatchTestCases {
if tc.oldGoEncoded != nil && isOldGo(t) {
testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
} else {
testEncodable(t, tc.name, &tc.batch, tc.encoded)
}
}
}
func TestRecordBatchDecoding(t *testing.T) {
for _, tc := range recordBatchTestCases {
batch := RecordBatch{}
testDecodable(t, tc.name, &batch, tc.encoded)
for _, r := range batch.Records {
r.length = varintLengthField{}
}
for _, r := range tc.batch.Records {
r.length = varintLengthField{}
}
if !reflect.DeepEqual(batch, tc.batch) {
t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
}
}
}
package sarama
import "fmt"
const (
legacyRecords = iota
defaultRecords
)
// Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type Records struct {
recordsType int
msgSet *MessageSet
recordBatch *RecordBatch
}
func newLegacyRecords(msgSet *MessageSet) Records {
return Records{recordsType: legacyRecords, msgSet: msgSet}
}
func newDefaultRecords(batch *RecordBatch) Records {
return Records{recordsType: defaultRecords, recordBatch: batch}
}
func (r *Records) encode(pe packetEncoder) error {
switch r.recordsType {
case legacyRecords:
if r.msgSet == nil {
return nil
}
return r.msgSet.encode(pe)
case defaultRecords:
if r.recordBatch == nil {
return nil
}
return r.recordBatch.encode(pe)
}
return fmt.Errorf("unknown records type: %v", r.recordsType)
}
func (r *Records) decode(pd packetDecoder) error {
switch r.recordsType {
case legacyRecords:
r.msgSet = &MessageSet{}
return r.msgSet.decode(pd)
case defaultRecords:
r.recordBatch = &RecordBatch{}
return r.recordBatch.decode(pd)
}
return fmt.Errorf("unknown records type: %v", r.recordsType)
}
func (r *Records) numRecords() (int, error) {
switch r.recordsType {
case legacyRecords:
if r.msgSet == nil {
return 0, nil
}
return len(r.msgSet.Messages), nil
case defaultRecords:
if r.recordBatch == nil {
return 0, nil
}
return len(r.recordBatch.Records), nil
}
return 0, fmt.Errorf("unknown records type: %v", r.recordsType)
}
func (r *Records) isPartial() (bool, error) {
switch r.recordsType {
case legacyRecords:
if r.msgSet == nil {
return false, nil
}
return r.msgSet.PartialTrailingMessage, nil
case defaultRecords:
if r.recordBatch == nil {
return false, nil
}
return r.recordBatch.PartialTrailingRecord, nil
}
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}
func (r *Records) isControl() (bool, error) {
switch r.recordsType {
case legacyRecords:
return false, nil
case defaultRecords:
if r.recordBatch == nil {
return false, nil
}
return r.recordBatch.Control, nil
}
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}
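For orientation, here is a hedged sketch of how code inside the package would pick a flavour. The helper name recordsFor is invented, but the constructor calls mirror produceSet.add earlier in this diff:

```go
// recordsFor is a hypothetical in-package helper: brokers before
// Kafka 0.11 speak the legacy MessageSet format, 0.11+ the v2
// RecordBatch format.
func recordsFor(conf *Config, firstTimestamp time.Time) Records {
	if conf.Version.IsAtLeast(V0_11_0_0) {
		return newDefaultRecords(&RecordBatch{
			Version:        2,
			FirstTimestamp: firstTimestamp,
			ProducerID:     -1, // no producer id, as in produceSet.add
			Codec:          conf.Producer.Compression,
		})
	}
	return newLegacyRecords(new(MessageSet))
}
```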
package sarama
import (
"bytes"
"reflect"
"testing"
)
func TestLegacyRecords(t *testing.T) {
set := &MessageSet{
Messages: []*MessageBlock{
{
Msg: &Message{
Version: 1,
},
},
},
}
r := newLegacyRecords(set)
exp, err := encode(set, nil)
if err != nil {
t.Fatal(err)
}
buf, err := encode(&r, nil)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, exp) {
t.Errorf("Wrong encoding for legacy records, wanted %v, got %v", exp, buf)
}
set = &MessageSet{}
r = newLegacyRecords(nil)
err = decode(exp, set)
if err != nil {
t.Fatal(err)
}
err = decode(buf, &r)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(set, r.msgSet) {
t.Errorf("Wrong decoding for legacy records, wanted %#+v, got %#+v", set, r.msgSet)
}
n, err := r.numRecords()
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Errorf("Wrong number of records, wanted 1, got %d", n)
}
p, err := r.isPartial()
if err != nil {
t.Fatal(err)
}
if p {
t.Errorf("MessageSet shouldn't have a partial trailing message")
}
c, err := r.isControl()
if err != nil {
t.Fatal(err)
}
if c {
t.Errorf("MessageSet can't be a control batch")
}
}
func TestDefaultRecords(t *testing.T) {
batch := &RecordBatch{
Version: 2,
Records: []*Record{
{
Value: []byte{1},
},
},
}
r := newDefaultRecords(batch)
exp, err := encode(batch, nil)
if err != nil {
t.Fatal(err)
}
buf, err := encode(&r, nil)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, exp) {
t.Errorf("Wrong encoding for default records, wanted %v, got %v", exp, buf)
}
batch = &RecordBatch{}
r = newDefaultRecords(nil)
err = decode(exp, batch)
if err != nil {
t.Fatal(err)
}
err = decode(buf, &r)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(batch, r.recordBatch) {
t.Errorf("Wrong decoding for default records, wanted %#+v, got %#+v", batch, r.recordBatch)
}
n, err := r.numRecords()
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Errorf("Wrong number of records, wanted 1, got %d", n)
}
p, err := r.isPartial()
if err != nil {
t.Fatal(err)
}
if p {
t.Errorf("RecordBatch shouldn't have a partial trailing record")
}
c, err := r.isControl()
if err != nil {
t.Fatal(err)
}
if c {
t.Errorf("RecordBatch shouldn't be a control batch")
}
}
package sarama
import (
"fmt"
"time"
)
type Timestamp struct {
*time.Time
}
func (t Timestamp) encode(pe packetEncoder) error {
timestamp := int64(-1)
if !t.Before(time.Unix(0, 0)) {
timestamp = t.UnixNano() / int64(time.Millisecond)
} else if !t.IsZero() {
return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", t)}
}
pe.putInt64(timestamp)
return nil
}
func (t Timestamp) decode(pd packetDecoder) error {
millis, err := pd.getInt64()
if err != nil {
return err
}
// negative timestamps are invalid, in these cases we should return
// a zero time
timestamp := time.Time{}
if millis >= 0 {
timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
}
*t.Time = timestamp
return nil
}
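The encode/decode pair above round-trips whole milliseconds and silently drops anything finer. A standalone sketch of the same conversion, using only the standard library:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Unix(1479847795, 5*int64(time.Millisecond)) // whole milliseconds only
	millis := t.UnixNano() / int64(time.Millisecond)
	fmt.Println(millis) // 1479847795005

	// the same reconstruction decode performs
	back := time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
	fmt.Println(back.Equal(t)) // true; sub-millisecond nanoseconds would have been truncated
}
```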
......@@ -146,5 +146,6 @@ var (
V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
minVersion = V0_8_2_0
)
......@@ -6,6 +6,14 @@ Thank you for your interest in contributing to the Apache Thrift project! Infor
* [Get involved!](http://www.apache.org/foundation/getinvolved.html)
* [Legal aspects on Submission of Contributions (Patches)](http://www.apache.org/licenses/LICENSE-2.0.html#contributions)
## If you want to build the project locally ##
For Windows systems, see our detailed instructions on the [CMake README](/build/cmake/README.md).
For Windows Native C++ builds, see our detailed instructions on the [WinCPP README](/build/wincpp/README.md).
For unix systems, see our detailed instructions on the [Docker README](/build/docker/README.md).
## If you want to review open issues... ##
1. Review the [GitHub Pull Request Backlog](https://github.com/apache/thrift/pulls). Code reviews are open to all.
......
Apache Thrift
=============
Last Modified: 2014-03-16
Last Modified: 2017-10
License
=======
......@@ -171,3 +171,8 @@ To run the cross-language test suite, please run:
This will run a set of tests that use different language clients and
servers.
Development
===========
To build the project the same way Travis CI does, use Docker.
We have [comprehensive build instructions for Docker](build/docker/README.md).
#### Support
If you do have a contribution to the package, feel free to create a Pull Request or an Issue.
#### What to contribute
If you don't know what to do, there are some features and functions that need to be done
- [ ] Refactor code
- [ ] Edit docs and [README](https://github.com/asaskevich/govalidator/README.md): spellcheck, grammar and typo check
- [ ] Create an up-to-date list of contributors and of the projects that currently use this package
- [ ] Resolve [issues and bugs](https://github.com/asaskevich/govalidator/issues)
- [ ] Update actual [list of functions](https://github.com/asaskevich/govalidator#list-of-functions)
- [ ] Update the [list of validators](https://github.com/asaskevich/govalidator#validatestruct-2) that are available for `ValidateStruct` and add new ones
- [ ] Implement new validators: `IsFQDN`, `IsIMEI`, `IsPostalCode`, `IsISIN`, `IsISRC` etc
- [ ] Implement [validation by maps](https://github.com/asaskevich/govalidator/issues/224)
- [ ] Implement fuzz testing
- [ ] Implement some struct/map/array utilities
- [ ] Implement map/array validation
- [ ] Implement benchmarking
- [ ] Implement a batch of examples
- [ ] Look at forks for new features and fixes
#### Advice
Feel free to create what you want, but keep in mind when you implement new features:
- Code must be clear and readable, and the names of variables/constants must clearly describe what they are for
- Public functions must be documented and described in source file and added to README.md to the list of available functions
- There must be unit tests for any new functions and improvements
\ No newline at end of file
......@@ -156,6 +156,7 @@ func IsPort(str string) bool
func IsPositive(value float64) bool
func IsPrintableASCII(str string) bool
func IsRFC3339(str string) bool
func IsRFC3339WithoutZone(str string) bool
func IsRGBcolor(str string) bool
func IsRequestURI(rawurl string) bool
func IsRequestURL(rawurl string) bool
......@@ -269,56 +270,57 @@ For completely custom validators (interface-based), see below.
Here is a list of available validators for struct fields (validator - used function):
```go
"email": IsEmail,
"url": IsURL,
"dialstring": IsDialString,
"requrl": IsRequestURL,
"requri": IsRequestURI,
"alpha": IsAlpha,
"utfletter": IsUTFLetter,
"alphanum": IsAlphanumeric,
"utfletternum": IsUTFLetterNumeric,
"numeric": IsNumeric,
"utfnumeric": IsUTFNumeric,
"utfdigit": IsUTFDigit,
"hexadecimal": IsHexadecimal,
"hexcolor": IsHexcolor,
"rgbcolor": IsRGBcolor,
"lowercase": IsLowerCase,
"uppercase": IsUpperCase,
"int": IsInt,
"float": IsFloat,
"null": IsNull,
"uuid": IsUUID,
"uuidv3": IsUUIDv3,
"uuidv4": IsUUIDv4,
"uuidv5": IsUUIDv5,
"creditcard": IsCreditCard,
"isbn10": IsISBN10,
"isbn13": IsISBN13,
"json": IsJSON,
"multibyte": IsMultibyte,
"ascii": IsASCII,
"printableascii": IsPrintableASCII,
"fullwidth": IsFullWidth,
"halfwidth": IsHalfWidth,
"variablewidth": IsVariableWidth,
"base64": IsBase64,
"datauri": IsDataURI,
"ip": IsIP,
"port": IsPort,
"ipv4": IsIPv4,
"ipv6": IsIPv6,
"dns": IsDNSName,
"host": IsHost,
"mac": IsMAC,
"latitude": IsLatitude,
"longitude": IsLongitude,
"ssn": IsSSN,
"semver": IsSemver,
"rfc3339": IsRFC3339,
"ISO3166Alpha2": IsISO3166Alpha2,
"ISO3166Alpha3": IsISO3166Alpha3,
"email": IsEmail,
"url": IsURL,
"dialstring": IsDialString,
"requrl": IsRequestURL,
"requri": IsRequestURI,
"alpha": IsAlpha,
"utfletter": IsUTFLetter,
"alphanum": IsAlphanumeric,
"utfletternum": IsUTFLetterNumeric,
"numeric": IsNumeric,
"utfnumeric": IsUTFNumeric,
"utfdigit": IsUTFDigit,
"hexadecimal": IsHexadecimal,
"hexcolor": IsHexcolor,
"rgbcolor": IsRGBcolor,
"lowercase": IsLowerCase,
"uppercase": IsUpperCase,
"int": IsInt,
"float": IsFloat,
"null": IsNull,
"uuid": IsUUID,
"uuidv3": IsUUIDv3,
"uuidv4": IsUUIDv4,
"uuidv5": IsUUIDv5,
"creditcard": IsCreditCard,
"isbn10": IsISBN10,
"isbn13": IsISBN13,
"json": IsJSON,
"multibyte": IsMultibyte,
"ascii": IsASCII,
"printableascii": IsPrintableASCII,
"fullwidth": IsFullWidth,
"halfwidth": IsHalfWidth,
"variablewidth": IsVariableWidth,
"base64": IsBase64,
"datauri": IsDataURI,
"ip": IsIP,
"port": IsPort,
"ipv4": IsIPv4,
"ipv6": IsIPv6,
"dns": IsDNSName,
"host": IsHost,
"mac": IsMAC,
"latitude": IsLatitude,
"longitude": IsLongitude,
"ssn": IsSSN,
"semver": IsSemver,
"rfc3339": IsRFC3339,
"rfc3339WithoutZone": IsRFC3339WithoutZone,
"ISO3166Alpha2": IsISO3166Alpha2,
"ISO3166Alpha3": IsISO3166Alpha3,
```
Validators with parameters
......@@ -409,7 +411,31 @@ Documentation is available here: [godoc.org](https://godoc.org/github.com/asaske
Full information about code coverage is also available here: [govalidator on gocover.io](http://gocover.io/github.com/asaskevich/govalidator).
#### Support
If you do have a contribution for the package feel free to put up a Pull Request or open Issue.
If you do have a contribution to the package, feel free to create a Pull Request or an Issue.
#### What to contribute
If you don't know where to start, here are some features and functions that still need to be done:
- [ ] Refactor code
- [ ] Edit docs and [README](https://github.com/asaskevich/govalidator/README.md): spellcheck, grammar and typo check
- [ ] Create an up-to-date list of contributors and of the projects that currently use this package
- [ ] Resolve [issues and bugs](https://github.com/asaskevich/govalidator/issues)
- [ ] Update actual [list of functions](https://github.com/asaskevich/govalidator#list-of-functions)
- [ ] Update the [list of validators](https://github.com/asaskevich/govalidator#validatestruct-2) that are available for `ValidateStruct` and add new ones
- [ ] Implement new validators: `IsFQDN`, `IsIMEI`, `IsPostalCode`, `IsISIN`, `IsISRC` etc
- [ ] Implement [validation by maps](https://github.com/asaskevich/govalidator/issues/224)
- [ ] Implement fuzz testing
- [ ] Implement some struct/map/array utilities
- [ ] Implement map/array validation
- [ ] Implement benchmarking
- [ ] Implement a batch of examples
- [ ] Look at forks for new features and fixes
#### Advice
Feel free to create what you want, but keep in mind when you implement new features:
- Code must be clear and readable, and the names of variables/constants must clearly describe what they are for
- Public functions must be documented and described in source file and added to README.md to the list of available functions
- There must be unit tests for any new functions and improvements
#### Special thanks to [contributors](https://github.com/asaskevich/govalidator/graphs/contributors)
* [Daniel Lohse](https://github.com/annismckenzie)
......
package govalidator
import "strings"
// Errors is an array of multiple errors and conforms to the error interface.
type Errors []error
......@@ -9,11 +11,11 @@ func (es Errors) Errors() []error {
}
func (es Errors) Error() string {
var err string
var errs []string
for _, e := range es {
err += e.Error() + ";"
errs = append(errs, e.Error())
}
return err
return strings.Join(errs, ";")
}
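(Joining once at the end, instead of appending ";" after every element, also drops the trailing semicolon; the expected strings in the test below lose their final ";" for the same reason.)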
// Error encapsulates a name, an error and whether there's a custom error message or not.
......@@ -21,6 +23,9 @@ type Error struct {
Name string
Err error
CustomErrorMessageExists bool
// Validator indicates the name of the validator that failed
Validator string
}
func (e Error) Error() string {
......
......@@ -15,10 +15,10 @@ func TestErrorsToString(t *testing.T) {
expected string
}{
{Errors{}, ""},
{Errors{fmt.Errorf("Error 1")}, "Error 1;"},
{Errors{fmt.Errorf("Error 1"), fmt.Errorf("Error 2")}, "Error 1;Error 2;"},
{Errors{customErr, fmt.Errorf("Error 2")}, "Custom Error Name: stdlib error;Error 2;"},
{Errors{fmt.Errorf("Error 123"), customErrWithCustomErrorMessage}, "Error 123;Bad stuff happened;"},
{Errors{fmt.Errorf("Error 1")}, "Error 1"},
{Errors{fmt.Errorf("Error 1"), fmt.Errorf("Error 2")}, "Error 1;Error 2"},
{Errors{customErr, fmt.Errorf("Error 2")}, "Custom Error Name: stdlib error;Error 2"},
{Errors{fmt.Errorf("Error 123"), customErrWithCustomErrorMessage}, "Error 123;Bad stuff happened"},
}
for _, test := range tests {
actual := test.param1.Error()
......
package govalidator
import "math"
import (
"math"
"reflect"
)
// Abs returns absolute value of number
func Abs(value float64) float64 {
......@@ -39,13 +42,47 @@ func IsNonPositive(value float64) bool {
}
// InRange returns true if value lies between left and right border
func InRange(value, left, right float64) bool {
func InRangeInt(value, left, right int) bool {
if left > right {
left, right = right, left
}
return value >= left && value <= right
}
// InRangeFloat32 returns true if value lies between left and right border
func InRangeFloat32(value, left, right float32) bool {
if left > right {
left, right = right, left
}
return value >= left && value <= right
}
// InRangeFloat64 returns true if value lies between left and right border
func InRangeFloat64(value, left, right float64) bool {
if left > right {
left, right = right, left
}
return value >= left && value <= right
}
// InRange returns true if value lies between left and right border; it handles int, float32 and float64, and all three arguments must be of the same type
func InRange(value interface{}, left interface{}, right interface{}) bool {
reflectValue := reflect.TypeOf(value).Kind()
reflectLeft := reflect.TypeOf(left).Kind()
reflectRight := reflect.TypeOf(right).Kind()
if reflectValue == reflect.Int && reflectLeft == reflect.Int && reflectRight == reflect.Int {
return InRangeInt(value.(int), left.(int), right.(int))
} else if reflectValue == reflect.Float32 && reflectLeft == reflect.Float32 && reflectRight == reflect.Float32 {
return InRangeFloat32(value.(float32), left.(float32), right.(float32))
} else if reflectValue == reflect.Float64 && reflectLeft == reflect.Float64 && reflectRight == reflect.Float64 {
return InRangeFloat64(value.(float64), left.(float64), right.(float64))
} else {
return false
}
}
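A short usage sketch of the new generic entry point; note that untyped constant arguments default to int or float64, so mixed literals fall into the mismatch case:

```go
package main

import (
	"fmt"

	valid "github.com/asaskevich/govalidator"
)

func main() {
	fmt.Println(valid.InRange(5, 0, 10))       // true: all three arguments are int
	fmt.Println(valid.InRange(5.0, 0.0, 10.0)) // true: all three arguments are float64
	fmt.Println(valid.InRange(5, 0.0, 10.0))   // false: mixed int and float64
}
```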
// IsWhole returns true if value is whole number
func IsWhole(value float64) bool {
return math.Remainder(value, 1) == 0
......
......@@ -177,7 +177,60 @@ func TestIsNatural(t *testing.T) {
}
}
}
func TestInRange(t *testing.T) {
func TestInRangeInt(t *testing.T) {
t.Parallel()
var tests = []struct {
param int
left int
right int
expected bool
}{
{0, 0, 0, true},
{1, 0, 0, false},
{-1, 0, 0, false},
{0, -1, 1, true},
{0, 0, 1, true},
{0, -1, 0, true},
{0, 0, -1, true},
{0, 10, 5, false},
}
for _, test := range tests {
actual := InRangeInt(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRangeInt(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
}
}
}
func TestInRangeFloat32(t *testing.T) {
t.Parallel()
var tests = []struct {
param float32
left float32
right float32
expected bool
}{
{0, 0, 0, true},
{1, 0, 0, false},
{-1, 0, 0, false},
{0, -1, 1, true},
{0, 0, 1, true},
{0, -1, 0, true},
{0, 0, -1, true},
{0, 10, 5, false},
}
for _, test := range tests {
actual := InRangeFloat32(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRangeFloat32(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
}
}
}
func TestInRangeFloat64(t *testing.T) {
t.Parallel()
var tests = []struct {
......@@ -196,6 +249,98 @@ func TestInRange(t *testing.T) {
{0, 10, 5, false},
}
for _, test := range tests {
actual := InRangeFloat64(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRangeFloat64(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
}
}
}
func TestInRange(t *testing.T) {
t.Parallel()
var testsInt = []struct {
param int
left int
right int
expected bool
}{
{0, 0, 0, true},
{1, 0, 0, false},
{-1, 0, 0, false},
{0, -1, 1, true},
{0, 0, 1, true},
{0, -1, 0, true},
{0, 0, -1, true},
{0, 10, 5, false},
}
for _, test := range testsInt {
actual := InRange(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
}
}
var testsFloat32 = []struct {
param float32
left float32
right float32
expected bool
}{
{0, 0, 0, true},
{1, 0, 0, false},
{-1, 0, 0, false},
{0, -1, 1, true},
{0, 0, 1, true},
{0, -1, 0, true},
{0, 0, -1, true},
{0, 10, 5, false},
}
for _, test := range testsFloat32 {
actual := InRange(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
}
}
var testsFloat64 = []struct {
param float64
left float64
right float64
expected bool
}{
{0, 0, 0, true},
{1, 0, 0, false},
{-1, 0, 0, false},
{0, -1, 1, true},
{0, 0, 1, true},
{0, -1, 0, true},
{0, 0, -1, true},
{0, 10, 5, false},
}
for _, test := range testsFloat64 {
actual := InRange(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
}
}
var testsTypeMix = []struct {
param int
left float64
right float64
expected bool
}{
{0, 0, 0, false},
{1, 0, 0, false},
{-1, 0, 0, false},
{0, -1, 1, false},
{0, 0, 1, false},
{0, -1, 0, false},
{0, 0, -1, false},
{0, 10, 5, false},
}
for _, test := range testsTypeMix {
actual := InRange(test.param, test.left, test.right)
if actual != test.expected {
t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual)
......
......@@ -33,7 +33,6 @@ const (
IP string = `(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))`
URLSchema string = `((ftp|tcp|udp|wss?|https?):\/\/)`
URLUsername string = `(\S+(:\S*)?@)`
Hostname string = ``
URLPath string = `((\/|\?|#)[^\s]*)`
URLPort string = `(:(\d{1,5}))`
URLIP string = `([1-9]\d?|1\d\d|2[01]\d|22[0-3])(\.(1?\d{1,2}|2[0-4]\d|25[0-5])){2}(?:\.([0-9]\d?|1\d\d|2[0-4]\d|25[0-4]))`
......
......@@ -34,6 +34,7 @@ var ParamTagMap = map[string]ParamValidator{
"stringlength": StringLength,
"matches": StringMatches,
"in": isInRaw,
"rsapub": IsRsaPub,
}
// ParamTagRegexMap maps param tags to their respective regexes.
......@@ -44,6 +45,7 @@ var ParamTagRegexMap = map[string]*regexp.Regexp{
"stringlength": regexp.MustCompile("^stringlength\\((\\d+)\\|(\\d+)\\)$"),
"in": regexp.MustCompile(`^in\((.*)\)`),
"matches": regexp.MustCompile(`^matches\((.+)\)$`),
"rsapub": regexp.MustCompile("^rsapub\\((\\d+)\\)$"),
}
type customTypeTagMap struct {
......@@ -72,57 +74,58 @@ var CustomTypeTagMap = &customTypeTagMap{validators: make(map[string]CustomTypeV
// TagMap is a map of functions, that can be used as tags for ValidateStruct function.
var TagMap = map[string]Validator{
"email": IsEmail,
"url": IsURL,
"dialstring": IsDialString,
"requrl": IsRequestURL,
"requri": IsRequestURI,
"alpha": IsAlpha,
"utfletter": IsUTFLetter,
"alphanum": IsAlphanumeric,
"utfletternum": IsUTFLetterNumeric,
"numeric": IsNumeric,
"utfnumeric": IsUTFNumeric,
"utfdigit": IsUTFDigit,
"hexadecimal": IsHexadecimal,
"hexcolor": IsHexcolor,
"rgbcolor": IsRGBcolor,
"lowercase": IsLowerCase,
"uppercase": IsUpperCase,
"int": IsInt,
"float": IsFloat,
"null": IsNull,
"uuid": IsUUID,
"uuidv3": IsUUIDv3,
"uuidv4": IsUUIDv4,
"uuidv5": IsUUIDv5,
"creditcard": IsCreditCard,
"isbn10": IsISBN10,
"isbn13": IsISBN13,
"json": IsJSON,
"multibyte": IsMultibyte,
"ascii": IsASCII,
"printableascii": IsPrintableASCII,
"fullwidth": IsFullWidth,
"halfwidth": IsHalfWidth,
"variablewidth": IsVariableWidth,
"base64": IsBase64,
"datauri": IsDataURI,
"ip": IsIP,
"port": IsPort,
"ipv4": IsIPv4,
"ipv6": IsIPv6,
"dns": IsDNSName,
"host": IsHost,
"mac": IsMAC,
"latitude": IsLatitude,
"longitude": IsLongitude,
"ssn": IsSSN,
"semver": IsSemver,
"rfc3339": IsRFC3339,
"ISO3166Alpha2": IsISO3166Alpha2,
"ISO3166Alpha3": IsISO3166Alpha3,
"ISO4217": IsISO4217,
"email": IsEmail,
"url": IsURL,
"dialstring": IsDialString,
"requrl": IsRequestURL,
"requri": IsRequestURI,
"alpha": IsAlpha,
"utfletter": IsUTFLetter,
"alphanum": IsAlphanumeric,
"utfletternum": IsUTFLetterNumeric,
"numeric": IsNumeric,
"utfnumeric": IsUTFNumeric,
"utfdigit": IsUTFDigit,
"hexadecimal": IsHexadecimal,
"hexcolor": IsHexcolor,
"rgbcolor": IsRGBcolor,
"lowercase": IsLowerCase,
"uppercase": IsUpperCase,
"int": IsInt,
"float": IsFloat,
"null": IsNull,
"uuid": IsUUID,
"uuidv3": IsUUIDv3,
"uuidv4": IsUUIDv4,
"uuidv5": IsUUIDv5,
"creditcard": IsCreditCard,
"isbn10": IsISBN10,
"isbn13": IsISBN13,
"json": IsJSON,
"multibyte": IsMultibyte,
"ascii": IsASCII,
"printableascii": IsPrintableASCII,
"fullwidth": IsFullWidth,
"halfwidth": IsHalfWidth,
"variablewidth": IsVariableWidth,
"base64": IsBase64,
"datauri": IsDataURI,
"ip": IsIP,
"port": IsPort,
"ipv4": IsIPv4,
"ipv6": IsIPv6,
"dns": IsDNSName,
"host": IsHost,
"mac": IsMAC,
"latitude": IsLatitude,
"longitude": IsLongitude,
"ssn": IsSSN,
"semver": IsSemver,
"rfc3339": IsRFC3339,
"rfc3339WithoutZone": IsRFC3339WithoutZone,
"ISO3166Alpha2": IsISO3166Alpha2,
"ISO3166Alpha3": IsISO3166Alpha3,
"ISO4217": IsISO4217,
}
// ISO3166Entry stores country codes
......
......@@ -108,7 +108,7 @@ func CamelCaseToUnderscore(str string) string {
var output []rune
var segment []rune
for _, r := range str {
if !unicode.IsLower(r) {
if !unicode.IsLower(r) && string(r) != "_" {
output = addSegment(output, segment)
segment = nil
}
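(With the added guard, an existing underscore no longer closes the current segment, so input that is already snake_case round-trips unchanged instead of picking up a doubled separator; the new foo_bar test case below pins this down.)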
......
......@@ -269,6 +269,7 @@ func TestCamelCaseToUnderscore(t *testing.T) {
{"MyFunc", "my_func"},
{"ABC", "a_b_c"},
{"1B", "1_b"},
{"foo_bar", "foo_bar"},
}
for _, test := range tests {
actual := CamelCaseToUnderscore(test.param)
......
......@@ -17,8 +17,8 @@ package swag
import (
"io/ioutil"
"os"
"path/filepath"
"path"
"path/filepath"
"runtime"
"testing"
......@@ -75,7 +75,7 @@ func TestFindPackage(t *testing.T) {
os.RemoveAll(pth2)
}()
searchPath := pth + string(filepath.ListSeparator) + pth2
searchPath := pth + string(filepath.ListSeparator) + pth2
// finds package when real name mentioned
pkg := FindInSearchPath(searchPath, "foo/bar")
assert.NotEmpty(t, pkg)
......
......@@ -40,6 +40,7 @@ var commonInitialisms = map[string]bool{
"IP": true,
"JSON": true,
"LHS": true,
"OAI": true,
"QPS": true,
"RAM": true,
"RHS": true,
......@@ -163,8 +164,8 @@ func split(str string) (words []string) {
// Split when uppercase is found (needed for Snake)
str = rex1.ReplaceAllString(str, " $1")
// check if consecutive single char things make up an initialism
// check if consecutive single char things make up an initialism
for _, k := range initialisms {
str = strings.Replace(str, rex1.ReplaceAllString(k, " $1"), " "+k, -1)
}
......@@ -189,10 +190,47 @@ func lower(str string) string {
return strings.ToLower(trim(str))
}
// Camelize an uppercased word
func Camelize(word string) (camelized string) {
for pos, ru := range word {
if pos > 0 {
camelized += string(unicode.ToLower(ru))
} else {
camelized += string(unicode.ToUpper(ru))
}
}
return
}
// ToFileName lowercases and underscores a go type name
func ToFileName(name string) string {
var out []string
for _, w := range split(name) {
cml := trim(name)
// Camelize any capital word preceding a reserved keyword ("initialism")
// thus, upper-cased words preceding a common initialism will get separated
// e.g: ELBHTTPLoadBalancer becomes elb_http_load_balancer
rexPrevious := regexp.MustCompile(`(?P<word>\p{Lu}{2,})(?:HTTP|OAI)`)
cml = rexPrevious.ReplaceAllStringFunc(cml, func(match string) (replaceInMatch string) {
for _, m := range rexPrevious.FindAllStringSubmatch(match, -1) { // [ match submatch ]
if m[1] != "" {
replaceInMatch = strings.Replace(m[0], m[1], Camelize(m[1]), -1)
}
}
return
})
// Pre-camelize reserved keywords ("initialisms") to avoid unnecessary hyphenization
for _, k := range initialisms {
cml = strings.Replace(cml, k, Camelize(k), -1)
}
// Camelize other capital words to avoid unnecessary hyphenization
rexCase := regexp.MustCompile(`(\p{Lu}{2,})`)
cml = rexCase.ReplaceAllStringFunc(cml, Camelize)
// Final split with hyphens
for _, w := range split(cml) {
out = append(out, lower(w))
}
return strings.Join(out, "_")
......
......@@ -39,6 +39,7 @@ func TestToGoName(t *testing.T) {
{"findThingById", "FindThingByID"},
{"日本語sample 2 Text", "X日本語sample2Text"},
{"日本語findThingById", "X日本語findThingByID"},
{"findTHINGSbyID", "FindTHINGSbyID"},
}
for k := range commonInitialisms {
......@@ -122,8 +123,16 @@ func TestToFileName(t *testing.T) {
samples := []translationSample{
{"SampleText", "sample_text"},
{"FindThingByID", "find_thing_by_id"},
{"CAPWD.folwdBYlc", "capwd_folwd_bylc"},
{"CAPWDfolwdBYlc", "capwdfolwd_bylc"},
{"CAP_WD_folwdBYlc", "cap_wd_folwd_bylc"},
{"TypeOAI_alias", "type_oai_alias"},
{"Type_OAI_alias", "type_oai_alias"},
{"Type_OAIAlias", "type_oai_alias"},
{"ELB.HTTPLoadBalancer", "elb_http_load_balancer"},
{"elbHTTPLoadBalancer", "elb_http_load_balancer"},
{"ELBHTTPLoadBalancer", "elb_http_load_balancer"},
}
for k := range commonInitialisms {
samples = append(samples,
translationSample{"Sample" + k + "Text", "sample_" + lower(k) + "_text"},
......
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package unix
import (
"os"
"syscall"
)
// FIXME: unexported function from os
// syscallMode returns the syscall-specific mode bits from Go's portable mode bits.
func syscallMode(i os.FileMode) (o uint32) {
o |= uint32(i.Perm())
if i&os.ModeSetuid != 0 {
o |= syscall.S_ISUID
}
if i&os.ModeSetgid != 0 {
o |= syscall.S_ISGID
}
if i&os.ModeSticky != 0 {
o |= syscall.S_ISVTX
}
// No mapping for Go's ModeTemporary (plan9 only).
return
}
......@@ -1125,6 +1125,10 @@ func PtracePokeData(pid int, addr uintptr, data []byte) (count int, err error) {
return ptracePoke(PTRACE_POKEDATA, PTRACE_PEEKDATA, pid, addr, data)
}
func PtracePokeUser(pid int, addr uintptr, data []byte) (count int, err error) {
return ptracePoke(PTRACE_POKEUSR, PTRACE_PEEKUSR, pid, addr, data)
}
func PtraceGetRegs(pid int, regsout *PtraceRegs) (err error) {
return ptrace(PTRACE_GETREGS, pid, 0, uintptr(unsafe.Pointer(regsout)))
}
......
......@@ -796,6 +796,75 @@ func ConnectEx(fd Handle, sa Sockaddr, sendBuf *byte, sendDataLen uint32, bytesS
return connectEx(fd, ptr, n, sendBuf, sendDataLen, bytesSent, overlapped)
}
var sendRecvMsgFunc struct {
once sync.Once
sendAddr uintptr
recvAddr uintptr
err error
}
func loadWSASendRecvMsg() error {
sendRecvMsgFunc.once.Do(func() {
var s Handle
s, sendRecvMsgFunc.err = Socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
if sendRecvMsgFunc.err != nil {
return
}
defer CloseHandle(s)
var n uint32
sendRecvMsgFunc.err = WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(*byte)(unsafe.Pointer(&WSAID_WSARECVMSG)),
uint32(unsafe.Sizeof(WSAID_WSARECVMSG)),
(*byte)(unsafe.Pointer(&sendRecvMsgFunc.recvAddr)),
uint32(unsafe.Sizeof(sendRecvMsgFunc.recvAddr)),
&n, nil, 0)
if sendRecvMsgFunc.err != nil {
return
}
sendRecvMsgFunc.err = WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(*byte)(unsafe.Pointer(&WSAID_WSASENDMSG)),
uint32(unsafe.Sizeof(WSAID_WSASENDMSG)),
(*byte)(unsafe.Pointer(&sendRecvMsgFunc.sendAddr)),
uint32(unsafe.Sizeof(sendRecvMsgFunc.sendAddr)),
&n, nil, 0)
})
return sendRecvMsgFunc.err
}
func WSASendMsg(fd Handle, msg *WSAMsg, flags uint32, bytesSent *uint32, overlapped *Overlapped, croutine *byte) error {
err := loadWSASendRecvMsg()
if err != nil {
return err
}
r1, _, e1 := syscall.Syscall6(sendRecvMsgFunc.sendAddr, 6, uintptr(fd), uintptr(unsafe.Pointer(msg)), uintptr(flags), uintptr(unsafe.Pointer(bytesSent)), uintptr(unsafe.Pointer(overlapped)), uintptr(unsafe.Pointer(croutine)))
if r1 == socket_error {
if e1 != 0 {
err = errnoErr(e1)
} else {
err = syscall.EINVAL
}
}
return err
}
func WSARecvMsg(fd Handle, msg *WSAMsg, bytesReceived *uint32, overlapped *Overlapped, croutine *byte) error {
err := loadWSASendRecvMsg()
if err != nil {
return err
}
r1, _, e1 := syscall.Syscall6(sendRecvMsgFunc.recvAddr, 5, uintptr(fd), uintptr(unsafe.Pointer(msg)), uintptr(unsafe.Pointer(bytesReceived)), uintptr(unsafe.Pointer(overlapped)), uintptr(unsafe.Pointer(croutine)), 0)
if r1 == socket_error {
if e1 != 0 {
err = errnoErr(e1)
} else {
err = syscall.EINVAL
}
}
return err
}
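(Both functions are resolved at runtime rather than linked: loadWSASendRecvMsg opens a throwaway UDP socket and asks Winsock for the two function pointers via WSAIoctl with SIO_GET_EXTENSION_FUNCTION_POINTER and the GUIDs defined further down, caching the result under a sync.Once; the Syscall6 calls then invoke those pointers directly.)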
// Invented structures to support what package os expects.
type Rusage struct {
CreationTime Filetime
......
......@@ -29,6 +29,7 @@ const (
ERROR_NOT_FOUND syscall.Errno = 1168
ERROR_PRIVILEGE_NOT_HELD syscall.Errno = 1314
WSAEACCES syscall.Errno = 10013
WSAEMSGSIZE syscall.Errno = 10040
WSAECONNRESET syscall.Errno = 10054
)
......@@ -567,6 +568,16 @@ const (
IPV6_JOIN_GROUP = 0xc
IPV6_LEAVE_GROUP = 0xd
MSG_OOB = 0x1
MSG_PEEK = 0x2
MSG_DONTROUTE = 0x4
MSG_WAITALL = 0x8
MSG_TRUNC = 0x0100
MSG_CTRUNC = 0x0200
MSG_BCAST = 0x0400
MSG_MCAST = 0x0800
SOMAXCONN = 0x7fffffff
TCP_NODELAY = 1
......@@ -584,6 +595,15 @@ type WSABuf struct {
Buf *byte
}
type WSAMsg struct {
Name *syscall.RawSockaddrAny
Namelen int32
Buffers *WSABuf
BufferCount uint32
Control WSABuf
Flags uint32
}
// Invented values to support what package os expects.
const (
S_IFMT = 0x1f000
......@@ -1011,6 +1031,20 @@ var WSAID_CONNECTEX = GUID{
[8]byte{0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e},
}
var WSAID_WSASENDMSG = GUID{
0xa441e712,
0x754f,
0x43ca,
[8]byte{0x84, 0xa7, 0x0d, 0xee, 0x44, 0xcf, 0x60, 0x6d},
}
var WSAID_WSARECVMSG = GUID{
0xf689d7c8,
0x6f1f,
0x436b,
[8]byte{0x8a, 0x53, 0xe5, 0x4f, 0xe3, 0x51, 0xc3, 0x22},
}
const (
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 1
FILE_SKIP_SET_EVENT_ON_HANDLE = 2
......