From 35d1513630722617720d883013e2503b26e4024e Mon Sep 17 00:00:00 2001 From: THIERRY SALLE Date: Fri, 3 Mar 2017 17:50:24 +0100 Subject: [PATCH] Added SSL and SASL support for input plugin kafka_consumer Use wurstmeister/kafka docker images for input kafka_consumer tests --- Godeps | 1 + Makefile | 24 ++-- internal/config/testdata/telegraf-agent.toml | 12 +- plugins/inputs/kafka_consumer/README.md | 13 ++- .../inputs/kafka_consumer/kafka_consumer.go | 105 +++++++++++++----- .../kafka_consumer_integration_test.go | 11 +- .../kafka_consumer/kafka_consumer_test.go | 2 +- 7 files changed, 116 insertions(+), 52 deletions(-) diff --git a/Godeps b/Godeps index aa9ace1ab1f3c..19153d4f1f0b5 100644 --- a/Godeps +++ b/Godeps @@ -52,6 +52,7 @@ github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee +github.com/bsm/sarama-cluster 5d8c11085c875b3155870da9ba6be706429a95dc github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 diff --git a/Makefile b/Makefile index d2bad656d0abc..a4f7e3eb9dbbe 100644 --- a/Makefile +++ b/Makefile @@ -46,11 +46,15 @@ prepare-windows: # Run all docker containers necessary for unit tests docker-run: docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 + docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper docker run --name kafka \ - -e ADVERTISED_HOST=localhost \ - -e ADVERTISED_PORT=9092 \ - -p "2181:2181" -p "9092:9092" \ - -d spotify/kafka + --link zookeeper:zookeeper \ + -e KAFKA_ADVERTISED_HOST_NAME=localhost \ + -e KAFKA_ADVERTISED_PORT=9092 \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e KAFKA_CREATE_TOPICS="test:1:1" \ + -p 
"9092:9092" \ + -d wurstmeister/kafka docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql docker run --name memcached -p "11211:11211" -d memcached @@ -65,11 +69,15 @@ docker-run: # Run docker containers necessary for CircleCI unit tests docker-run-circle: docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 + docker run --name zookeeper -p "2181:2181" -d wurstmeister/zookeeper docker run --name kafka \ - -e ADVERTISED_HOST=localhost \ - -e ADVERTISED_PORT=9092 \ - -p "2181:2181" -p "9092:9092" \ - -d spotify/kafka + --link zookeeper:zookeeper \ + -e KAFKA_ADVERTISED_HOST_NAME=localhost \ + -e KAFKA_ADVERTISED_PORT=9092 \ + -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ + -e KAFKA_CREATE_TOPICS="test:1:1" \ + -p "9092:9092" \ + -d wurstmeister/kafka docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt diff --git a/internal/config/testdata/telegraf-agent.toml b/internal/config/testdata/telegraf-agent.toml index 5cf82af763d64..cf7598a23e40b 100644 --- a/internal/config/testdata/telegraf-agent.toml +++ b/internal/config/testdata/telegraf-agent.toml @@ -145,15 +145,13 @@ # read metrics from a Kafka topic [[inputs.kafka_consumer]] - # topic(s) to consume + ## kafka brokers + brokers = ["localhost:9092"] + ## topic(s) to consume topics = ["telegraf"] - # an array of Zookeeper connection strings - zookeeper_peers = ["localhost:2181"] - # the name of the consumer group + ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" - # Maximum number of points to buffer between collection intervals - point_buffer = 100000 - # Offset (must be either "oldest" or "newest") + ## Offset (must be either "oldest" or "newest") offset = "oldest" # Read metrics from a LeoFS Server via SNMP diff 
--git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index ae04c95159ace..096ad1a428c92 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -14,7 +14,7 @@ from the same topic in parallel. ## topic(s) to consume topics = ["telegraf"] - ## an array of Zookeeper connection strings - zookeeper_peers = ["localhost:2181"] + ## kafka servers + brokers = ["localhost:9092"] ## the name of the consumer group consumer_group = "telegraf_metrics_consumers" ## Maximum number of metrics to buffer between collection intervals @@ -22,6 +22,17 @@ from the same topic in parallel. ## Offset (must be either "oldest" or "newest") offset = "oldest" + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Optional SASL Config + # sasl_username = "kafka" + # sasl_password = "secret" + ## Data format to consume.
## Each data format has its own unique set of configuration options, read diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 2f6933db0c004..4e4715617c9f4 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -7,20 +7,35 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" - "github.com/wvanbergen/kafka/consumergroup" + cluster "github.com/bsm/sarama-cluster" ) type Kafka struct { - ConsumerGroup string - Topics []string - MaxMessageLen int - ZookeeperPeers []string - ZookeeperChroot string - Consumer *consumergroup.ConsumerGroup + ConsumerGroup string + Topics []string + Brokers []string + MaxMessageLen int + + Cluster *cluster.Consumer + + // Verify Kafka SSL Certificate + InsecureSkipVerify bool + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // SASL Username + SASLUsername string `toml:"sasl_username"` + // SASL Password + SASLPassword string `toml:"sasl_password"` // Legacy metric buffer support MetricBuffer int @@ -47,12 +62,22 @@ type Kafka struct { } var sampleConfig = ` + ## kafka servers + brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] - ## an array of Zookeeper connection strings - zookeeper_peers = ["localhost:2181"] - ## Zookeeper Chroot - zookeeper_chroot = "" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Optional SASL Config + # sasl_username = "kafka" + # sasl_password = "secret" + ## the name of the consumer group 
consumer_group = "telegraf_metrics_consumers" ## Offset (must be either "oldest" or "newest") @@ -84,45 +109,67 @@ func (k *Kafka) SetParser(parser parsers.Parser) { func (k *Kafka) Start(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - var consumerErr error + var clusterErr error k.acc = acc - config := consumergroup.NewConfig() - config.Zookeeper.Chroot = k.ZookeeperChroot + config := cluster.NewConfig() + config.Consumer.Return.Errors = true + + tlsConfig, err := internal.GetTLSConfig( + k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify) + if err != nil { + return err + } + + if tlsConfig != nil { + log.Printf("D! TLS Enabled") + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + if k.SASLUsername != "" && k.SASLPassword != "" { + log.Printf("D! Using SASL auth with username '%s',", + k.SASLUsername) + config.Net.SASL.User = k.SASLUsername + config.Net.SASL.Password = k.SASLPassword + config.Net.SASL.Enable = true + } + switch strings.ToLower(k.Offset) { case "oldest", "": - config.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetOldest case "newest": - config.Offsets.Initial = sarama.OffsetNewest + config.Consumer.Offsets.Initial = sarama.OffsetNewest default: log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", k.Offset) - config.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Offsets.Initial = sarama.OffsetOldest } - if k.Consumer == nil || k.Consumer.Closed() { - k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + if k.Cluster == nil { + k.Cluster, clusterErr = cluster.NewConsumer( + k.Brokers, k.ConsumerGroup, k.Topics, - k.ZookeeperPeers, config, ) - if consumerErr != nil { - return consumerErr + + if clusterErr != nil { + log.Printf("E! 
Error when creating Kafka Consumer, brokers: %v, topics: %v\n", + k.Brokers, k.Topics) + return clusterErr } // Setup message and error channels - k.in = k.Consumer.Messages() - k.errs = k.Consumer.Errors() + k.in = k.Cluster.Messages() + k.errs = k.Cluster.Errors() } k.done = make(chan struct{}) - // Start the kafka message reader go k.receiver() - log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n", - k.ZookeeperPeers, k.Topics) + log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n", + k.Brokers, k.Topics) return nil } @@ -156,7 +203,7 @@ func (k *Kafka) receiver() { // TODO(cam) this locking can be removed if this PR gets merged: // https://github.com/wvanbergen/kafka/pull/84 k.Lock() - k.Consumer.CommitUpto(msg) + k.Cluster.MarkOffset(msg, "") k.Unlock() } } @@ -167,7 +214,7 @@ func (k *Kafka) Stop() { k.Lock() defer k.Unlock() close(k.done) - if err := k.Consumer.Close(); err != nil { + if err := k.Cluster.Close(); err != nil { k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error())) } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index 41ce101570946..a145a938afc4f 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -19,7 +19,6 @@ func TestReadsMetricsFromKafka(t *testing.T) { } brokerPeers := []string{testutil.GetLocalHost() + ":9092"} - zkPeers := []string{testutil.GetLocalHost() + ":2181"} testTopic := fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()) // Send a Kafka message to the kafka host @@ -36,11 +35,11 @@ func TestReadsMetricsFromKafka(t *testing.T) { // Start the Kafka Consumer k := &Kafka{ - ConsumerGroup: "telegraf_test_consumers", - Topics: []string{testTopic}, - ZookeeperPeers: zkPeers, - PointBuffer: 100000, - Offset: "oldest", + ConsumerGroup: "telegraf_test_consumers", + 
Topics: []string{testTopic}, + Brokers: brokerPeers, + PointBuffer: 100000, + Offset: "oldest", } p, _ := parsers.NewInfluxParser() k.SetParser(p) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 5519dd0d17b44..9a585d6ede4ca 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -23,7 +23,7 @@ func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { k := Kafka{ ConsumerGroup: "test", Topics: []string{"telegraf"}, - ZookeeperPeers: []string{"localhost:2181"}, + Brokers: []string{"localhost:9092"}, Offset: "oldest", in: in, doNotCommitMsgs: true,