adding option to randomize Kinesis partition key #2705

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -74,6 +74,7 @@ be deprecated eventually.
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.
- [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer

### Bugfixes
1 change: 1 addition & 0 deletions Godeps
@@ -45,6 +45,7 @@ github.com/prometheus/common dd2f054febf4a6c00f2343686efb775948a8bff4
github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
5 changes: 5 additions & 0 deletions plugins/outputs/kinesis/README.md
@@ -50,6 +50,11 @@ This is used to group data within a stream. Currently this plugin only supports
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable
solution to scale out.

### use_random_partitionkey

When true, a random UUID is generated and used as the partitionkey when sending data to Kinesis. This allows data to spread evenly across multiple shards in the stream. Because the partitionkey is random, ordering cannot be guaranteed when consuming the data off the shards.
When true, the partitionkey option is ignored.
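
To illustrate why a random key spreads load, here is a minimal Go sketch. Kinesis hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The sketch assumes a stream whose shards split the hash key space into equal ranges; `shardFor` and `randomKey` are hypothetical helpers written for this example and are not part of the plugin or the AWS SDK.

```go
package main

import (
	"crypto/md5"
	"crypto/rand"
	"fmt"
	"math/big"
)

// shardFor maps a partition key to a shard index in [0, n).
// Assumption: the 128-bit hash key space is divided into n equal ranges.
// A real stream reports its actual ranges through the Kinesis API.
func shardFor(partitionKey string, n int) int {
	sum := md5.Sum([]byte(partitionKey))
	hash := new(big.Int).SetBytes(sum[:])

	space := new(big.Int).Lsh(big.NewInt(1), 128) // 2^128
	width := new(big.Int).Div(space, big.NewInt(int64(n)))
	idx := new(big.Int).Div(hash, width)

	// When n does not divide 2^128 evenly, the top few hash values
	// would land one past the last range, so clamp them into it.
	if idx.Cmp(big.NewInt(int64(n-1))) > 0 {
		return n - 1
	}
	return int(idx.Int64())
}

// randomKey returns 16 random bytes as hex. It stands in for the
// random UUID that use_random_partitionkey generates.
func randomKey() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", b), nil
}

func main() {
	const shards, records = 4, 1000
	static := make([]int, shards)
	random := make([]int, shards)

	for i := 0; i < records; i++ {
		// A fixed key always hashes to the same shard.
		static[shardFor("PartitionKey", shards)]++

		key, err := randomKey()
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		random[shardFor(key, shards)]++
	}

	fmt.Println("records per shard, static key:", static)
	fmt.Println("records per shard, random key:", random)
}
```

With the static key every record lands on one shard, while the random keys are distributed across all of them. Records that share a shard keep their order only relative to each other, which is why ordering across the stream is lost.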

### format

The format configuration value has been designated to allow people to change the format of the Point as written to
23 changes: 18 additions & 5 deletions plugins/outputs/kinesis/kinesis.go
@@ -7,6 +7,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/satori/go.uuid"

"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
@@ -23,10 +24,11 @@ type KinesisOutput struct {
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`

- StreamName string `toml:"streamname"`
- PartitionKey string `toml:"partitionkey"`
- Debug bool `toml:"debug"`
- svc *kinesis.Kinesis
+ StreamName string `toml:"streamname"`
+ PartitionKey string `toml:"partitionkey"`
+ RandomPartitionKey bool `toml:"use_random_partitionkey"`
+ Debug bool `toml:"debug"`
+ svc *kinesis.Kinesis

serializer serializers.Serializer
}
@@ -54,6 +56,11 @@ var sampleConfig = `
streamname = "StreamName"
## PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
## If true, the partitionkey will be a random UUID on every put.
## This allows for scaling across multiple shards in a stream.
## Ordering of records is not guaranteed when this is enabled.
use_random_partitionkey = false


## Data format to output.
## Each data format has its own unique set of configuration options, read
@@ -173,9 +180,15 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
return err
}

partitionKey := k.PartitionKey
if k.RandomPartitionKey {
u := uuid.NewV4()
partitionKey = u.String()
}

d := kinesis.PutRecordsRequestEntry{
Data: values,
- PartitionKey: aws.String(k.PartitionKey),
+ PartitionKey: aws.String(partitionKey),
}

r = append(r, &d)
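
The key selection in this hunk can be tried in isolation. The following is a minimal, dependency-free sketch rather than the plugin code: `choosePartitionKey` is a hypothetical helper that mirrors the `if k.RandomPartitionKey` branch, and `newUUIDv4` is an assumed stand-in for `uuid.NewV4().String()` built on `crypto/rand` so the example runs without `github.com/satori/go.uuid`.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newUUIDv4 returns a random (version 4) UUID in its canonical
// 8-4-4-4-12 text form. Stand-in for uuid.NewV4().String().
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version field to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant bits
	return fmt.Sprintf("%x-%x-%x-%x-%x",
		b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// choosePartitionKey returns the configured key, or a fresh random
// UUID when useRandom is set, in which case the configured key is ignored.
func choosePartitionKey(configured string, useRandom bool) (string, error) {
	if useRandom {
		return newUUIDv4()
	}
	return configured, nil
}

func main() {
	for _, useRandom := range []bool{false, true} {
		// Two records per setting: the random branch yields a new key
		// for each record, the static branch repeats the same key.
		for i := 0; i < 2; i++ {
			key, err := choosePartitionKey("PartitionKey", useRandom)
			if err != nil {
				fmt.Println("error:", err)
				return
			}
			fmt.Printf("use_random_partitionkey=%v key=%s\n", useRandom, key)
		}
	}
}
```

Because the diff places the selection next to the construction of each `PutRecordsRequestEntry`, a new key is drawn per record rather than per batch, so records from a single `Write` call can land on different shards.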