Commit 671d1706 authored by James Hartig's avatar James Hartig Committed by Miek Gieben

plugin/metrics: Switch to using promhttp instead of deprecated Handler (#1312)

prometheus.Handler is deprecated according to the godoc for the package so
instead we're using promhttp.

Additionally, we are exposing the Registry that metrics is using so other
plugins that are not inside of coredns can read the registry. Otherwise, if
we kept using the Default one, there's no way to access that from outside
of the coredns repo since it is vendored.
parent 1919913c

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

......@@ -279,7 +279,7 @@
[[projects]]
name = "github.com/prometheus/client_golang"
packages = ["prometheus"]
packages = ["prometheus","prometheus/promhttp"]
revision = "c5b7fccd204277076155f10851dad72b76a49317"
version = "v0.8.0"
......@@ -380,6 +380,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "be9300a30414c93aa44756868a7906a0a295b0910a662880741bcfac58b7b679"
inputs-digest = "802b3c477e5c45a1fc7283cfc3e9ed03c4dffb4bfe9fb406d252a8d8194975d1"
solver-name = "gps-cdcl"
solver-version = 1
......@@ -5,29 +5,20 @@ import (
"log"
"net"
"net/http"
"os"
"sync"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/metrics/vars"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func init() {
prometheus.MustRegister(vars.RequestCount)
prometheus.MustRegister(vars.RequestDuration)
prometheus.MustRegister(vars.RequestSize)
prometheus.MustRegister(vars.RequestDo)
prometheus.MustRegister(vars.RequestType)
prometheus.MustRegister(vars.ResponseSize)
prometheus.MustRegister(vars.ResponseRcode)
}
// Metrics holds the prometheus configuration. The metrics' path is fixed to be /metrics
type Metrics struct {
Next plugin.Handler
Addr string
Reg *prometheus.Registry
ln net.Listener
mux *http.ServeMux
......@@ -38,7 +29,24 @@ type Metrics struct {
// New returns a new instance of Metrics with the given address
func New(addr string) *Metrics {
return &Metrics{Addr: addr, zoneMap: make(map[string]bool)}
met := &Metrics{
Addr: addr,
Reg: prometheus.NewRegistry(),
zoneMap: make(map[string]bool),
}
// Add the default collectors
met.Reg.MustRegister(prometheus.NewGoCollector())
met.Reg.MustRegister(prometheus.NewProcessCollector(os.Getpid(), ""))
// Add all of our collectors
met.Reg.MustRegister(vars.RequestCount)
met.Reg.MustRegister(vars.RequestDuration)
met.Reg.MustRegister(vars.RequestSize)
met.Reg.MustRegister(vars.RequestDo)
met.Reg.MustRegister(vars.RequestType)
met.Reg.MustRegister(vars.ResponseSize)
met.Reg.MustRegister(vars.ResponseRcode)
return met
}
// AddZone adds zone z to m.
......@@ -77,7 +85,7 @@ func (m *Metrics) OnStartup() error {
ListenAddr = m.ln.Addr().String()
m.mux = http.NewServeMux()
m.mux.Handle("/metrics", prometheus.Handler())
m.mux.Handle("/metrics", promhttp.HandlerFor(m.Reg, promhttp.HandlerOpts{}))
go func() {
http.Serve(m.ln, m.mux)
......
PASS
BenchmarkSafe 500000 6131 ns/op
BenchmarkUsuallySafe 200000 7864 ns/op
BenchmarkUnsafe 100000 28560 ns/op
BenchmarkAllDWORD 50000 38722 ns/op
BenchmarkAllOctal 50000 40941 ns/op
BenchmarkAllHex 50000 44063 ns/op
BenchmarkAllCombined 50000 33613 ns/op
ok github.com/PuerkitoBio/purell 17.404s
# Contributing
Contributions are always welcome, both reporting issues and submitting pull requests!
### Reporting issues
Please make sure to include any potentially useful information in the issue, so we can pinpoint the issue faster without going back and forth.
- What SHA of Sarama are you running? If this is not the latest SHA on the master branch, please try if the problem persists with the latest version.
- You can set `sarama.Logger` to a [log.Logger](http://golang.org/pkg/log/#Logger) instance to capture debug output. Please include it in your issue description.
- Also look at the logs of the Kafka broker you are connected to. If you see anything out of the ordinary, please include it.
Also, please include the following information about your environment, so we can help you faster:
- What version of Kafka are you using?
- What version of Go are you using?
- What are the values of your Producer/Consumer/Client configuration?
### Submitting pull requests
We will gladly accept bug fixes, or additions to this library. Please fork this library, commit & push your changes, and open a pull request. Because this library is in production use by many people and applications, we code review all additions. To make the review process go as smooth as possible, please consider the following.
- If you plan to work on something major, please open an issue to discuss the design first.
- Don't break backwards compatibility. If you really have to, open an issue to discuss this first.
- Make sure to use the `go fmt` command to format your code according to the standards. Even better, set up your editor to do this for you when saving.
- Run [go vet](https://godoc.org/golang.org/x/tools/cmd/vet) to detect any suspicious constructs in your code that could be bugs.
- Explicitly handle all error return values. If you really want to ignore an error value, you can assign it to `_`.You can use [errcheck](https://github.com/kisielk/errcheck) to verify whether you have handled all errors.
- You may also want to run [golint](https://github.com/golang/lint) as well to detect style problems.
- Add tests that cover the changes you made. Make sure to run `go test` with the `-race` argument to test for race conditions.
- Make sure your code is supported by all the Go versions we support. You can rely on [Travis CI](https://travis-ci.org/Shopify/sarama) for testing older Go versions
##### Versions
*Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.*
Sarama Version:
Kafka Version:
Go Version:
##### Configuration
What configuration values are you using for Sarama and Kafka?
##### Logs
When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set `sarama.Logger` to a `log.Logger` to capture Sarama debug
output.
##### Problem Description
# Sarama examples
This folder contains example applications to demonstrate the use of Sarama. For code snippet examples on how to use the different types in Sarama, see [Sarama's API documentation on godoc.org](https://godoc.org/github.com/Shopify/sarama)
In these examples, we use `github.com/Shopify/sarama` as import path. We do this to ensure all the examples are up to date with the latest changes in Sarama. For your own applications, you may want to use `gopkg.in/Shopify/sarama.v1` to lock into a stable API version.
#### HTTP server
[http_server](./http_server) is a simple HTTP server uses both the sync producer to produce data as part of the request handling cycle, as well as the async producer to maintain an access log. It also uses the [mocks subpackage](https://godoc.org/github.com/Shopify/sarama/mocks) to test both.
# HTTP server example
This HTTP server example shows you how to use the AsyncProducer and SyncProducer, and how to test them using mocks. The server simply sends the data of the HTTP request's query string to Kafka, and send a 200 result if that succeeds. For every request, it will send an access log entry to Kafka as well in the background.
If you need to know whether a message was successfully sent to the Kafka cluster before you can send your HTTP response, using the `SyncProducer` is probably the simplest way to achieve this. If you don't care, e.g. for the access log, using the `AsyncProducer` will let you fire and forget. You can send the HTTP response, while the message is being produced in the background.
One important thing to note is that both the `SyncProducer` and `AsyncProducer` are **thread-safe**. Go's `http.Server` handles requests concurrently in different goroutines, but you can use a single producer safely. This will actually achieve efficiency gains as the producer will be able to batch messages from concurrent requests together.
package main
import (
"github.com/Shopify/sarama"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
)
var (
addr = flag.String("addr", ":8080", "The address to bind to")
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
keyFile = flag.String("key", "", "The optional key file for client authentication")
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)
func main() {
flag.Parse()
if *verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
if *brokers == "" {
flag.PrintDefaults()
os.Exit(1)
}
brokerList := strings.Split(*brokers, ",")
log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
server := &Server{
DataCollector: newDataCollector(brokerList),
AccessLogProducer: newAccessLogProducer(brokerList),
}
defer func() {
if err := server.Close(); err != nil {
log.Println("Failed to close server", err)
}
}()
log.Fatal(server.Run(*addr))
}
func createTlsConfiguration() (t *tls.Config) {
if *certFile != "" && *keyFile != "" && *caFile != "" {
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
if err != nil {
log.Fatal(err)
}
caCert, err := ioutil.ReadFile(*caFile)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: *verifySsl,
}
}
// will be nil by default if nothing is provided
return t
}
type Server struct {
DataCollector sarama.SyncProducer
AccessLogProducer sarama.AsyncProducer
}
func (s *Server) Close() error {
if err := s.DataCollector.Close(); err != nil {
log.Println("Failed to shut down data collector cleanly", err)
}
if err := s.AccessLogProducer.Close(); err != nil {
log.Println("Failed to shut down access log producer cleanly", err)
}
return nil
}
func (s *Server) Handler() http.Handler {
return s.withAccessLog(s.collectQueryStringData())
}
func (s *Server) Run(addr string) error {
httpServer := &http.Server{
Addr: addr,
Handler: s.Handler(),
}
log.Printf("Listening for requests on %s...\n", addr)
return httpServer.ListenAndServe()
}
func (s *Server) collectQueryStringData() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
// We are not setting a message key, which means that all messages will
// be distributed randomly over the different partitions.
partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
Topic: "important",
Value: sarama.StringEncoder(r.URL.RawQuery),
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Failed to store your data:, %s", err)
} else {
// The tuple (topic, partition, offset) can be used as a unique identifier
// for a message in a Kafka cluster.
fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
}
})
}
type accessLogEntry struct {
Method string `json:"method"`
Host string `json:"host"`
Path string `json:"path"`
IP string `json:"ip"`
ResponseTime float64 `json:"response_time"`
encoded []byte
err error
}
func (ale *accessLogEntry) ensureEncoded() {
if ale.encoded == nil && ale.err == nil {
ale.encoded, ale.err = json.Marshal(ale)
}
}
func (ale *accessLogEntry) Length() int {
ale.ensureEncoded()
return len(ale.encoded)
}
func (ale *accessLogEntry) Encode() ([]byte, error) {
ale.ensureEncoded()
return ale.encoded, ale.err
}
func (s *Server) withAccessLog(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
started := time.Now()
next.ServeHTTP(w, r)
entry := &accessLogEntry{
Method: r.Method,
Host: r.Host,
Path: r.RequestURI,
IP: r.RemoteAddr,
ResponseTime: float64(time.Since(started)) / float64(time.Second),
}
// We will use the client's IP address as key. This will cause
// all the access log entries of the same IP address to end up
// on the same partition.
s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
Topic: "access_log",
Key: sarama.StringEncoder(r.RemoteAddr),
Value: entry,
}
})
}
func newDataCollector(brokerList []string) sarama.SyncProducer {
// For the data collector, we are looking for strong consistency semantics.
// Because we don't change the flush settings, sarama will try to produce messages
// as fast as possible to keep latency low.
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
config.Producer.Return.Successes = true
tlsConfig := createTlsConfiguration()
if tlsConfig != nil {
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}
// On the broker side, you may want to change the following settings to get
// stronger consistency guarantees:
// - For your broker, set `unclean.leader.election.enable` to false
// - For the topic, you could increase `min.insync.replicas`.
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
return producer
}
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
// For the access log, we are looking for AP semantics, with high throughput.
// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
config := sarama.NewConfig()
tlsConfig := createTlsConfiguration()
if tlsConfig != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
// We will just log to STDOUT if we're not able to produce messages.
// Note: messages will only be returned here after all retry attempts are exhausted.
go func() {
for err := range producer.Errors() {
log.Println("Failed to write access log entry:", err)
}
}()
return producer
}
package main
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
)
// In normal operation, we expect one access log entry,
// and one data collector entry. Let's assume both will succeed.
// We should return a HTTP 200 status.
func TestCollectSuccessfully(t *testing.T) {
dataCollectorMock := mocks.NewSyncProducer(t, nil)
dataCollectorMock.ExpectSendMessageAndSucceed()
accessLogProducerMock := mocks.NewAsyncProducer(t, nil)
accessLogProducerMock.ExpectInputAndSucceed()
// Now, use dependency injection to use the mocks.
s := &Server{
DataCollector: dataCollectorMock,
AccessLogProducer: accessLogProducerMock,
}
// The Server's Close call is important; it will call Close on
// the two mock producers, which will then validate whether all
// expectations are resolved.
defer safeClose(t, s)
req, err := http.NewRequest("GET", "http://example.com/?data", nil)
if err != nil {
t.Fatal(err)
}
res := httptest.NewRecorder()
s.Handler().ServeHTTP(res, req)
if res.Code != 200 {
t.Errorf("Expected HTTP status 200, found %d", res.Code)
}
if string(res.Body.Bytes()) != "Your data is stored with unique identifier important/0/1" {
t.Error("Unexpected response body", res.Body)
}
}
// Now, let's see if we handle the case of not being able to produce
// to the data collector properly. In this case we should return a 500 status.
func TestCollectionFailure(t *testing.T) {
dataCollectorMock := mocks.NewSyncProducer(t, nil)
dataCollectorMock.ExpectSendMessageAndFail(sarama.ErrRequestTimedOut)
accessLogProducerMock := mocks.NewAsyncProducer(t, nil)
accessLogProducerMock.ExpectInputAndSucceed()
s := &Server{
DataCollector: dataCollectorMock,
AccessLogProducer: accessLogProducerMock,
}
defer safeClose(t, s)
req, err := http.NewRequest("GET", "http://example.com/?data", nil)
if err != nil {
t.Fatal(err)
}
res := httptest.NewRecorder()
s.Handler().ServeHTTP(res, req)
if res.Code != 500 {
t.Errorf("Expected HTTP status 500, found %d", res.Code)
}
}
// We don't expect any data collector calls because the path is wrong,
// so we are not setting any expectations on the dataCollectorMock. It
// will still generate an access log entry though.
func TestWrongPath(t *testing.T) {
dataCollectorMock := mocks.NewSyncProducer(t, nil)
accessLogProducerMock := mocks.NewAsyncProducer(t, nil)
accessLogProducerMock.ExpectInputAndSucceed()
s := &Server{
DataCollector: dataCollectorMock,
AccessLogProducer: accessLogProducerMock,
}
defer safeClose(t, s)
req, err := http.NewRequest("GET", "http://example.com/wrong?data", nil)
if err != nil {
t.Fatal(err)
}
res := httptest.NewRecorder()
s.Handler().ServeHTTP(res, req)
if res.Code != 404 {
t.Errorf("Expected HTTP status 404, found %d", res.Code)
}
}
func safeClose(t *testing.T, o io.Closer) {
if err := o.Close(); err != nil {
t.Error(err)
}
}
# sarama/mocks
The `mocks` subpackage includes mock implementations that implement the interfaces of the major sarama types.
You can use them to test your sarama applications using dependency injection.
The following mock objects are available:
- [Consumer](https://godoc.org/github.com/Shopify/sarama/mocks#Consumer), which will create [PartitionConsumer](https://godoc.org/github.com/Shopify/sarama/mocks#PartitionConsumer) mocks.
- [AsyncProducer](https://godoc.org/github.com/Shopify/sarama/mocks#AsyncProducer)
- [SyncProducer](https://godoc.org/github.com/Shopify/sarama/mocks#SyncProducer)
The mocks allow you to set expectations on them. When you close the mocks, the expectations will be verified,
and the results will be reported to the `*testing.T` object you provided when creating the mock.
package mocks
import (
"sync"
"github.com/Shopify/sarama"
)
// AsyncProducer implements sarama's Producer interface for testing purposes.
// Before you can send messages to it's Input channel, you have to set expectations
// so it knows how to handle the input; it returns an error if the number of messages
// received is bigger then the number of expectations set. You can also set a
// function in each expectation so that the message value is checked by this function
// and an error is returned if the match fails.
type AsyncProducer struct {
l sync.Mutex
t ErrorReporter
expectations []*producerExpectation
closed chan struct{}
input chan *sarama.ProducerMessage
successes chan *sarama.ProducerMessage
errors chan *sarama.ProducerError
lastOffset int64
}
// NewAsyncProducer instantiates a new Producer mock. The t argument should
// be the *testing.T instance of your test method. An error will be written to it if
// an expectation is violated. The config argument is used to determine whether it
// should ack successes on the Successes channel.
func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
if config == nil {
config = sarama.NewConfig()
}
mp := &AsyncProducer{
t: t,
closed: make(chan struct{}, 0),
expectations: make([]*producerExpectation, 0),
input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
errors: make(chan *sarama.ProducerError, config.ChannelBufferSize),
}
go func() {
defer func() {
close(mp.successes)
close(mp.errors)
}()
for msg := range mp.input {
mp.l.Lock()
if mp.expectations == nil || len(mp.expectations) == 0 {
mp.expectations = nil
mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
} else {
expectation := mp.expectations[0]
mp.expectations = mp.expectations[1:]
if expectation.CheckFunction != nil {
if val, err := msg.Value.Encode(); err != nil {
mp.t.Errorf("Input message encoding failed: %s", err.Error())
mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
} else {
err = expectation.CheckFunction(val)
if err != nil {
mp.t.Errorf("Check function returned an error: %s", err.Error())
mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
}
}
}
if expectation.Result == errProduceSuccess {
mp.lastOffset++
if config.Producer.Return.Successes {
msg.Offset = mp.lastOffset
mp.successes <- msg
}
} else {
if config.Producer.Return.Errors {
mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
}
}
}
mp.l.Unlock()
}
mp.l.Lock()
if len(mp.expectations) > 0 {
mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
}
mp.l.Unlock()
close(mp.closed)
}()
return mp
}
////////////////////////////////////////////////
// Implement Producer interface
////////////////////////////////////////////////
// AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation.
// By closing a mock producer, you also tell it that no more input will be provided, so it will
// write an error to the test state if there's any remaining expectations.
func (mp *AsyncProducer) AsyncClose() {
close(mp.input)
}
// Close corresponds with the Close method of sarama's Producer implementation.
// By closing a mock producer, you also tell it that no more input will be provided, so it will
// write an error to the test state if there's any remaining expectations.
func (mp *AsyncProducer) Close() error {
mp.AsyncClose()
<-mp.closed
return nil
}
// Input corresponds with the Input method of sarama's Producer implementation.
// You have to set expectations on the mock producer before writing messages to the Input
// channel, so it knows how to handle them. If there is no more remaining expectations and
// a messages is written to the Input channel, the mock producer will write an error to the test
// state object.
func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage {
return mp.input
}
// Successes corresponds with the Successes method of sarama's Producer implementation.
func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage {
return mp.successes
}
// Errors corresponds with the Errors method of sarama's Producer implementation.
func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
return mp.errors
}
////////////////////////////////////////////////
// Setting expectations
////////////////////////////////////////////////
// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
// will be provided on the input channel. The mock producer will call the given function to check
// the message value. If an error is returned it will be made available on the Errors channel
// otherwise the mock will handle the message as if it produced successfully, i.e. it will make
// it available on the Successes channel if the Producer.Return.Successes setting is set to true.
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
}
// ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message
// will be provided on the input channel. The mock producer will first call the given function to
// check the message value. If an error is returned it will be made available on the Errors channel
// otherwise the mock will handle the message as if it failed to produce successfully. This means
// it will make a ProducerError available on the Errors channel.
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
}
// ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it is produced successfully,
// i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
// is set to true.
func (mp *AsyncProducer) ExpectInputAndSucceed() {
mp.ExpectInputWithCheckerFunctionAndSucceed(nil)
}
// ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it failed to produce
// successfully. This means it will make a ProducerError available on the Errors channel.
func (mp *AsyncProducer) ExpectInputAndFail(err error) {
mp.ExpectInputWithCheckerFunctionAndFail(nil, err)
}
package mocks
import (
"errors"
"fmt"
"regexp"
"strings"
"testing"
"github.com/Shopify/sarama"
)
func generateRegexpChecker(re string) func([]byte) error {
return func(val []byte) error {
matched, err := regexp.MatchString(re, string(val))
if err != nil {
return errors.New("Error while trying to match the input message with the expected pattern: " + err.Error())
}
if !matched {
return fmt.Errorf("No match between input value \"%s\" and expected pattern \"%s\"", val, re)
}
return nil
}
}
type testReporterMock struct {
errors []string
}
func newTestReporterMock() *testReporterMock {
return &testReporterMock{errors: make([]string, 0)}
}
func (trm *testReporterMock) Errorf(format string, args ...interface{}) {
trm.errors = append(trm.errors, fmt.Sprintf(format, args...))
}
func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) {
var mp interface{} = &AsyncProducer{}
if _, ok := mp.(sarama.AsyncProducer); !ok {
t.Error("The mock producer should implement the sarama.Producer interface.")
}
}
func TestProducerReturnsExpectationsToChannels(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
mp := NewAsyncProducer(t, config)
mp.ExpectInputAndSucceed()
mp.ExpectInputAndSucceed()
mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)
mp.Input() <- &sarama.ProducerMessage{Topic: "test 1"}
mp.Input() <- &sarama.ProducerMessage{Topic: "test 2"}
mp.Input() <- &sarama.ProducerMessage{Topic: "test 3"}
msg1 := <-mp.Successes()
msg2 := <-mp.Successes()
err1 := <-mp.Errors()
if msg1.Topic != "test 1" {
t.Error("Expected message 1 to be returned first")
}
if msg2.Topic != "test 2" {
t.Error("Expected message 2 to be returned second")
}
if err1.Msg.Topic != "test 3" || err1.Err != sarama.ErrOutOfBrokers {
t.Error("Expected message 3 to be returned as error")
}
if err := mp.Close(); err != nil {
t.Error(err)
}
}
func TestProducerWithTooFewExpectations(t *testing.T) {
trm := newTestReporterMock()
mp := NewAsyncProducer(trm, nil)
mp.ExpectInputAndSucceed()
mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
if err := mp.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}
func TestProducerWithTooManyExpectations(t *testing.T) {
trm := newTestReporterMock()
mp := NewAsyncProducer(trm, nil)
mp.ExpectInputAndSucceed()
mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)
mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
if err := mp.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}
func TestProducerWithCheckerFunction(t *testing.T) {
trm := newTestReporterMock()
mp := NewAsyncProducer(trm, nil)
mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))
mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if err := mp.Close(); err != nil {
t.Error(err)
}
if len(mp.Errors()) != 1 {
t.Error("Expected to report an error")
}
err1 := <-mp.Errors()
if !strings.HasPrefix(err1.Err.Error(), "No match") {
t.Error("Expected to report a value check error, found: ", err1.Err)
}
}
This diff is collapsed.
package mocks
import (
"sort"
"testing"
"github.com/Shopify/sarama"
)
func TestMockConsumerImplementsConsumerInterface(t *testing.T) {
var c interface{} = &Consumer{}
if _, ok := c.(sarama.Consumer); !ok {
t.Error("The mock consumer should implement the sarama.Consumer interface.")
}
var pc interface{} = &PartitionConsumer{}
if _, ok := pc.(sarama.PartitionConsumer); !ok {
t.Error("The mock partitionconsumer should implement the sarama.PartitionConsumer interface.")
}
}
func TestConsumerHandlesExpectations(t *testing.T) {
consumer := NewConsumer(t, nil)
defer func() {
if err := consumer.Close(); err != nil {
t.Error(err)
}
}()
consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
consumer.ExpectConsumePartition("test", 1, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
consumer.ExpectConsumePartition("other", 0, AnyOffset).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
t.Fatal(err)
}
test0_msg := <-pc_test0.Messages()
if test0_msg.Topic != "test" || test0_msg.Partition != 0 || string(test0_msg.Value) != "hello world" {
t.Error("Message was not as expected:", test0_msg)
}
test0_err := <-pc_test0.Errors()
if test0_err.Err != sarama.ErrOutOfBrokers {
t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err)
}
pc_test1, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
if err != nil {
t.Fatal(err)
}
test1_msg := <-pc_test1.Messages()
if test1_msg.Topic != "test" || test1_msg.Partition != 1 || string(test1_msg.Value) != "hello world again" {
t.Error("Message was not as expected:", test1_msg)
}
pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetNewest)
if err != nil {
t.Fatal(err)
}
other0_msg := <-pc_other0.Messages()
if other0_msg.Topic != "other" || other0_msg.Partition != 0 || string(other0_msg.Value) != "hello other" {
t.Error("Message was not as expected:", other0_msg)
}
}
func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) {
consumer := NewConsumer(t, nil)
consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
t.Fatal(err)
}
select {
case <-pc.Messages():
t.Error("Did not epxect a message on the messages channel.")
case err := <-pc.Errors():
if err.Err != sarama.ErrOutOfBrokers {
t.Error("Expected sarama.ErrOutOfBrokers, found", err)
}
}
errs := pc.Close().(sarama.ConsumerErrors)
if len(errs) != 1 && errs[0].Err != sarama.ErrOutOfBrokers {
t.Error("Expected Close to return the remaining sarama.ErrOutOfBrokers")
}
}
func TestConsumerWithoutExpectationsOnPartition(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
_, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
if err != errOutOfExpectations {
t.Error("Expected ConsumePartition to return errOutOfExpectations")
}
if err := consumer.Close(); err != nil {
t.Error("No error expected on close, but found:", err)
}
if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}
func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
if err := consumer.Close(); err != nil {
t.Error("No error expected on close, but found:", err)
}
if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}
func TestConsumerWithWrongOffsetExpectation(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
_, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
if err != nil {
t.Error("Did not expect error, found:", err)
}
if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
if err := consumer.Close(); err != nil {
t.Error(err)
}
}
func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
pcmock.ExpectMessagesDrainedOnClose()
pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
t.Error(err)
}
// consume first message, not second one
<-pc.Messages()
if err := consumer.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}
func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
pcmock.YieldError(sarama.ErrInvalidMessage)
pcmock.YieldError(sarama.ErrInvalidMessage)
pcmock.ExpectErrorsDrainedOnClose()
pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
t.Error(err)
}
// consume first and second error,
<-pc.Errors()
<-pc.Errors()
if err := consumer.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 0 {
t.Errorf("Expected no expectation failures to be set on the error reporter.")
}
}
func TestConsumerTopicMetadata(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
consumer.SetTopicMetadata(map[string][]int32{
"test1": {0, 1, 2, 3},
"test2": {0, 1, 2, 3, 4, 5, 6, 7},
})
topics, err := consumer.Topics()
if err != nil {
t.Error(t)
}
sortedTopics := sort.StringSlice(topics)
sortedTopics.Sort()
if len(sortedTopics) != 2 || sortedTopics[0] != "test1" || sortedTopics[1] != "test2" {
t.Error("Unexpected topics returned:", sortedTopics)
}
partitions1, err := consumer.Partitions("test1")
if err != nil {
t.Error(t)
}
if len(partitions1) != 4 {
t.Error("Unexpected partitions returned:", len(partitions1))
}
partitions2, err := consumer.Partitions("test2")
if err != nil {
t.Error(t)
}
if len(partitions2) != 8 {
t.Error("Unexpected partitions returned:", len(partitions2))
}
if len(trm.errors) != 0 {
t.Errorf("Expected no expectation failures to be set on the error reporter.")
}
}
func TestConsumerUnexpectedTopicMetadata(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)
if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers {
t.Error("Expected sarama.ErrOutOfBrokers, found", err)
}
if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}
/*
Package mocks provides mocks that can be used for testing applications
that use Sarama. The mock types provided by this package implement the
interfaces Sarama exports, so you can use them for dependency injection
in your tests.
All mock instances require you to set expectations on them before you
can use them. It will determine how the mock will behave. If an
expectation is not met, it will make your test fail.
NOTE: this package currently does not fall under the API stability
guarantee of Sarama as it is still considered experimental.
*/
package mocks
import (
"errors"
"github.com/Shopify/sarama"
)
// ErrorReporter is a simple interface that includes the testing.T methods we use to report
// expectation violations when using the mock objects.
type ErrorReporter interface {
Errorf(string, ...interface{})
}
// ValueChecker is a function type to be set in each expectation of the producer mocks
// to check the value passed.
type ValueChecker func(val []byte) error
var (
errProduceSuccess error = nil
errOutOfExpectations = errors.New("No more expectations set on mock")
errPartitionConsumerNotStarted = errors.New("The partition consumer was never started")
)
const AnyOffset int64 = -1000
type producerExpectation struct {
Result error
CheckFunction ValueChecker
}
type consumerExpectation struct {
Err error
Msg *sarama.ConsumerMessage
}
package mocks
import (
"sync"
"github.com/Shopify/sarama"
)
// SyncProducer implements sarama's SyncProducer interface for testing purposes.
// Before you can use it, you have to set expectations on the mock SyncProducer
// to tell it how to handle calls to SendMessage, so you can easily test success
// and failure scenarios.
type SyncProducer struct {
l sync.Mutex
t ErrorReporter
expectations []*producerExpectation
lastOffset int64
}
// NewSyncProducer instantiates a new SyncProducer mock. The t argument should
// be the *testing.T instance of your test method. An error will be written to it if
// an expectation is violated. The config argument is currently unused, but is
// maintained to be compatible with the async Producer.
func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
return &SyncProducer{
t: t,
expectations: make([]*producerExpectation, 0),
}
}
////////////////////////////////////////////////
// Implement SyncProducer interface
////////////////////////////////////////////////
// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
// You have to set expectations on the mock producer before calling SendMessage, so it knows
// how to handle them. You can set a function in each expectation so that the message value
// checked by this function and an error is returned if the match fails.
// If there is no more remaining expectation when SendMessage is called,
// the mock producer will write an error to the test state object.
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
sp.l.Lock()
defer sp.l.Unlock()
if len(sp.expectations) > 0 {
expectation := sp.expectations[0]
sp.expectations = sp.expectations[1:]
if expectation.CheckFunction != nil {
val, err := msg.Value.Encode()
if err != nil {
sp.t.Errorf("Input message encoding failed: %s", err.Error())
return -1, -1, err
}
errCheck := expectation.CheckFunction(val)
if errCheck != nil {
sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
return -1, -1, errCheck
}
}
if expectation.Result == errProduceSuccess {
sp.lastOffset++
msg.Offset = sp.lastOffset
return 0, msg.Offset, nil
}
return -1, -1, expectation.Result
}
sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
return -1, -1, errOutOfExpectations
}
// SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
// You have to set expectations on the mock producer before calling SendMessages, so it knows
// how to handle them. If there is no more remaining expectations when SendMessages is called,
// the mock producer will write an error to the test state object.
func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
sp.l.Lock()
defer sp.l.Unlock()
if len(sp.expectations) >= len(msgs) {
expectations := sp.expectations[0 : len(msgs)-1]
sp.expectations = sp.expectations[len(msgs):]
for _, expectation := range expectations {
if expectation.Result != errProduceSuccess {
return expectation.Result
}
}
return nil
}
sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.")
return errOutOfExpectations
}
// Close corresponds with the Close method of sarama's SyncProducer implementation.
// By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
// so it will write an error to the test state if there's any remaining expectations.
func (sp *SyncProducer) Close() error {
sp.l.Lock()
defer sp.l.Unlock()
if len(sp.expectations) > 0 {
sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
}
return nil
}
////////////////////////////////////////////////
// Setting expectations
////////////////////////////////////////////////
// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
// will be called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it produced
// successfully, i.e. by returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
}
// ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it failed
// to produce successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
}
// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it produced successfully, i.e. by
// returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
}
// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it failed to produce
// successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
}
package mocks
import (
"strings"
"testing"
"github.com/Shopify/sarama"
)
func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) {
var mp interface{} = &SyncProducer{}
if _, ok := mp.(sarama.SyncProducer); !ok {
t.Error("The mock async producer should implement the sarama.SyncProducer interface.")
}
}
func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
sp := NewSyncProducer(t, nil)
defer func() {
if err := sp.Close(); err != nil {
t.Error(err)
}
}()
sp.ExpectSendMessageAndSucceed()
sp.ExpectSendMessageAndSucceed()
sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
_, offset, err := sp.SendMessage(msg)
if err != nil {
t.Errorf("The first message should have been produced successfully, but got %s", err)
}
if offset != 1 || offset != msg.Offset {
t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset)
}
_, offset, err = sp.SendMessage(msg)
if err != nil {
t.Errorf("The second message should have been produced successfully, but got %s", err)
}
if offset != 2 || offset != msg.Offset {
t.Errorf("The second message should have been assigned offset 2, but got %d", offset)
}
_, _, err = sp.SendMessage(msg)
if err != sarama.ErrOutOfBrokers {
t.Errorf("The third message should not have been produced successfully")
}
if err := sp.Close(); err != nil {
t.Error(err)
}
}
func TestSyncProducerWithTooManyExpectations(t *testing.T) {
trm := newTestReporterMock()
sp := NewSyncProducer(trm, nil)
sp.ExpectSendMessageAndSucceed()
sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err != nil {
t.Error("No error expected on first SendMessage call", err)
}
if err := sp.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}
func TestSyncProducerWithTooFewExpectations(t *testing.T) {
trm := newTestReporterMock()
sp := NewSyncProducer(trm, nil)
sp.ExpectSendMessageAndSucceed()
msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err != nil {
t.Error("No error expected on first SendMessage call", err)
}
if _, _, err := sp.SendMessage(msg); err != errOutOfExpectations {
t.Error("errOutOfExpectations expected on second SendMessage call, found:", err)
}
if err := sp.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}
func TestSyncProducerWithCheckerFunction(t *testing.T) {
trm := newTestReporterMock()
sp := NewSyncProducer(trm, nil)
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))
msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err != nil {
t.Error("No error expected on first SendMessage call, found: ", err)
}
msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") {
t.Error("Error during value check expected on second SendMessage call, found:", err)
}
if err := sp.Close(); err != nil {
t.Error(err)
}
if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}
# Sarama tools
This folder contains applications that are useful for exploration of your Kafka cluster, or instrumentation.
Some of these tools mirror tools that ship with Kafka, but these tools won't require installing the JVM to function.
- [kafka-console-producer](./kafka-console-producer): a command line tool to produce a single message to your Kafka custer.
- [kafka-console-partitionconsumer](./kafka-console-partitionconsumer): (deprecated) a command line tool to consume a single partition of a topic on your Kafka cluster.
- [kafka-console-consumer](./kafka-console-consumer): a command line tool to consume arbitrary partitions of a topic on your Kafka cluster.
To install all tools, run `go get github.com/Shopify/sarama/tools/...`
kafka-console-consumer
kafka-console-consumer.test
# kafka-console-consumer
A simple command line tool to consume partitions of a topic and print the
messages on the standard output.
### Installation
go get github.com/Shopify/sarama/tools/kafka-console-consumer
### Usage
# Minimum invocation
kafka-console-consumer -topic=test -brokers=kafka1:9092
# It will pick up a KAFKA_PEERS environment variable
export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
kafka-console-consumer -topic=test
# You can specify the offset you want to start at. It can be either
# `oldest`, `newest`. The default is `newest`.
kafka-console-consumer -topic=test -offset=oldest
kafka-console-consumer -topic=test -offset=newest
# You can specify the partition(s) you want to consume as a comma-separated
# list. The default is `all`.
kafka-console-consumer -topic=test -partitions=1,2,3
# Display all command line options
kafka-console-consumer -help
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"github.com/Shopify/sarama"
)
var (
brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
topic = flag.String("topic", "", "REQUIRED: the topic to consume")
partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
logger = log.New(os.Stderr, "", log.LstdFlags)
)
func main() {
flag.Parse()
if *brokerList == "" {
printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
}
if *topic == "" {
printUsageErrorAndExit("-topic is required")
}
if *verbose {
sarama.Logger = logger
}
var initialOffset int64
switch *offset {
case "oldest":
initialOffset = sarama.OffsetOldest
case "newest":
initialOffset = sarama.OffsetNewest
default:
printUsageErrorAndExit("-offset should be `oldest` or `newest`")
}
c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
if err != nil {
printErrorAndExit(69, "Failed to start consumer: %s", err)
}
partitionList, err := getPartitions(c)
if err != nil {
printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
}
var (
messages = make(chan *sarama.ConsumerMessage, *bufferSize)
closing = make(chan struct{})
wg sync.WaitGroup
)
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Kill, os.Interrupt)
<-signals
logger.Println("Initiating shutdown of consumer...")
close(closing)
}()
for _, partition := range partitionList {
pc, err := c.ConsumePartition(*topic, partition, initialOffset)
if err != nil {
printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
}
go func(pc sarama.PartitionConsumer) {
<-closing
pc.AsyncClose()
}(pc)
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for message := range pc.Messages() {
messages <- message
}
}(pc)
}
go func() {
for msg := range messages {
fmt.Printf("Partition:\t%d\n", msg.Partition)
fmt.Printf("Offset:\t%d\n", msg.Offset)
fmt.Printf("Key:\t%s\n", string(msg.Key))
fmt.Printf("Value:\t%s\n", string(msg.Value))
fmt.Println()
}
}()
wg.Wait()
logger.Println("Done consuming topic", *topic)
close(messages)
if err := c.Close(); err != nil {
logger.Println("Failed to close consumer: ", err)
}
}
func getPartitions(c sarama.Consumer) ([]int32, error) {
if *partitions == "all" {
return c.Partitions(*topic)
}
tmp := strings.Split(*partitions, ",")
var pList []int32
for i := range tmp {
val, err := strconv.ParseInt(tmp[i], 10, 32)
if err != nil {
return nil, err
}
pList = append(pList, int32(val))
}
return pList, nil
}
func printErrorAndExit(code int, format string, values ...interface{}) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
fmt.Fprintln(os.Stderr)
os.Exit(code)
}
func printUsageErrorAndExit(format string, values ...interface{}) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Available command line options:")
flag.PrintDefaults()
os.Exit(64)
}
kafka-console-partitionconsumer
kafka-console-partitionconsumer.test
# kafka-console-partitionconsumer
NOTE: this tool is deprecated in favour of the more general and more powerful
`kafka-console-consumer`.
A simple command line tool to consume a partition of a topic and print the messages
on the standard output.
### Installation
go get github.com/Shopify/sarama/tools/kafka-console-partitionconsumer
### Usage
# Minimum invocation
kafka-console-partitionconsumer -topic=test -partition=4 -brokers=kafka1:9092
# It will pick up a KAFKA_PEERS environment variable
export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
kafka-console-partitionconsumer -topic=test -partition=4
# You can specify the offset you want to start at. It can be either
# `oldest`, `newest`, or a specific offset number
kafka-console-partitionconsumer -topic=test -partition=3 -offset=oldest
kafka-console-partitionconsumer -topic=test -partition=2 -offset=1337
# Display all command line options
kafka-console-partitionconsumer -help
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"github.com/Shopify/sarama"
)
var (
brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
topic = flag.String("topic", "", "REQUIRED: the topic to consume")
partition = flag.Int("partition", -1, "REQUIRED: the partition to consume")
offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`, or an actual offset")
verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
logger = log.New(os.Stderr, "", log.LstdFlags)
)
func main() {
flag.Parse()
if *brokerList == "" {
printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
}
if *topic == "" {
printUsageErrorAndExit("-topic is required")
}
if *partition == -1 {
printUsageErrorAndExit("-partition is required")
}
if *verbose {
sarama.Logger = logger
}
var (
initialOffset int64
offsetError error
)
switch *offset {
case "oldest":
initialOffset = sarama.OffsetOldest
case "newest":
initialOffset = sarama.OffsetNewest
default:
initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64)
}
if offsetError != nil {
printUsageErrorAndExit("Invalid initial offset: %s", *offset)
}
c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
if err != nil {
printErrorAndExit(69, "Failed to start consumer: %s", err)
}
pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset)
if err != nil {
printErrorAndExit(69, "Failed to start partition consumer: %s", err)
}
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Kill, os.Interrupt)
<-signals
pc.AsyncClose()
}()
for msg := range pc.Messages() {
fmt.Printf("Offset:\t%d\n", msg.Offset)
fmt.Printf("Key:\t%s\n", string(msg.Key))
fmt.Printf("Value:\t%s\n", string(msg.Value))
fmt.Println()
}
if err := c.Close(); err != nil {
logger.Println("Failed to close consumer: ", err)
}
}
func printErrorAndExit(code int, format string, values ...interface{}) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
fmt.Fprintln(os.Stderr)
os.Exit(code)
}
func printUsageErrorAndExit(format string, values ...interface{}) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Available command line options:")
flag.PrintDefaults()
os.Exit(64)
}
kafka-console-producer
kafka-console-producer.test
# kafka-console-producer
A simple command line tool to produce a single message to Kafka.
### Installation
go get github.com/Shopify/sarama/tools/kafka-console-producer
### Usage
# Minimum invocation
kafka-console-producer -topic=test -value=value -brokers=kafka1:9092
# It will pick up a KAFKA_PEERS environment variable
export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
kafka-console-producer -topic=test -value=value
# It will read the value from stdin by using pipes
echo "hello world" | kafka-console-producer -topic=test
# Specify a key:
echo "hello world" | kafka-console-producer -topic=test -key=key
# Partitioning: by default, kafka-console-producer will partition as follows:
# - manual partitioning if a -partition is provided
# - hash partitioning by key if a -key is provided
# - random partioning otherwise.
#
# You can override this using the -partitioner argument:
echo "hello world" | kafka-console-producer -topic=test -key=key -partitioner=random
# Display all command line options
kafka-console-producer -help
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"github.com/Shopify/sarama"
"github.com/rcrowley/go-metrics"
)
var (
brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
topic = flag.String("topic", "", "REQUIRED: the topic to produce to")
key = flag.String("key", "", "The key of the message to produce. Can be empty.")
value = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
partition = flag.Int("partition", -1, "The partition to produce to.")
verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
logger = log.New(os.Stderr, "", log.LstdFlags)
)
func main() {
flag.Parse()
if *brokerList == "" {
printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
}
if *topic == "" {
printUsageErrorAndExit("no -topic specified")
}
if *verbose {
sarama.Logger = logger
}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
switch *partitioner {
case "":
if *partition >= 0 {
config.Producer.Partitioner = sarama.NewManualPartitioner
} else {
config.Producer.Partitioner = sarama.NewHashPartitioner
}
case "hash":
config.Producer.Partitioner = sarama.NewHashPartitioner
case "random":
config.Producer.Partitioner = sarama.NewRandomPartitioner
case "manual":
config.Producer.Partitioner = sarama.NewManualPartitioner
if *partition == -1 {
printUsageErrorAndExit("-partition is required when partitioning manually")
}
default:
printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
}
message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
if *key != "" {
message.Key = sarama.StringEncoder(*key)
}
if *value != "" {
message.Value = sarama.StringEncoder(*value)
} else if stdinAvailable() {
bytes, err := ioutil.ReadAll(os.Stdin)
if err != nil {
printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
}
message.Value = sarama.ByteEncoder(bytes)
} else {
printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
}
producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
if err != nil {
printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
}
defer func() {
if err := producer.Close(); err != nil {
logger.Println("Failed to close Kafka producer cleanly:", err)
}
}()
partition, offset, err := producer.SendMessage(message)
if err != nil {
printErrorAndExit(69, "Failed to produce message: %s", err)
} else if !*silent {
fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
}
if *showMetrics {
metrics.WriteOnce(config.MetricRegistry, os.Stderr)
}
}
func printErrorAndExit(code int, format string, values ...interface{}) {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
fmt.Fprintln(os.Stderr)
os.Exit(code)
}
func printUsageErrorAndExit(message string) {
fmt.Fprintln(os.Stderr, "ERROR:", message)
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Available command line options:")
flag.PrintDefaults()
os.Exit(64)
}
func stdinAvailable() bool {
stat, _ := os.Stdin.Stat()
return (stat.Mode() & os.ModeCharDevice) == 0
}
#!/bin/sh
set -ex
# Launch and wait for toxiproxy
${REPOSITORY_ROOT}/vagrant/run_toxiproxy.sh &
while ! nc -q 1 localhost 2181 </dev/null; do echo "Waiting"; sleep 1; done
while ! nc -q 1 localhost 9092 </dev/null; do echo "Waiting"; sleep 1; done
# Launch and wait for Zookeeper
for i in 1 2 3 4 5; do
KAFKA_PORT=`expr $i + 9090`
cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
done
while ! nc -q 1 localhost 21805 </dev/null; do echo "Waiting"; sleep 1; done
# Launch and wait for Kafka
for i in 1 2 3 4 5; do
KAFKA_PORT=`expr $i + 9090`
cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/kafka-server-start.sh -daemon config/server.properties
done
while ! nc -q 1 localhost 29095 </dev/null; do echo "Waiting"; sleep 1; done
#!/bin/sh
set -ex
cd ${KAFKA_INSTALL_ROOT}/kafka-9092
bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181
bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181
bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64 --zookeeper localhost:2181
#!/bin/sh
set -ex
TOXIPROXY_VERSION=2.0.0
mkdir -p ${KAFKA_INSTALL_ROOT}
if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then
wget --quiet http://apache.mirror.gtcomm.net/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
fi
if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then
wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}
chmod +x ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}
fi
rm -f ${KAFKA_INSTALL_ROOT}/toxiproxy
ln -s ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ${KAFKA_INSTALL_ROOT}/toxiproxy
for i in 1 2 3 4 5; do
ZK_PORT=`expr $i + 2180`
ZK_PORT_REAL=`expr $i + 21800`
KAFKA_PORT=`expr $i + 9090`
KAFKA_PORT_REAL=`expr $i + 29090`
# unpack kafka
mkdir -p ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}
tar xzf ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -C ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} --strip-components 1
# broker configuration
cp ${REPOSITORY_ROOT}/vagrant/server.properties ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/
sed -i s/KAFKAID/${KAFKA_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties
sed -i s/KAFKAPORT/${KAFKA_PORT_REAL}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties
sed -i s/KAFKA_HOSTNAME/${KAFKA_HOSTNAME}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties
sed -i s/ZK_PORT/${ZK_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties
KAFKA_DATADIR="${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data"
mkdir -p ${KAFKA_DATADIR}
sed -i s#KAFKA_DATADIR#${KAFKA_DATADIR}#g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties
# zookeeper configuration
cp ${REPOSITORY_ROOT}/vagrant/zookeeper.properties ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/
sed -i s/KAFKAID/${KAFKA_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties
sed -i s/ZK_PORT/${ZK_PORT_REAL}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties
ZK_DATADIR="${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}"
mkdir -p ${ZK_DATADIR}
sed -i s#ZK_DATADIR#${ZK_DATADIR}#g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties
echo $i > ${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}/myid
done
start on started zookeeper-ZK_PORT
stop on stopping zookeeper-ZK_PORT
# Use a script instead of exec (using env stanza leaks KAFKA_HEAP_OPTS from zookeeper)
script
sleep 2
export KAFKA_HEAP_OPTS="-Xmx320m"
exec /opt/kafka-KAFKAID/bin/kafka-server-start.sh /opt/kafka-KAFKAID/config/server.properties
end script
#!/bin/sh
set -ex
apt-get update
yes | apt-get install default-jre
export KAFKA_INSTALL_ROOT=/opt
export KAFKA_HOSTNAME=192.168.100.67
export KAFKA_VERSION=0.9.0.1
export REPOSITORY_ROOT=/vagrant
sh /vagrant/vagrant/install_cluster.sh
sh /vagrant/vagrant/setup_services.sh
sh /vagrant/vagrant/create_topics.sh
#!/bin/sh
set -ex
${KAFKA_INSTALL_ROOT}/toxiproxy -port 8474 -host 0.0.0.0 &
PID=$!
while ! nc -q 1 localhost 8474 </dev/null; do echo "Waiting"; sleep 1; done
wget -O/dev/null -S --post-data='{"name":"zk1", "upstream":"localhost:21801", "listen":"0.0.0.0:2181"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"zk2", "upstream":"localhost:21802", "listen":"0.0.0.0:2182"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"zk3", "upstream":"localhost:21803", "listen":"0.0.0.0:2183"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"zk4", "upstream":"localhost:21804", "listen":"0.0.0.0:2184"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"zk5", "upstream":"localhost:21805", "listen":"0.0.0.0:2185"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"kafka1", "upstream":"localhost:29091", "listen":"0.0.0.0:9091"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"kafka2", "upstream":"localhost:29092", "listen":"0.0.0.0:9092"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"kafka3", "upstream":"localhost:29093", "listen":"0.0.0.0:9093"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"kafka4", "upstream":"localhost:29094", "listen":"0.0.0.0:9094"}' localhost:8474/proxies
wget -O/dev/null -S --post-data='{"name":"kafka5", "upstream":"localhost:29095", "listen":"0.0.0.0:9095"}' localhost:8474/proxies
wait $PID
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=KAFKAID
reserved.broker.max.id=10000
############################# Socket Server Settings #############################
# The port the socket server listens on
port=KAFKAPORT
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=KAFKA_HOSTNAME
advertised.port=KAFKAID
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
# advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=KAFKA_DATADIR
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
# Create new topics with a replication factor of 2 so failover can be tested
# more easily.
default.replication.factor=2
auto.create.topics.enable=false
delete.topic.enable=true
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
log.retention.bytes=268435456
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=268435456
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:ZK_PORT
# Timeout in ms for connecting to zookeeper
zookeeper.session.timeout.ms=3000
zookeeper.connection.timeout.ms=3000
#!/bin/sh
set -ex
stop toxiproxy || true
cp ${REPOSITORY_ROOT}/vagrant/toxiproxy.conf /etc/init/toxiproxy.conf
cp ${REPOSITORY_ROOT}/vagrant/run_toxiproxy.sh ${KAFKA_INSTALL_ROOT}/
start toxiproxy
for i in 1 2 3 4 5; do
ZK_PORT=`expr $i + 2180`
KAFKA_PORT=`expr $i + 9090`
stop zookeeper-${ZK_PORT} || true
# set up zk service
cp ${REPOSITORY_ROOT}/vagrant/zookeeper.conf /etc/init/zookeeper-${ZK_PORT}.conf
sed -i s/KAFKAID/${KAFKA_PORT}/g /etc/init/zookeeper-${ZK_PORT}.conf
# set up kafka service
cp ${REPOSITORY_ROOT}/vagrant/kafka.conf /etc/init/kafka-${KAFKA_PORT}.conf
sed -i s/KAFKAID/${KAFKA_PORT}/g /etc/init/kafka-${KAFKA_PORT}.conf
sed -i s/ZK_PORT/${ZK_PORT}/g /etc/init/kafka-${KAFKA_PORT}.conf
start zookeeper-${ZK_PORT}
done
# Wait for the last kafka node to finish booting
while ! nc -q 1 localhost 29095 </dev/null; do echo "Waiting"; sleep 1; done
start on started networking
stop on shutdown
env KAFKA_INSTALL_ROOT=/opt
exec /opt/run_toxiproxy.sh
start on started toxiproxy
stop on stopping toxiproxy
script
export KAFKA_HEAP_OPTS="-Xmx192m"
exec /opt/kafka-KAFKAID/bin/zookeeper-server-start.sh /opt/kafka-KAFKAID/config/zookeeper.properties
end script
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=ZK_DATADIR
# the port at which the clients will connect
clientPort=ZK_PORT
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
server.1=localhost:2281:2381
server.2=localhost:2282:2382
server.3=localhost:2283:2383
server.4=localhost:2284:2384
server.5=localhost:2285:2385
dnl
dnl Check Bison version
dnl AC_PROG_BISON([MIN_VERSION=2.4])
dnl
dnl Will define BISON_USE_PARSER_H_EXTENSION if Automake is < 1.11
dnl for use with .h includes.
dnl
AC_DEFUN([AC_PROG_BISON], [
if test "x$1" = "x" ; then
bison_required_version="2.4"
else
bison_required_version="$1"
fi
AC_CHECK_PROG(have_prog_bison, [bison], [yes],[no])
AC_DEFINE_UNQUOTED([BISON_VERSION], [0.0], [Bison version if bison is not available])
#Do not use *.h extension for parser header files, use newer *.hh
bison_use_parser_h_extension=false
if test "$have_prog_bison" = "yes" ; then
AC_MSG_CHECKING([for bison version >= $bison_required_version])
bison_version=`bison --version | head -n 1 | cut '-d ' -f 4`
AC_DEFINE_UNQUOTED([BISON_VERSION], [$bison_version], [Defines bison version])
if test "$bison_version" \< "$bison_required_version" ; then
BISON=:
AC_MSG_RESULT([no])
AC_MSG_ERROR([Bison version $bison_required_version or higher must be installed on the system!])
else
AC_MSG_RESULT([yes])
BISON=bison
AC_SUBST(BISON)
#Verify automake version 1.11 headers for yy files are .h, > 1.12 uses .hh
automake_version=`automake --version | head -n 1 | cut '-d ' -f 4`
AC_DEFINE_UNQUOTED([AUTOMAKE_VERSION], [$automake_version], [Defines automake version])
if test "$automake_version" \< "1.12" ; then
#Use *.h extension for parser header file
bison_use_parser_h_extension=true
echo "Automake version < 1.12"
AC_DEFINE([BISON_USE_PARSER_H_EXTENSION], [1], [Use *.h extension for parser header file])
fi
fi
else
BISON=:
AC_MSG_RESULT([NO])
fi
AM_CONDITIONAL([BISON_USE_PARSER_H_EXTENSION], [test x$bison_use_parser_h_extension = xtrue])
AC_SUBST(BISON)
])
This diff is collapsed.
# ===========================================================================
# https://www.gnu.org/software/autoconf-archive/ax_check_openssl.html
# ===========================================================================
#
# SYNOPSIS
#
# AX_CHECK_OPENSSL([action-if-found[, action-if-not-found]])
#
# DESCRIPTION
#
# Look for OpenSSL in a number of default spots, or in a user-selected
# spot (via --with-openssl). Sets
#
# OPENSSL_INCLUDES to the include directives required
# OPENSSL_LIBS to the -l directives required
# OPENSSL_LDFLAGS to the -L or -R flags required
#
# and calls ACTION-IF-FOUND or ACTION-IF-NOT-FOUND appropriately
#
# This macro sets OPENSSL_INCLUDES such that source files should use the
# openssl/ directory in include directives:
#
# #include <openssl/hmac.h>
#
# LICENSE
#
# Copyright (c) 2009,2010 Zmanda Inc. <http://www.zmanda.com/>
# Copyright (c) 2009,2010 Dustin J. Mitchell <dustin@zmanda.com>
#
# Copying and distribution of this file, with or without modification, are
# permitted in any medium without royalty provided the copyright notice
# and this notice are preserved. This file is offered as-is, without any
# warranty.
#serial 10
AU_ALIAS([CHECK_SSL], [AX_CHECK_OPENSSL])
AC_DEFUN([AX_CHECK_OPENSSL], [
found=false
AC_ARG_WITH([openssl],
[AS_HELP_STRING([--with-openssl=DIR],
[root of the OpenSSL directory])],
[
case "$withval" in
"" | y | ye | yes | n | no)
AC_MSG_ERROR([Invalid --with-openssl value])
;;
*) ssldirs="$withval"
;;
esac
], [
# if pkg-config is installed and openssl has installed a .pc file,
# then use that information and don't search ssldirs
AC_CHECK_TOOL([PKG_CONFIG], [pkg-config])
if test x"$PKG_CONFIG" != x""; then
OPENSSL_LDFLAGS=`$PKG_CONFIG openssl --libs-only-L 2>/dev/null`
if test $? = 0; then
OPENSSL_LIBS=`$PKG_CONFIG openssl --libs-only-l 2>/dev/null`
OPENSSL_INCLUDES=`$PKG_CONFIG openssl --cflags-only-I 2>/dev/null`
found=true
fi
fi
# no such luck; use some default ssldirs
if ! $found; then
ssldirs="/usr/local/ssl /usr/lib/ssl /usr/ssl /usr/pkg /usr/local /usr"
fi
]
)
# note that we #include <openssl/foo.h>, so the OpenSSL headers have to be in
# an 'openssl' subdirectory
if ! $found; then
OPENSSL_INCLUDES=
for ssldir in $ssldirs; do
AC_MSG_CHECKING([for openssl/ssl.h in $ssldir])
if test -f "$ssldir/include/openssl/ssl.h"; then
OPENSSL_INCLUDES="-I$ssldir/include"
OPENSSL_LDFLAGS="-L$ssldir/lib"
OPENSSL_LIBS="-lssl -lcrypto"
found=true
AC_MSG_RESULT([yes])
break
else
AC_MSG_RESULT([no])
fi
done
# if the file wasn't found, well, go ahead and try the link anyway -- maybe
# it will just work!
fi
# try the preprocessor and linker with our new flags,
# being careful not to pollute the global LIBS, LDFLAGS, and CPPFLAGS
AC_MSG_CHECKING([whether compiling and linking against OpenSSL works])
echo "Trying link with OPENSSL_LDFLAGS=$OPENSSL_LDFLAGS;" \
"OPENSSL_LIBS=$OPENSSL_LIBS; OPENSSL_INCLUDES=$OPENSSL_INCLUDES" >&AS_MESSAGE_LOG_FD
save_LIBS="$LIBS"
save_LDFLAGS="$LDFLAGS"
save_CPPFLAGS="$CPPFLAGS"
LDFLAGS="$LDFLAGS $OPENSSL_LDFLAGS"
LIBS="$OPENSSL_LIBS $LIBS"
CPPFLAGS="$OPENSSL_INCLUDES $CPPFLAGS"
AC_LINK_IFELSE(
[AC_LANG_PROGRAM([#include <openssl/ssl.h>], [SSL_new(NULL)])],
[
AC_MSG_RESULT([yes])
$1
], [
AC_MSG_RESULT([no])
$2
])
CPPFLAGS="$save_CPPFLAGS"
LDFLAGS="$save_LDFLAGS"
LIBS="$save_LIBS"
AC_SUBST([OPENSSL_INCLUDES])
AC_SUBST([OPENSSL_LIBS])
AC_SUBST([OPENSSL_LDFLAGS])
])
# ===========================================================================
# https://www.gnu.org/software/autoconf-archive/ax_compare_version.html
# ===========================================================================
#
# SYNOPSIS
#
# AX_COMPARE_VERSION(VERSION_A, OP, VERSION_B, [ACTION-IF-TRUE], [ACTION-IF-FALSE])
#
# DESCRIPTION
#
# This macro compares two version strings. Due to the various number of
# minor-version numbers that can exist, and the fact that string
# comparisons are not compatible with numeric comparisons, this is not
# necessarily trivial to do in a autoconf script. This macro makes doing
# these comparisons easy.
#
# The six basic comparisons are available, as well as checking equality
# limited to a certain number of minor-version levels.
#
# The operator OP determines what type of comparison to do, and can be one
# of:
#
# eq - equal (test A == B)
# ne - not equal (test A != B)
# le - less than or equal (test A <= B)
# ge - greater than or equal (test A >= B)
# lt - less than (test A < B)
# gt - greater than (test A > B)
#
# Additionally, the eq and ne operator can have a number after it to limit
# the test to that number of minor versions.
#
# eq0 - equal up to the length of the shorter version
# ne0 - not equal up to the length of the shorter version
# eqN - equal up to N sub-version levels
# neN - not equal up to N sub-version levels
#
# When the condition is true, shell commands ACTION-IF-TRUE are run,
# otherwise shell commands ACTION-IF-FALSE are run. The environment
# variable 'ax_compare_version' is always set to either 'true' or 'false'
# as well.
#
# Examples:
#
# AX_COMPARE_VERSION([3.15.7],[lt],[3.15.8])
# AX_COMPARE_VERSION([3.15],[lt],[3.15.8])
#
# would both be true.
#
# AX_COMPARE_VERSION([3.15.7],[eq],[3.15.8])
# AX_COMPARE_VERSION([3.15],[gt],[3.15.8])
#
# would both be false.
#
# AX_COMPARE_VERSION([3.15.7],[eq2],[3.15.8])
#
# would be true because it is only comparing two minor versions.
#
# AX_COMPARE_VERSION([3.15.7],[eq0],[3.15])
#
# would be true because it is only comparing the lesser number of minor
# versions of the two values.
#
# Note: The characters that separate the version numbers do not matter. An
# empty string is the same as version 0. OP is evaluated by autoconf, not
# configure, so must be a string, not a variable.
#
# The author would like to acknowledge Guido Draheim whose advice about
# the m4_case and m4_ifvaln functions make this macro only include the
# portions necessary to perform the specific comparison specified by the
# OP argument in the final configure script.
#
# LICENSE
#
# Copyright (c) 2008 Tim Toolan <toolan@ele.uri.edu>
#
# Copying and distribution of this file, with or without modification, are
# permitted in any medium without royalty provided the copyright notice
# and this notice are preserved. This file is offered as-is, without any
# warranty.
#serial 12
dnl #########################################################################
AC_DEFUN([AX_COMPARE_VERSION], [
AC_REQUIRE([AC_PROG_AWK])
# Used to indicate true or false condition
ax_compare_version=false
# Convert the two version strings to be compared into a format that
# allows a simple string comparison. The end result is that a version
# string of the form 1.12.5-r617 will be converted to the form
# 0001001200050617. In other words, each number is zero padded to four
# digits, and non digits are removed.
AS_VAR_PUSHDEF([A],[ax_compare_version_A])
A=`echo "$1" | sed -e 's/\([[0-9]]*\)/Z\1Z/g' \
-e 's/Z\([[0-9]]\)Z/Z0\1Z/g' \
-e 's/Z\([[0-9]][[0-9]]\)Z/Z0\1Z/g' \
-e 's/Z\([[0-9]][[0-9]][[0-9]]\)Z/Z0\1Z/g' \
-e 's/[[^0-9]]//g'`
AS_VAR_PUSHDEF([B],[ax_compare_version_B])
B=`echo "$3" | sed -e 's/\([[0-9]]*\)/Z\1Z/g' \
-e 's/Z\([[0-9]]\)Z/Z0\1Z/g' \
-e 's/Z\([[0-9]][[0-9]]\)Z/Z0\1Z/g' \
-e 's/Z\([[0-9]][[0-9]][[0-9]]\)Z/Z0\1Z/g' \
-e 's/[[^0-9]]//g'`
dnl # In the case of le, ge, lt, and gt, the strings are sorted as necessary
dnl # then the first line is used to determine if the condition is true.
dnl # The sed right after the echo is to remove any indented white space.
m4_case(m4_tolower($2),
[lt],[
ax_compare_version=`echo "x$A
x$B" | sed 's/^ *//' | sort -r | sed "s/x${A}/false/;s/x${B}/true/;1q"`
],
[gt],[
ax_compare_version=`echo "x$A
x$B" | sed 's/^ *//' | sort | sed "s/x${A}/false/;s/x${B}/true/;1q"`
],
[le],[
ax_compare_version=`echo "x$A
x$B" | sed 's/^ *//' | sort | sed "s/x${A}/true/;s/x${B}/false/;1q"`
],
[ge],[
ax_compare_version=`echo "x$A
x$B" | sed 's/^ *//' | sort -r | sed "s/x${A}/true/;s/x${B}/false/;1q"`
],[
dnl Split the operator from the subversion count if present.
m4_bmatch(m4_substr($2,2),
[0],[
# A count of zero means use the length of the shorter version.
# Determine the number of characters in A and B.
ax_compare_version_len_A=`echo "$A" | $AWK '{print(length)}'`
ax_compare_version_len_B=`echo "$B" | $AWK '{print(length)}'`
# Set A to no more than B's length and B to no more than A's length.
A=`echo "$A" | sed "s/\(.\{$ax_compare_version_len_B\}\).*/\1/"`
B=`echo "$B" | sed "s/\(.\{$ax_compare_version_len_A\}\).*/\1/"`
],
[[0-9]+],[
# A count greater than zero means use only that many subversions
A=`echo "$A" | sed "s/\(\([[0-9]]\{4\}\)\{m4_substr($2,2)\}\).*/\1/"`
B=`echo "$B" | sed "s/\(\([[0-9]]\{4\}\)\{m4_substr($2,2)\}\).*/\1/"`
],
[.+],[
AC_WARNING(
[illegal OP numeric parameter: $2])
],[])
# Pad zeros at end of numbers to make same length.
ax_compare_version_tmp_A="$A`echo $B | sed 's/./0/g'`"
B="$B`echo $A | sed 's/./0/g'`"
A="$ax_compare_version_tmp_A"
# Check for equality or inequality as necessary.
m4_case(m4_tolower(m4_substr($2,0,2)),
[eq],[
test "x$A" = "x$B" && ax_compare_version=true
],
[ne],[
test "x$A" != "x$B" && ax_compare_version=true
],[
AC_WARNING([illegal OP parameter: $2])
])
])
AS_VAR_POPDEF([A])dnl
AS_VAR_POPDEF([B])dnl
dnl # Execute ACTION-IF-TRUE / ACTION-IF-FALSE.
if test "$ax_compare_version" = "true" ; then
m4_ifvaln([$4],[$4],[:])dnl
m4_ifvaln([$5],[else $5])dnl
fi
]) dnl AX_COMPARE_VERSION
# =============================================================================
# https://www.gnu.org/software/autoconf-archive/ax_cxx_compile_stdcxx_11.html
# =============================================================================
#
# SYNOPSIS
#
# AX_CXX_COMPILE_STDCXX_11([ext|noext], [mandatory|optional])
#
# DESCRIPTION
#
# Check for baseline language coverage in the compiler for the C++11
# standard; if necessary, add switches to CXX and CXXCPP to enable
# support.
#
# This macro is a convenience alias for calling the AX_CXX_COMPILE_STDCXX
# macro with the version set to C++11. The two optional arguments are
# forwarded literally as the second and third argument respectively.
# Please see the documentation for the AX_CXX_COMPILE_STDCXX macro for
# more information. If you want to use this macro, you also need to
# download the ax_cxx_compile_stdcxx.m4 file.
#
# LICENSE
#
# Copyright (c) 2008 Benjamin Kosnik <bkoz@redhat.com>
# Copyright (c) 2012 Zack Weinberg <zackw@panix.com>
# Copyright (c) 2013 Roy Stogner <roystgnr@ices.utexas.edu>
# Copyright (c) 2014, 2015 Google Inc.; contributed by Alexey Sokolov <sokolov@google.com>
# Copyright (c) 2015 Paul Norman <penorman@mac.com>
# Copyright (c) 2015 Moritz Klammler <moritz@klammler.eu>
#
# Copying and distribution of this file, with or without modification, are
# permitted in any medium without royalty provided the copyright notice
# and this notice are preserved. This file is offered as-is, without any
# warranty.
#serial 18
AX_REQUIRE_DEFINED([AX_CXX_COMPILE_STDCXX])
AC_DEFUN([AX_CXX_COMPILE_STDCXX_11], [AX_CXX_COMPILE_STDCXX([11], [$1], [$2])])
dnl @synopsis AX_DMD
dnl
dnl Test for the presence of a DMD-compatible D2 compiler, and (optionally)
dnl specified modules on the import path.
dnl
dnl If "DMD" is defined in the environment, that will be the only
dnl dmd command tested. Otherwise, a hard-coded list will be used.
dnl
dnl After AX_DMD runs, the shell variables "success" and "ax_dmd" are set to
dnl "yes" or "no", and "DMD" is set to the appropriate command. Furthermore,
dnl "dmd_optlink" will be set to "yes" or "no" depending on whether OPTLINK is
dnl used as the linker (DMD/Windows), and "dmd_of_dirsep" will be set to the
dnl directory separator to use when passing -of to DMD (OPTLINK requires a
dnl backslash).
dnl
dnl AX_CHECK_D_MODULE must be run after AX_DMD. It tests for the presence of a
dnl module in the import path of the chosen compiler, and sets the shell
dnl variable "success" to "yes" or "no".
dnl
dnl @category D
dnl @version 2011-05-31
dnl @license AllPermissive
dnl
dnl Copyright (C) 2009 David Reiss
dnl Copyright (C) 2011 David Nadlinger
dnl Copying and distribution of this file, with or without modification,
dnl are permitted in any medium without royalty provided the copyright
dnl notice and this notice are preserved.
AC_DEFUN([AX_DMD],
[
dnl Hard-coded default commands to test.
DMD_PROGS="dmd,gdmd,ldmd"
dnl Allow the user to specify an alternative.
if test -n "$DMD" ; then
DMD_PROGS="$DMD"
fi
AC_MSG_CHECKING(for DMD)
# std.algorithm as a quick way to check for D2/Phobos.
echo "import std.algorithm; void main() {}" > configtest_ax_dmd.d
success=no
oIFS="$IFS"
IFS=","
for DMD in $DMD_PROGS ; do
IFS="$oIFS"
echo "Running \"$DMD configtest_ax_dmd.d\"" >&AS_MESSAGE_LOG_FD
if $DMD configtest_ax_dmd.d >&AS_MESSAGE_LOG_FD 2>&1 ; then
success=yes
break
fi
done
if test "$success" != "yes" ; then
AC_MSG_RESULT(no)
DMD=""
else
AC_MSG_RESULT(yes)
fi
ax_dmd="$success"
# Test whether OPTLINK is used by trying if DMD accepts -L/? without
# erroring out.
if test "$success" == "yes" ; then
AC_MSG_CHECKING(whether DMD uses OPTLINK)
echo "Running \”$DMD -L/? configtest_ax_dmd.d\"" >&AS_MESSAGE_LOG_FD
if $DMD -L/? configtest_ax_dmd.d >&AS_MESSAGE_LOG_FD 2>&1 ; then
AC_MSG_RESULT(yes)
dmd_optlink="yes"
# This actually produces double slashes in the final configure
# output, but at least it works.
dmd_of_dirsep="\\\\"
else
AC_MSG_RESULT(no)
dmd_optlink="no"
dmd_of_dirsep="/"
fi
fi
rm -f configtest_ax_dmd*
])
AC_DEFUN([AX_CHECK_D_MODULE],
[
AC_MSG_CHECKING(for D module [$1])
echo "import $1; void main() {}" > configtest_ax_dmd.d
echo "Running \"$DMD configtest_ax_dmd.d\"" >&AS_MESSAGE_LOG_FD
if $DMD -c configtest_ax_dmd.d >&AS_MESSAGE_LOG_FD 2>&1 ; then
AC_MSG_RESULT(yes)
success=yes
else
AC_MSG_RESULT(no)
success=no
fi
rm -f configtest_ax_dmd*
])
dnl @synopsis AX_JAVAC_AND_JAVA
dnl @synopsis AX_CHECK_JAVA_CLASS(CLASSNAME)
dnl
dnl Test for the presence of a JDK, and (optionally) specific classes.
dnl
dnl If "JAVA" is defined in the environment, that will be the only
dnl java command tested. Otherwise, a hard-coded list will be used.
dnl Similarly for "JAVAC".
dnl
dnl AX_JAVAC_AND_JAVA does not currently support testing for a particular
dnl Java version, testing for only one of "java" and "javac", or
dnl compiling or running user-provided Java code.
dnl
dnl After AX_JAVAC_AND_JAVA runs, the shell variables "success" and
dnl "ax_javac_and_java" are set to "yes" or "no", and "JAVAC" and
dnl "JAVA" are set to the appropriate commands.
dnl
dnl AX_CHECK_JAVA_CLASS must be run after AX_JAVAC_AND_JAVA.
dnl It tests for the presence of a class based on a fully-qualified name.
dnl It sets the shell variable "success" to "yes" or "no".
dnl
dnl @category Java
dnl @version 2009-02-09
dnl @license AllPermissive
dnl
dnl Copyright (C) 2009 David Reiss
dnl Copying and distribution of this file, with or without modification,
dnl are permitted in any medium without royalty provided the copyright
dnl notice and this notice are preserved.
AC_DEFUN([AX_JAVAC_AND_JAVA],
[
dnl Hard-coded default commands to test.
JAVAC_PROGS="javac,jikes,gcj -C"
JAVA_PROGS="java,kaffe"
dnl Allow the user to specify an alternative.
if test -n "$JAVAC" ; then
JAVAC_PROGS="$JAVAC"
fi
if test -n "$JAVA" ; then
JAVA_PROGS="$JAVA"
fi
AC_MSG_CHECKING(for javac and java)
echo "public class configtest_ax_javac_and_java { public static void main(String args@<:@@:>@) { } }" > configtest_ax_javac_and_java.java
success=no
oIFS="$IFS"
IFS=","
for JAVAC in $JAVAC_PROGS ; do
IFS="$oIFS"
echo "Running \"$JAVAC configtest_ax_javac_and_java.java\"" >&AS_MESSAGE_LOG_FD
if $JAVAC configtest_ax_javac_and_java.java >&AS_MESSAGE_LOG_FD 2>&1 ; then
# prevent $JAVA VM issues with UTF-8 path names (THRIFT-3271)
oLC_ALL="$LC_ALL"
LC_ALL=""
IFS=","
for JAVA in $JAVA_PROGS ; do
IFS="$oIFS"
echo "Running \"$JAVA configtest_ax_javac_and_java\"" >&AS_MESSAGE_LOG_FD
if $JAVA configtest_ax_javac_and_java >&AS_MESSAGE_LOG_FD 2>&1 ; then
success=yes
break 2
fi
done
# restore LC_ALL
LC_ALL="$oLC_ALL"
oLC_ALL=""
fi
done
rm -f configtest_ax_javac_and_java.java configtest_ax_javac_and_java.class
if test "$success" != "yes" ; then
AC_MSG_RESULT(no)
JAVAC=""
JAVA=""
else
AC_MSG_RESULT(yes)
fi
ax_javac_and_java="$success"
])
AC_DEFUN([AX_CHECK_JAVA_CLASS],
[
AC_MSG_CHECKING(for Java class [$1])
echo "import $1; public class configtest_ax_javac_and_java { public static void main(String args@<:@@:>@) { } }" > configtest_ax_javac_and_java.java
echo "Running \"$JAVAC configtest_ax_javac_and_java.java\"" >&AS_MESSAGE_LOG_FD
if $JAVAC configtest_ax_javac_and_java.java >&AS_MESSAGE_LOG_FD 2>&1 ; then
AC_MSG_RESULT(yes)
success=yes
else
AC_MSG_RESULT(no)
success=no
fi
rm -f configtest_ax_javac_and_java.java configtest_ax_javac_and_java.class
])
AC_DEFUN([AX_CHECK_ANT_VERSION],
[
AC_MSG_CHECKING(for ant version > $2)
ANT_VALID=`expr "x$(printf "$2\n$($1 -version 2>/dev/null | sed -n 's/.*version \(@<:@0-9\.@:>@*\).*/\1/p')" | sort -t '.' -k 1,1 -k 2,2 -k 3,3 -g | sed -n 1p)" = "x$2"`
if test "x$ANT_VALID" = "x1" ; then
AC_MSG_RESULT(yes)
else
AC_MSG_RESULT(no)
ANT=""
fi
])
dnl @synopsis AX_LIB_EVENT([MINIMUM-VERSION])
dnl
dnl Test for the libevent library of a particular version (or newer).
dnl
dnl If no path to the installed libevent is given, the macro will first try
dnl using no -I or -L flags, then searches under /usr, /usr/local, /opt,
dnl and /opt/libevent.
dnl If these all fail, it will try the $LIBEVENT_ROOT environment variable.
dnl
dnl This macro requires that #include <sys/types.h> works and defines u_char.
dnl
dnl This macro calls:
dnl AC_SUBST(LIBEVENT_CPPFLAGS)
dnl AC_SUBST(LIBEVENT_LDFLAGS)
dnl AC_SUBST(LIBEVENT_LIBS)
dnl
dnl And (if libevent is found):
dnl AC_DEFINE(HAVE_LIBEVENT)
dnl
dnl It also leaves the shell variables "success" and "ax_have_libevent"
dnl set to "yes" or "no".
dnl
dnl NOTE: This macro does not currently work for cross-compiling,
dnl but it can be easily modified to allow it. (grep "cross").
dnl
dnl @category InstalledPackages
dnl @category C
dnl @version 2007-09-12
dnl @license AllPermissive
dnl
dnl Copyright (C) 2009 David Reiss
dnl Copying and distribution of this file, with or without modification,
dnl are permitted in any medium without royalty provided the copyright
dnl notice and this notice are preserved.
dnl Input: ax_libevent_path, WANT_LIBEVENT_VERSION
dnl Output: success=yes/no
AC_DEFUN([AX_LIB_EVENT_DO_CHECK],
[
# Save our flags.
CPPFLAGS_SAVED="$CPPFLAGS"
LDFLAGS_SAVED="$LDFLAGS"
LIBS_SAVED="$LIBS"
LD_LIBRARY_PATH_SAVED="$LD_LIBRARY_PATH"
# Set our flags if we are checking a specific directory.
if test -n "$ax_libevent_path" ; then
LIBEVENT_CPPFLAGS="-I$ax_libevent_path/include"
LIBEVENT_LDFLAGS="-L$ax_libevent_path/lib"
LD_LIBRARY_PATH="$ax_libevent_path/lib:$LD_LIBRARY_PATH"
else
LIBEVENT_CPPFLAGS=""
LIBEVENT_LDFLAGS=""
fi
# Required flag for libevent.
LIBEVENT_LIBS="-levent"
# Prepare the environment for compilation.
CPPFLAGS="$CPPFLAGS $LIBEVENT_CPPFLAGS"
LDFLAGS="$LDFLAGS $LIBEVENT_LDFLAGS"
LIBS="$LIBS $LIBEVENT_LIBS"
export CPPFLAGS
export LDFLAGS
export LIBS
export LD_LIBRARY_PATH
success=no
# Compile, link, and run the program. This checks:
# - event.h is available for including.
# - event_get_version() is available for linking.
# - The event version string is lexicographically greater
# than the required version.
AC_LANG_PUSH([C])
dnl This can be changed to AC_LINK_IFELSE if you are cross-compiling,
dnl but then the version cannot be checked.
AC_LINK_IFELSE([AC_LANG_PROGRAM([[
#include <sys/types.h>
#include <event.h>
]], [[
const char* lib_version = event_get_version();
const char* wnt_version = "$WANT_LIBEVENT_VERSION";
int lib_digits;
int wnt_digits;
for (;;) {
/* If we reached the end of the want version. We have it. */
if (*wnt_version == '\0' || *wnt_version == '-') {
return 0;
}
/* If the want version continues but the lib version does not, */
/* we are missing a letter. We don't have it. */
if (*lib_version == '\0' || *lib_version == '-') {
return 1;
}
/* In the 1.4 version numbering style, if there are more digits */
/* in one version than the other, that one is higher. */
for (lib_digits = 0;
lib_version[lib_digits] >= '0' &&
lib_version[lib_digits] <= '9';
lib_digits++)
;
for (wnt_digits = 0;
wnt_version[wnt_digits] >= '0' &&
wnt_version[wnt_digits] <= '9';
wnt_digits++)
;
if (lib_digits > wnt_digits) {
return 0;
}
if (lib_digits < wnt_digits) {
return 1;
}
/* If we have greater than what we want. We have it. */
if (*lib_version > *wnt_version) {
return 0;
}
/* If we have less, we don't. */
if (*lib_version < *wnt_version) {
return 1;
}
lib_version++;
wnt_version++;
}
return 0;
]])], [
success=yes
])
AC_LANG_POP([C])
# Restore flags.
CPPFLAGS="$CPPFLAGS_SAVED"
LDFLAGS="$LDFLAGS_SAVED"
LIBS="$LIBS_SAVED"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH_SAVED"
])
AC_DEFUN([AX_LIB_EVENT],
[
dnl Allow search path to be overridden on the command line.
AC_ARG_WITH([libevent],
AS_HELP_STRING([--with-libevent@<:@=DIR@:>@], [use libevent [default=yes]. Optionally specify the root prefix dir where libevent is installed]),
[
if test "x$withval" = "xno"; then
want_libevent="no"
elif test "x$withval" = "xyes"; then
want_libevent="yes"
ax_libevent_path=""
else
want_libevent="yes"
ax_libevent_path="$withval"
fi
],
[ want_libevent="yes" ; ax_libevent_path="" ])
if test "$want_libevent" = "yes"; then
WANT_LIBEVENT_VERSION=ifelse([$1], ,1.2,$1)
AC_MSG_CHECKING(for libevent >= $WANT_LIBEVENT_VERSION)
# Run tests.
if test -n "$ax_libevent_path"; then
AX_LIB_EVENT_DO_CHECK
else
for ax_libevent_path in "" $lt_sysroot/usr $lt_sysroot/usr/local $lt_sysroot/opt $lt_sysroot/opt/local $lt_sysroot/opt/libevent "$LIBEVENT_ROOT" ; do
AX_LIB_EVENT_DO_CHECK
if test "$success" = "yes"; then
break;
fi
done
fi
if test "$success" != "yes" ; then
AC_MSG_RESULT(no)
LIBEVENT_CPPFLAGS=""
LIBEVENT_LDFLAGS=""
LIBEVENT_LIBS=""
else
AC_MSG_RESULT(yes)
AC_DEFINE(HAVE_LIBEVENT,,[define if libevent is available])
ax_have_libevent_[]m4_translit([$1], [.], [_])="yes"
fi
ax_have_libevent="$success"
AC_SUBST(LIBEVENT_CPPFLAGS)
AC_SUBST(LIBEVENT_LDFLAGS)
AC_SUBST(LIBEVENT_LIBS)
fi
])
dnl @synopsis AX_LIB_ZLIB([MINIMUM-VERSION])
dnl
dnl Test for the libz library of a particular version (or newer).
dnl
dnl If no path to the installed zlib is given, the macro will first try
dnl using no -I or -L flags, then searches under /usr, /usr/local, /opt,
dnl and /opt/zlib.
dnl If these all fail, it will try the $ZLIB_ROOT environment variable.
dnl
dnl This macro calls:
dnl AC_SUBST(ZLIB_CPPFLAGS)
dnl AC_SUBST(ZLIB_LDFLAGS)
dnl AC_SUBST(ZLIB_LIBS)
dnl
dnl And (if zlib is found):
dnl AC_DEFINE(HAVE_ZLIB)
dnl
dnl It also leaves the shell variables "success" and "ax_have_zlib"
dnl set to "yes" or "no".
dnl
dnl NOTE: This macro does not currently work for cross-compiling,
dnl but it can be easily modified to allow it. (grep "cross").
dnl
dnl @category InstalledPackages
dnl @category C
dnl @version 2007-09-12
dnl @license AllPermissive
dnl
dnl Copyright (C) 2009 David Reiss
dnl Copying and distribution of this file, with or without modification,
dnl are permitted in any medium without royalty provided the copyright
dnl notice and this notice are preserved.
dnl Input: ax_zlib_path, WANT_ZLIB_VERSION
dnl Output: success=yes/no
AC_DEFUN([AX_LIB_ZLIB_DO_CHECK],
[
# Save our flags.
CPPFLAGS_SAVED="$CPPFLAGS"
LDFLAGS_SAVED="$LDFLAGS"
LIBS_SAVED="$LIBS"
LD_LIBRARY_PATH_SAVED="$LD_LIBRARY_PATH"
# Set our flags if we are checking a specific directory.
if test -n "$ax_zlib_path" ; then
ZLIB_CPPFLAGS="-I$ax_zlib_path/include"
ZLIB_LDFLAGS="-L$ax_zlib_path/lib"
LD_LIBRARY_PATH="$ax_zlib_path/lib:$LD_LIBRARY_PATH"
else
ZLIB_CPPFLAGS=""
ZLIB_LDFLAGS=""
fi
# Required flag for zlib.
ZLIB_LIBS="-lz"
# Prepare the environment for compilation.
CPPFLAGS="$CPPFLAGS $ZLIB_CPPFLAGS"
LDFLAGS="$LDFLAGS $ZLIB_LDFLAGS"
LIBS="$LIBS $ZLIB_LIBS"
export CPPFLAGS
export LDFLAGS
export LIBS
export LD_LIBRARY_PATH
success=no
# Compile, link, and run the program. This checks:
# - zlib.h is available for including.
# - zlibVersion() is available for linking.
# - ZLIB_VERNUM is greater than or equal to the desired version.
# - ZLIB_VERSION (defined in zlib.h) matches zlibVersion()
# (defined in the library).
AC_LANG_PUSH([C])
dnl This can be changed to AC_LINK_IFELSE if you are cross-compiling.
AC_LINK_IFELSE([AC_LANG_PROGRAM([[
#include <zlib.h>
#if ZLIB_VERNUM >= 0x$WANT_ZLIB_VERSION
#else
# error zlib is too old
#endif
]], [[
const char* lib_version = zlibVersion();
const char* hdr_version = ZLIB_VERSION;
for (;;) {
if (*lib_version != *hdr_version) {
/* If this happens, your zlib header doesn't match your zlib */
/* library. That is really bad. */
return 1;
}
if (*lib_version == '\0') {
break;
}
lib_version++;
hdr_version++;
}
return 0;
]])], [
success=yes
])
AC_LANG_POP([C])
# Restore flags.
CPPFLAGS="$CPPFLAGS_SAVED"
LDFLAGS="$LDFLAGS_SAVED"
LIBS="$LIBS_SAVED"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH_SAVED"
])
AC_DEFUN([AX_LIB_ZLIB],
[
dnl Allow search path to be overridden on the command line.
AC_ARG_WITH([zlib],
AS_HELP_STRING([--with-zlib@<:@=DIR@:>@], [use zlib (default is yes) - it is possible to specify an alternate root directory for zlib]),
[
if test "x$withval" = "xno"; then
want_zlib="no"
elif test "x$withval" = "xyes"; then
want_zlib="yes"
ax_zlib_path=""
else
want_zlib="yes"
ax_zlib_path="$withval"
fi
],
[want_zlib="yes" ; ax_zlib_path="" ])
if test "$want_zlib" = "yes"; then
# Parse out the version.
zlib_version_req=ifelse([$1], ,1.2.3,$1)
zlib_version_req_major=`expr $zlib_version_req : '\([[0-9]]*\)'`
zlib_version_req_minor=`expr $zlib_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
zlib_version_req_patch=`expr $zlib_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
if test -z "$zlib_version_req_patch" ; then
zlib_version_req_patch="0"
fi
WANT_ZLIB_VERSION=`expr $zlib_version_req_major \* 1000 \+ $zlib_version_req_minor \* 100 \+ $zlib_version_req_patch \* 10`
AC_MSG_CHECKING(for zlib >= $zlib_version_req)
# Run tests.
if test -n "$ax_zlib_path"; then
AX_LIB_ZLIB_DO_CHECK
else
for ax_zlib_path in "" /usr /usr/local /opt /opt/zlib "$ZLIB_ROOT" ; do
AX_LIB_ZLIB_DO_CHECK
if test "$success" = "yes"; then
break;
fi
done
fi
if test "$success" != "yes" ; then
AC_MSG_RESULT(no)
ZLIB_CPPFLAGS=""
ZLIB_LDFLAGS=""
ZLIB_LIBS=""
else
AC_MSG_RESULT(yes)
AC_DEFINE(HAVE_ZLIB,,[define if zlib is available])
fi
ax_have_zlib="$success"
AC_SUBST(ZLIB_CPPFLAGS)
AC_SUBST(ZLIB_LDFLAGS)
AC_SUBST(ZLIB_LIBS)
fi
])
This diff is collapsed.
# ===============================================================================
# https://www.gnu.org/software/autoconf-archive/ax_prog_dotnetcore_version.html
# ===============================================================================
#
# SYNOPSIS
#
# AX_PROG_DOTNETCORE_VERSION([VERSION],[ACTION-IF-TRUE],[ACTION-IF-FALSE])
#
# DESCRIPTION
#
# Makes sure that .NET Core supports the version indicated. If true the
# shell commands in ACTION-IF-TRUE are executed. If not the shell commands
# in ACTION-IF-FALSE are run. The $dotnetcore_version variable will be
# filled with the detected version.
#
# This macro uses the $DOTNETCORE variable to perform the check. If
# $DOTNETCORE is not set prior to calling this macro, the macro will fail.
#
# Example:
#
# AC_PATH_PROG([DOTNETCORE],[dotnet])
# AC_PROG_DOTNETCORE_VERSION([1.0.2],[ ... ],[ ... ])
#
# Searches for .NET Core, then checks if at least version 1.0.2 is
# present.
#
# LICENSE
#
# Copyright (c) 2016 Jens Geyer <jensg@apache.org>
#
# Copying and distribution of this file, with or without modification, are
# permitted in any medium without royalty provided the copyright notice
# and this notice are preserved. This file is offered as-is, without any
# warranty.
#serial 2
AC_DEFUN([AX_PROG_DOTNETCORE_VERSION],[
AC_REQUIRE([AC_PROG_SED])
AS_IF([test -n "$DOTNETCORE"],[
ax_dotnetcore_version="$1"
AC_MSG_CHECKING([for .NET Core version])
dotnetcore_version=`$DOTNETCORE --version 2>&1 | $SED -e 's/\(@<:@0-9@:>@*\.@<:@0-9@:>@*\.@<:@0-9@:>@*\)\(.*\)/\1/'`
AC_MSG_RESULT($dotnetcore_version)
AC_SUBST([DOTNETCORE_VERSION],[$dotnetcore_version])
AX_COMPARE_VERSION([$ax_dotnetcore_version],[le],[$dotnetcore_version],[
:
$2
],[
:
$3
])
],[
AC_MSG_WARN([could not find .NET Core])
$3
])
])
# ===========================================================================
# https://www.gnu.org/software/autoconf-archive/ax_prog_haxe_version.html
# ===========================================================================
#
# SYNOPSIS
#
# AX_PROG_HAXE_VERSION([VERSION],[ACTION-IF-TRUE],[ACTION-IF-FALSE])
#
# DESCRIPTION
#
# Makes sure that haxe supports the version indicated. If true the shell
# commands in ACTION-IF-TRUE are executed. If not the shell commands in
# ACTION-IF-FALSE are run. The $HAXE_VERSION variable will be filled with
# the detected version.
#
# This macro uses the $HAXE variable to perform the check. If $HAXE is not
# set prior to calling this macro, the macro will fail.
#
# Example:
#
# AC_PATH_PROG([HAXE],[haxe])
# AC_PROG_HAXE_VERSION([3.1.3],[ ... ],[ ... ])
#
# Searches for Haxe, then checks if at least version 3.1.3 is present.
#
# LICENSE
#
# Copyright (c) 2015 Jens Geyer <jensg@apache.org>
#
# Copying and distribution of this file, with or without modification, are
# permitted in any medium without royalty provided the copyright notice
# and this notice are preserved. This file is offered as-is, without any
# warranty.
#serial 2
AC_DEFUN([AX_PROG_HAXE_VERSION],[
AC_REQUIRE([AC_PROG_SED])
AS_IF([test -n "$HAXE"],[
ax_haxe_version="$1"
AC_MSG_CHECKING([for haxe version])
haxe_version=`$HAXE -version 2>&1 | $SED -e 's/^.* \( @<:@0-9@:>@*\.@<:@0-9@:>@*\.@<:@0-9@:>@*\) .*/\1/'`
AC_MSG_RESULT($haxe_version)
AC_SUBST([HAXE_VERSION],[$haxe_version])
AX_COMPARE_VERSION([$ax_haxe_version],[le],[$haxe_version],[
:
$2
],[
:
$3
])
],[
AC_MSG_WARN([could not find Haxe])
$3
])
])
dnl @synopsis AX_THRIFT_GEN(SHORT_LANGUAGE, LONG_LANGUAGE, DEFAULT)
dnl @synopsis AX_THRIFT_LIB(SHORT_LANGUAGE, LONG_LANGUAGE, DEFAULT)
dnl
dnl Allow a particular language generator to be disabled.
dnl Allow a particular language library to be disabled.
dnl
dnl These macros have poor error handling and are poorly documented.
dnl They are intended only for internal use by the Thrift compiler.
dnl
dnl @version 2008-02-20
dnl @license AllPermissive
dnl
dnl Copyright (C) 2009 David Reiss
dnl Copying and distribution of this file, with or without modification,
dnl are permitted in any medium without royalty provided the copyright
dnl notice and this notice are preserved.
AC_DEFUN([AX_THRIFT_LIB],
[
AC_ARG_WITH($1,
AC_HELP_STRING([--with-$1], [build the $2 library @<:@default=$3@:>@]),
[with_$1="$withval"],
[with_$1=$3]
)
have_$1=no
dnl What we do here is going to vary from library to library,
dnl so we can't really generalize (yet!).
])
::
:: Licensed under the Apache License, Version 2.0 (the "License");
:: you may not use this file except in compliance with the License.
:: You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
::
@ECHO OFF
SETLOCAL EnableDelayedExpansion
CD build\appveyor || EXIT /B
CALL cl_banner_build.bat || EXIT /B
CALL cl_setenv.bat || EXIT /B
SET CMAKEARGS=^
-G'%GENERATOR%' ^
-DCMAKE_BUILD_TYPE=%CONFIGURATION% ^
-DCMAKE_INSTALL_PREFIX=%INSTDIR_MSYS% ^
-DCMAKE_MAKE_PROGRAM=/mingw64/bin/mingw32-make ^
-DCMAKE_C_COMPILER=x86_64-w64-mingw32-gcc.exe ^
-DCMAKE_CXX_COMPILER=x86_64-w64-mingw32-g++.exe ^
-DWITH_LIBEVENT=OFF ^
-DWITH_PYTHON=OFF ^
-DWITH_SHARED_LIB=OFF ^
-DWITH_STATIC_LIB=ON
@ECHO ON
%BASH% -lc "mkdir -p %BUILDDIR_MSYS% && cd %BUILDDIR_MSYS% && cmake.exe %SRCDIR_MSYS% %CMAKEARGS% && cmake --build . --config %CONFIGURATION% --target install" || EXIT /B
@ECHO OFF
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment