Skip to content

Commit

Permalink
Support Filter & Aggregator Plugins
Browse files Browse the repository at this point in the history
closes #1726
  • Loading branch information
sparrc committed Sep 13, 2016
1 parent bc22309 commit b3600c7
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 12 deletions.
46 changes: 38 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,24 +259,54 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
// create an output metric channel and a gorouting that continously passes
// each metric onto the output plugins & aggregators.
outMetricC := make(chan telegraf.Metric, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-shutdown:
// TODO aggregators should get stopped here
if len(outMetricC) > 0 {
// keep going until outMetricC is flushed
continue
}
return
case m := <-outMetricC:
// TODO send metrics to aggregators (copy all)
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
}
}
}
}()

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached metrics before shutdown")
// wait for outMetricC to get flushed before flushing outputs
wg.Wait()
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
case metric := <-metricC:
mS := []telegraf.Metric{metric}
for _, filter := range a.Config.Filters {
mS = filter.Apply(mS...)
}
for _, m := range mS {
outMetricC <- m
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
_ "github.com/influxdata/telegraf/plugins/filters/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down
12 changes: 12 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package telegraf

type FilterPlugin interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

// Apply the filter to the given metric
Apply(in ...Metric) []Metric
}
64 changes: 64 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/filters"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -50,6 +51,7 @@ type Config struct {
Agent *AgentConfig
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Filters []*models.RunningFilterPlugin
}

func NewConfig() *Config {
Expand All @@ -64,6 +66,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Filters: make([]*models.RunningFilterPlugin, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand Down Expand Up @@ -499,6 +502,7 @@ func (c *Config) LoadConfig(path string) error {
case "outputs":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [outputs.influxdb] support
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
Expand All @@ -517,6 +521,7 @@ func (c *Config) LoadConfig(path string) error {
case "inputs", "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [inputs.cpu] support
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
Expand All @@ -532,6 +537,20 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filters":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addFilterPlugin(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand Down Expand Up @@ -572,6 +591,32 @@ func parseFile(fpath string) (*ast.Table, error) {
return toml.Parse(contents)
}

func (c *Config) addFilterPlugin(name string, table *ast.Table) error {
creator, ok := filters.Filters[name]
if !ok {
return fmt.Errorf("Undefined but requested filter: %s", name)
}
filter := creator()

filterConfig, err := buildFilterPlugin(name, table)
if err != nil {
return err
}

if err := config.UnmarshalTable(table, filter); err != nil {
return err
}

rf := &models.RunningFilterPlugin{
Name: name,
FilterPlugin: filter,
Config: filterConfig,
}

c.Filters = append(c.Filters, rf)
return nil
}

func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
Expand Down Expand Up @@ -652,6 +697,25 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return nil
}

// buildFilterPlugin TODO doc
func buildFilterPlugin(name string, tbl *ast.Table) (*models.FilterPluginConfig, error) {
conf := &models.FilterPluginConfig{Name: name}
unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop",
"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
}
}

var err error
conf.Filter, err = buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}

// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the models.OutputConfig/models.InputConfig
Expand Down
37 changes: 37 additions & 0 deletions internal/models/running_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package models

import (
"github.com/influxdata/telegraf"
)

type RunningFilterPlugin struct {
Name string
FilterPlugin telegraf.FilterPlugin
Config *FilterPluginConfig
}

// FilterConfig containing a name and filter
type FilterPluginConfig struct {
Name string
Filter Filter
}

func (rf *RunningFilterPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric {
ret := []telegraf.Metric{}

for _, metric := range in {
if rf.Config.Filter.IsActive() {
// check if the filter should be applied to this metric
if ok := rf.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok {
// this means filter should not be applied
ret = append(ret, metric)
continue
}
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rf.FilterPlugin.Apply(metric)...)
}

return ret
}
117 changes: 117 additions & 0 deletions internal/models/running_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package models

import (
"testing"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
)

type TestFilterPlugin struct {
}

func (f *TestFilterPlugin) SampleConfig() string { return "" }
func (f *TestFilterPlugin) Description() string { return "" }

// Apply renames:
// "foo" to "fuz"
// "bar" to "baz"
// And it also drops measurements named "dropme"
func (f *TestFilterPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric {
out := make([]telegraf.Metric, 0)
for _, m := range in {
switch m.Name() {
case "foo":
out = append(out, testutil.TestMetric(1, "fuz"))
case "bar":
out = append(out, testutil.TestMetric(1, "baz"))
case "dropme":
// drop the metric!
default:
out = append(out, m)
}
}
return out
}

func NewTestRunningFilterPlugin() *RunningFilterPlugin {
out := &RunningFilterPlugin{
Name: "test",
FilterPlugin: &TestFilterPlugin{},
Config: &FilterPluginConfig{Filter: Filter{}},
}
return out
}

func TestRunningFilterPlugin(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
testutil.TestMetric(1, "baz"),
}

expectedNames := []string{
"fuz",
"baz",
"baz",
}
rfp := NewTestRunningFilterPlugin()
filteredMetrics := rfp.Apply(inmetrics...)

actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
filteredMetrics[2].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}

func TestRunningFilterPlugin_WithNameDrop(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
testutil.TestMetric(1, "baz"),
}

expectedNames := []string{
"foo",
"baz",
"baz",
}
rfp := NewTestRunningFilterPlugin()

rfp.Config.Filter.NameDrop = []string{"foo"}
assert.NoError(t, rfp.Config.Filter.Compile())

filteredMetrics := rfp.Apply(inmetrics...)

actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
filteredMetrics[2].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}

func TestRunningFilterPlugin_DroppedMetric(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "dropme"),
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
}

expectedNames := []string{
"fuz",
"baz",
}
rfp := NewTestRunningFilterPlugin()
filteredMetrics := rfp.Apply(inmetrics...)

actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}
4 changes: 0 additions & 4 deletions internal/models/running_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestRunningOutput_PassFilter(t *testing.T) {
func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{

TagInclude: []string{"nothing*"},
},
}
Expand All @@ -154,7 +153,6 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
func TestRunningOutput_TagExcludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{

TagExclude: []string{"tag*"},
},
}
Expand All @@ -176,7 +174,6 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) {
func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{

TagExclude: []string{"nothing*"},
},
}
Expand All @@ -198,7 +195,6 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
func TestRunningOutput_TagIncludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{

TagInclude: []string{"tag*"},
},
}
Expand Down
5 changes: 5 additions & 0 deletions plugins/filters/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/filters/printer"
)
Loading

0 comments on commit b3600c7

Please sign in to comment.