Skip to content

Commit

Permalink
Performance refactor of running_output buffers
Browse files Browse the repository at this point in the history
closes #914
closes #967
  • Loading branch information
sparrc committed Apr 26, 2016
1 parent 1c4043a commit 97a2c2f
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 198 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@

### Release Notes

- New [agent] configuration option: `metric_batch_size`. This option tells
telegraf the maximum batch size to allow to accumulate before sending a flush
to the configured outputs. `metric_buffer_limit` now refers to the absolute
maximum number of metrics that will accumulate before metrics are dropped.

- There is no longer an option to
`flush_buffer_when_full`, this is now the default and only behavior of telegraf.

- **Breaking Change**: docker plugin tags. The cont_id tag no longer exists, it
will now be a field, and be called container_id. Additionally, cont_image and
cont_name are being renamed to container_image and container_name.

- **Breaking Change**: docker plugin measurements. The `docker_cpu`, `docker_mem`,
`docker_blkio` and `docker_net` measurements are being renamed to
`docker_container_cpu`, `docker_container_mem`, `docker_container_blkio` and
Expand All @@ -16,15 +25,19 @@ So adding "container" to each metric will:
(1) make it more clear that these metrics are per-container, and
(2) allow users to easily drop per-container metrics if cardinality is an
issue (`namedrop = ["docker_container_*"]`)

- `tagexclude` and `taginclude` are now available, which can be used to remove
tags from measurements on inputs and outputs. See
[the configuration doc](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md)
for more details.

- **Measurement filtering:** All measurement filters now match based on glob
only. Previously there was an undocumented behavior where filters would match
based on _prefix_ in addition to globs. This means that a filter like
`fielddrop = ["time_"]` will need to be changed to `fielddrop = ["time_*"]`

- **datadog**: measurement and field names will no longer have `_` replaced by `.`

