Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Valuecounter aggregator plugin #3523

Merged
merged 9 commits into from
Jun 19, 2018
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ formats may be used with input plugins supporting the `data_format` option:
* [basicstats](./plugins/aggregators/basicstats)
* [minmax](./plugins/aggregators/minmax)
* [histogram](./plugins/aggregators/histogram)
* [valuecounter](./plugins/aggregators/valuecounter)

## Output Plugins

Expand Down
1 change: 1 addition & 0 deletions plugins/aggregators/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
)
73 changes: 73 additions & 0 deletions plugins/aggregators/valuecounter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# ValueCounter Aggregator Plugin

The valuecounter plugin counts the occurrence of values in fields and emits the
counter once every 'period' seconds.

A use case for the valuecounter plugin is when you are processing a HTTP access
log (with the logparser input) and want to count the HTTP status codes.

The fields which will be counted must be configured with the `fields`
configuration directive. When no `fields` is provided the plugin will not count
any fields. The results are emitted in fields in the format:
`originalfieldname_fieldvalue = count`.

Valuecounter only works on fields of the type int, bool or string. Float fields
are being dropped to prevent the creating of too many fields.

### Configuration:

```toml
[[aggregators.valuecounter]]
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## The fields for which the values will be counted
fields = ["status"]
```

### Measurements & Fields:

- measurement1
- field_value1
- field_value2

### Tags:

No tags are applied by this aggregator.

### Example Output:

Example for parsing a HTTP access log.

telegraf.conf:
```
[[inputs.logparser]]
files = ["/tmp/tst.log"]
[inputs.logparser.grok]
patterns = ['%{DATA:url:tag} %{NUMBER:response:string}']
measurement = "access"

[[aggregators.valuecounter]]
namepass = ["access"]
fields = ["response"]
```

/tmp/tst.log
```
/some/path 200
/some/path 401
/some/path 200
```

```
$ telegraf --config telegraf.conf --quiet

access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697

access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000
```
108 changes: 108 additions & 0 deletions plugins/aggregators/valuecounter/valuecounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package valuecounter

import (
"fmt"
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)

type aggregate struct {
name string
tags map[string]string
fieldCount map[string]int
}

// ValueCounter an aggregation plugin
type ValueCounter struct {
cache map[uint64]aggregate
Fields []string
}

// NewValueCounter create a new aggregation plugin which counts the occurances
// of fields and emits the count.
func NewValueCounter() telegraf.Aggregator {
vc := &ValueCounter{}
vc.Reset()
return vc
}

var sampleConfig = `
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## The fields for which the values will be counted
fields = []
`

// SampleConfig generates a sample config for the ValueCounter plugin
func (vc *ValueCounter) SampleConfig() string {
return sampleConfig
}

// Description returns the description of the ValueCounter plugin
func (vc *ValueCounter) Description() string {
return "Count the occurance of values in fields."
}

// Add is run on every metric which passes the plugin
func (vc *ValueCounter) Add(in telegraf.Metric) {
id := in.HashID()

// Check if the cache already has an entry for this metric, if not create it
if _, ok := vc.cache[id]; !ok {
a := aggregate{
name: in.Name(),
tags: in.Tags(),
fieldCount: make(map[string]int),
}
vc.cache[id] = a
}

// Check if this metric has fields which we need to count, if so increment
// the count.
for fk, fv := range in.Fields() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should skip floats? One thing that would be bad is if there is a frequently changing float value, which could result in many new fields.

If we don't disallow this, maybe we should add a warning in the README.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion, will check if I can implement this

for _, cf := range vc.Fields {
if fk == cf {
// Do not process float types to prevent memory from blowing up
switch fv.(type) {
default:
log.Printf("I! Valuecounter: Unsupported field type. " +
"Must be an int, string or bool. Ignoring.")
continue
case uint64, int64, string, bool:
}
fn := fmt.Sprintf("%v_%v", fk, fv)
vc.cache[id].fieldCount[fn]++
}
}
}
}

// Push emits the counters
func (vc *ValueCounter) Push(acc telegraf.Accumulator) {
for _, agg := range vc.cache {
fields := map[string]interface{}{}

for field, count := range agg.fieldCount {
fields[field] = count
}

acc.AddFields(agg.name, fields, agg.tags)
}
}

// Reset the cache, executed after each push
func (vc *ValueCounter) Reset() {
vc.cache = make(map[uint64]aggregate)
}

func init() {
aggregators.Add("valuecounter", func() telegraf.Aggregator {
return NewValueCounter()
})
}
126 changes: 126 additions & 0 deletions plugins/aggregators/valuecounter/valuecounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package valuecounter

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)

// Create a valuecounter with config
func NewTestValueCounter(fields []string) telegraf.Aggregator {
vc := &ValueCounter{
Fields: fields,
}
vc.Reset()

return vc
}

var m1, _ = metric.New("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"status": 200,
"somefield": 20.1,
"foobar": "bar",
},
time.Now(),
)

var m2, _ = metric.New("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"status": "OK",
"ignoreme": "string",
"andme": true,
"boolfield": false,
},
time.Now(),
)

func BenchmarkApply(b *testing.B) {
vc := NewTestValueCounter([]string{"status"})

for n := 0; n < b.N; n++ {
vc.Add(m1)
vc.Add(m2)
}
}

// Test basic functionality
func TestBasic(t *testing.T) {
vc := NewTestValueCounter([]string{"status"})
acc := testutil.Accumulator{}

vc.Add(m1)
vc.Add(m2)
vc.Add(m1)
vc.Push(&acc)

expectedFields := map[string]interface{}{
"status_200": 2,
"status_OK": 1,
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test with multiple fields to count
func TestMultipleFields(t *testing.T) {
vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"})
acc := testutil.Accumulator{}

vc.Add(m1)
vc.Add(m2)
vc.Add(m2)
vc.Add(m1)
vc.Push(&acc)

expectedFields := map[string]interface{}{
"status_200": 2,
"status_OK": 2,
"boolfield_false": 2,
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test with a reset between two runs
func TestWithReset(t *testing.T) {
vc := NewTestValueCounter([]string{"status"})
acc := testutil.Accumulator{}

vc.Add(m1)
vc.Add(m1)
vc.Add(m2)
vc.Push(&acc)

expectedFields := map[string]interface{}{
"status_200": 2,
"status_OK": 1,
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)

acc.ClearMetrics()
vc.Reset()

vc.Add(m2)
vc.Add(m2)
vc.Add(m1)
vc.Push(&acc)

expectedFields = map[string]interface{}{
"status_200": 1,
"status_OK": 2,
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}