Fix kafka plugin and rename to kafka_consumer
fixes #371
sparrc committed Nov 19, 2015
1 parent a3feddd commit 970bfce
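
For orientation, a rough sketch of what the rename might look like in a config file. The old keys are inferred from the struct fields exercised in config_test.go below; the new `kafka_consumer` keys are assumptions (the renamed plugin's source is not expanded on this page), so treat them as illustrative rather than authoritative.

```toml
# Before: the old `kafka` plugin (key names inferred from the test's struct fields)
[kafka]
  topic = "topic_with_metrics"
  consumerGroupName = "telegraf_metrics_consumers"
  zookeeperPeers = ["test.example.com:2181"]
  batchSize = 1000

# After: the renamed `kafka_consumer` service plugin (assumed option names,
# not taken from this commit's diff -- check the plugin's sample config)
[kafka_consumer]
  topics = ["topic_with_metrics"]
  zookeeper_peers = ["test.example.com:2181"]
  consumer_group = "telegraf_metrics_consumers"
```
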
Showing 11 changed files with 463 additions and 431 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,12 +1,20 @@
## v0.2.3 [unreleased]

### Release Notes
- **breaking change** The `kafka` plugin has been renamed to `kafka_consumer`,
and most of the config option names have changed.
This only affects the kafka consumer _plugin_ (not the output). There were a
number of problems with the kafka plugin that led to it only collecting data
once at startup, so the plugin was basically non-functional.
- Riemann output added

### Features
- [#379](https://github.com/influxdb/telegraf/pull/379): Riemann output, thanks @allenj!
- [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin.

### Bugfixes
- [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning.

## v0.2.2 [2015-11-18]

2 changes: 1 addition & 1 deletion README.md
@@ -164,7 +164,6 @@ Telegraf currently has support for collecting metrics from:
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* jolokia (remote JMX with JSON over HTTP)
* kafka_consumer
* leofs
* lustre2
* memcached
@@ -197,6 +196,7 @@ Telegraf currently has support for collecting metrics from:
Telegraf can collect metrics via the following services:

* statsd
* kafka_consumer

We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.
44 changes: 19 additions & 25 deletions config_test.go
@@ -8,7 +8,7 @@ import (

"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/plugins/exec"
"github.com/influxdb/telegraf/plugins/kafka_consumer"
"github.com/influxdb/telegraf/plugins/memcached"
"github.com/influxdb/telegraf/plugins/procstat"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
@@ -205,17 +205,14 @@ func TestConfig_parsePlugin(t *testing.T) {
pluginConfigurationFieldsSet: make(map[string][]string),
}

subtbl := tbl.Fields["kafka"].(*ast.Table)
err = c.parsePlugin("kafka", subtbl)
subtbl := tbl.Fields["memcached"].(*ast.Table)
err = c.parsePlugin("memcached", subtbl)

kafka := plugins.Plugins["kafka"]().(*kafka_consumer.Kafka)
kafka.ConsumerGroupName = "telegraf_metrics_consumers"
kafka.Topic = "topic_with_metrics"
kafka.ZookeeperPeers = []string{"test.example.com:2181"}
kafka.BatchSize = 1000
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"}

kConfig := &ConfiguredPlugin{
Name: "kafka",
mConfig := &ConfiguredPlugin{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
TagDrop: []TagFilter{
@@ -233,10 +230,10 @@
Interval: 5 * time.Second,
}

assert.Equal(t, kafka, c.plugins["kafka"],
"Testdata did not produce a correct kafka struct.")
assert.Equal(t, kConfig, c.pluginConfigurations["kafka"],
"Testdata did not produce correct kafka metadata.")
assert.Equal(t, memcached, c.plugins["memcached"],
"Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached"],
"Testdata did not produce correct memcached metadata.")
}

func TestConfig_LoadDirectory(t *testing.T) {
@@ -249,14 +246,11 @@ func TestConfig_LoadDirectory(t *testing.T) {
t.Error(err)
}

kafka := plugins.Plugins["kafka"]().(*kafka_consumer.Kafka)
kafka.ConsumerGroupName = "telegraf_metrics_consumers"
kafka.Topic = "topic_with_metrics"
kafka.ZookeeperPeers = []string{"test.example.com:2181"}
kafka.BatchSize = 10000
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"192.168.1.1"}

kConfig := &ConfiguredPlugin{
Name: "kafka",
mConfig := &ConfiguredPlugin{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
TagDrop: []TagFilter{
@@ -296,10 +290,10 @@

pConfig := &ConfiguredPlugin{Name: "procstat"}

assert.Equal(t, kafka, c.plugins["kafka"],
"Merged Testdata did not produce a correct kafka struct.")
assert.Equal(t, kConfig, c.pluginConfigurations["kafka"],
"Merged Testdata did not produce correct kafka metadata.")
assert.Equal(t, memcached, c.plugins["memcached"],
"Merged Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached"],
"Merged Testdata did not produce correct memcached metadata.")

assert.Equal(t, ex, c.plugins["exec"],
"Merged Testdata did not produce a correct exec struct.")
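
For reference, a hypothetical testdata snippet in the spirit of the repo's testdata files, showing the kind of TOML that TestConfig_parsePlugin above now expects for memcached. The servers, pass/drop lists, and interval mirror the values asserted in the test; the tagpass/tagdrop entries are made up, since those assertions are collapsed in this view.

```toml
[memcached]
  servers = ["localhost"]
  pass = ["some", "strings"]
  drop = ["other", "stuff"]
  interval = "5s"

  # illustrative tag filters -- the actual testdata values are not shown above
  [memcached.tagpass]
    goodtag = ["mytag"]
  [memcached.tagdrop]
    badtag = ["othertag"]
```
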
5 changes: 3 additions & 2 deletions outputs/influxdb/influxdb.go
@@ -29,8 +29,9 @@ type InfluxDB struct {
}

var sampleConfig = `
# The full HTTP or UDP endpoint URL for your InfluxDB instance
# Multiple urls can be specified for InfluxDB cluster support.
# The full HTTP or UDP endpoint URL for your InfluxDB instance.
# Multiple urls can be specified, but it is assumed that they are part of the
# same cluster; only ONE of the urls will be written to in each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
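
To make the clarified comment concrete, a hedged example with two URLs: the hostnames are placeholders, and both are assumed to be members of the same InfluxDB cluster, so Telegraf writes each interval's batch to only one of them.

```toml
# two members of the same cluster; only one URL receives each interval's write
urls = ["http://influx-a.example.com:8086", "http://influx-b.example.com:8086"]
# The target database for metrics (telegraf will create it if not exists)
database = "telegraf"
```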