- The following plugins have changed their tags to _not_ overwrite the host tag:
- cassandra: `host -> cassandra_host`
- disque: `host -> disque_host`
Expand All @@ -42,6 +55,8 @@ based on _prefix_ in addition to globs. This means that a filter like
- [#1072](https://github.com/influxdata/telegraf/pull/1072): New Input Plugin: filestat.
- [#1066](https://github.com/influxdata/telegraf/pull/1066): Replication lag metrics for MongoDB input plugin
- [#1086](https://github.com/influxdata/telegraf/pull/1086): Ability to specify AWS keys in config file. Thanks @johnrengleman!
- [#1096](https://github.com/influxdata/telegraf/pull/1096): Performance refactor of running output buffers.
- [#967](https://github.com/influxdata/telegraf/issues/967): Buffer logging improvements.

### Bugfixes

Expand All @@ -55,6 +70,7 @@ based on _prefix_ in addition to globs. This means that a filter like
- [#1078](https://github.com/influxdata/telegraf/issues/1078): Use default AWS credential chain.
- [#1070](https://github.com/influxdata/telegraf/issues/1070): SQL Server input. Fix datatype conversion.
- [#1089](https://github.com/influxdata/telegraf/issues/1089): Fix leaky TCP connections in phpfpm plugin.
- [#914](https://github.com/influxdata/telegraf/issues/914): Telegraf can drop metrics on full buffers.

## v0.12.1 [2016-04-14]

Expand Down
85 changes: 74 additions & 11 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

## Telegraf will send metrics to output in batch of at
## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write. This should be a multiple of
## metric_batch_size and could not be less than 2 times metric_batch_size
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true

## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
Expand Down Expand Up @@ -151,6 +149,15 @@
# ## Amazon REGION
# region = 'us-east-1'
#
# ## Amazon Credentials
# ## Credentials are loaded in the following order
# ## 1) explicit credentials from 'access_key' and 'secret_key'
# ## 2) environment variables
# ## 3) shared credentials file
# ## 4) EC2 Instance Profile
# #access_key = ""
# #secret_key = ""
#
# ## Namespace for the CloudWatch MetricDatums
# namespace = 'InfluxData/Telegraf'

Expand Down Expand Up @@ -243,6 +250,16 @@
# [[outputs.kinesis]]
# ## Amazon REGION of kinesis endpoint.
# region = "ap-southeast-2"
#
# ## Amazon Credentials
# ## Credentials are loaded in the following order
# ## 1) explicit credentials from 'access_key' and 'secret_key'
# ## 2) environment variables
# ## 3) shared credentials file
# ## 4) EC2 Instance Profile
# #access_key = ""
# #secret_key = ""
#
# ## Kinesis StreamName must exist prior to starting telegraf.
# streamname = "StreamName"
# ## PartitionKey as used for sharding data.
Expand Down Expand Up @@ -457,6 +474,15 @@
# ## Amazon Region
# region = 'us-east-1'
#
# ## Amazon Credentials
# ## Credentials are loaded in the following order
# ## 1) explicit credentials from 'access_key' and 'secret_key'
# ## 2) environment variables
# ## 3) shared credentials file
# ## 4) EC2 Instance Profile
# #access_key = ""
# #secret_key = ""
#
# ## Requested CloudWatch aggregation Period (required - must be a multiple of 60s)
# period = '1m'
#
Expand Down Expand Up @@ -588,8 +614,14 @@
# [[inputs.filestat]]
# ## Files to gather stats about.
# ## These accept standard unix glob matching rules, but with the addition of
# ## ** as a "super asterisk". See https://github.com/gobwas/glob.
# files = ["/etc/telegraf/telegraf.conf", "/var/log/**.log"]
# ## ** as a "super asterisk". ie:
# ## "/var/log/**.log" -> recursively find all .log files in /var/log
# ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
# ## "/var/log/apache.log" -> just tail the apache log file
# ##
# ## See https://github.com/gobwas/glob for more examples
# ##
# files = ["/var/log/**.log"]
# ## If true, read the entire file and calculate an md5 checksum.
# md5 = false

Expand Down Expand Up @@ -980,6 +1012,11 @@
# ## databases are gathered.
# ## databases = ["app_production", "testing"]
# #
# # outputaddress = "db01"
# ## A custom name for the database that will be used as the "server" tag in the
# ## measurement output. If not specified, a default one generated from
# ## the connection address is used.
# #
# ## Define the toml config where the sql queries are stored
# ## New queries can be added, if the withdbname is set to true and there is no
# ## databases defined in the 'databases field', the sql query is ended by a
Expand All @@ -990,24 +1027,28 @@
# ## because the databases variable was set to ['postgres', 'pgbench' ] and the
# ## withdbname was true. Be careful that if the withdbname is set to false you
# ## don't have to define the where clause (aka with the dbname) the tagvalue
# ## field is used to define custom tags (separated by comas)
# ## field is used to define custom tags (separated by commas)
# ## The optional "measurement" value can be used to override the default
# ## output measurement name ("postgresql").
# #
# ## Structure :
# ## [[inputs.postgresql_extensible.query]]
# ## sqlquery string
# ## version string
# ## withdbname boolean
# ## tagvalue string (coma separated)
# ## tagvalue string (comma separated)
# ## measurement string
# [[inputs.postgresql_extensible.query]]
# sqlquery="SELECT * FROM pg_stat_database"
# version=901
# withdbname=false
# tagvalue=""
# measurement=""
# [[inputs.postgresql_extensible.query]]
# sqlquery="SELECT * FROM pg_stat_bgwriter"
# version=901
# withdbname=false
# tagvalue=""
# tagvalue="postgresql.stats"


# # Read metrics from one or many PowerDNS servers
Expand Down Expand Up @@ -1379,6 +1420,28 @@
# percentile_limit = 1000


# # Stream a log file, like the tail -f command
# [[inputs.tail]]
# ## files to tail.
# ## These accept standard unix glob matching rules, but with the addition of
# ## ** as a "super asterisk". ie:
# ## "/var/log/**.log" -> recursively find all .log files in /var/log
# ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
# ## "/var/log/apache.log" -> just tail the apache log file
# ##
# ## See https://github.com/gobwas/glob for more examples
# ##
# files = ["/var/mymetrics.out"]
# ## Read file from beginning.
# from_beginning = false
#
# ## Data format to consume.
# ## Each data format has it's own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"


# # Generic TCP listener
# [[inputs.tcp_listener]]
# ## Address and port to host TCP listener on
Expand Down
77 changes: 77 additions & 0 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package buffer

import (
"github.com/influxdata/telegraf"
)

// Buffer is an object for storing metrics in a circular buffer.
type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int
}

// NewBuffer returns a Buffer
// size is the maximum number of metrics that Buffer will cache. If Add is
// called when the buffer is full, then the oldest metric(s) will be dropped.
func NewBuffer(size int) *Buffer {
return &Buffer{
buf: make(chan telegraf.Metric, size),
}
}

// IsEmpty returns true if Buffer is empty.
func (b *Buffer) IsEmpty() bool {
return len(b.buf) == 0
}

// Len returns the current length of the buffer.
func (b *Buffer) Len() int {
return len(b.buf)
}

// Drops returns the total number of dropped metrics that have occured in this
// buffer since instantiation.
func (b *Buffer) Drops() int {
return b.drops
}

// Total returns the total number of metrics that have been added to this buffer.
func (b *Buffer) Total() int {
return b.total
}

// Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
select {
case b.buf <- metrics[i]:
default:
b.drops++
<-b.buf
b.buf <- metrics[i]
}
}
}

// Batch returns a batch of metrics of size batchSize.
// the batch will be of maximum length batchSize. It can be less than batchSize,
// if the length of Buffer is less than batchSize.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ {
out[i] = <-b.buf
}
return out
}

func min(a, b int) int {
if b < a {
return b
}
return a
}
94 changes: 94 additions & 0 deletions internal/buffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package buffer

import (
"testing"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
)

var metricList = []telegraf.Metric{
testutil.TestMetric(2, "mymetric1"),
testutil.TestMetric(1, "mymetric2"),
testutil.TestMetric(11, "mymetric3"),
testutil.TestMetric(15, "mymetric4"),
testutil.TestMetric(8, "mymetric5"),
}

func BenchmarkAddMetrics(b *testing.B) {
buf := NewBuffer(10000)
m := testutil.TestMetric(1, "mymetric")
for n := 0; n < b.N; n++ {
buf.Add(m)
}
}

func TestNewBufferBasicFuncs(t *testing.T) {
b := NewBuffer(10)

assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Zero(t, b.Total())

m := testutil.TestMetric(1, "mymetric")
b.Add(m)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 1)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 1)

b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 6)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 6)
}

func TestDroppingMetrics(t *testing.T) {
b := NewBuffer(10)

// Add up to the size of the buffer
b.Add(metricList...)
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)

// Add 5 more and verify they were dropped
b.Add(metricList...)
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 10)
assert.Equal(t, b.Drops(), 5)
assert.Equal(t, b.Total(), 15)
}

func TestGettingBatches(t *testing.T) {
b := NewBuffer(20)

// Verify that the buffer returned is smaller than requested when there are
// not as many items as requested.
b.Add(metricList...)
batch := b.Batch(10)
assert.Len(t, batch, 5)

// Verify that the buffer is now empty
assert.True(t, b.IsEmpty())
assert.Zero(t, b.Len())
assert.Zero(t, b.Drops())
assert.Equal(t, b.Total(), 5)

// Verify that the buffer returned is not more than the size requested
b.Add(metricList...)
batch = b.Batch(3)
assert.Len(t, batch, 3)

// Verify that buffer is not empty
assert.False(t, b.IsEmpty())
assert.Equal(t, b.Len(), 2)
assert.Equal(t, b.Drops(), 0)
assert.Equal(t, b.Total(), 10)
}
Loading

0 comments on commit 97a2c2f

Please sign in to comment.