Skip to content

Commit

Permalink
Support Processor & Aggregator Plugins
Browse files Browse the repository at this point in the history
closes #1726
  • Loading branch information
sparrc committed Sep 13, 2016
1 parent bc22309 commit 41f55e5
Show file tree
Hide file tree
Showing 14 changed files with 409 additions and 25 deletions.
59 changes: 41 additions & 18 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,13 @@ func (a *Agent) gatherer(
input *models.RunningInput,
interval time.Duration,
metricC chan telegraf.Metric,
) error {
) {
defer panicRecover(input)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
var outerr error

acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
Expand All @@ -128,17 +126,14 @@ func (a *Agent) gatherer(
gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start)

if outerr != nil {
return outerr
}
if a.Config.Agent.Debug {
log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
}

select {
case <-shutdown:
return nil
return
case <-ticker.C:
continue
}
Expand Down Expand Up @@ -259,24 +254,54 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
// create an output metric channel and a gorouting that continously passes
// each metric onto the output plugins & aggregators.
outMetricC := make(chan telegraf.Metric, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-shutdown:
// TODO aggregators should get stopped here
if len(outMetricC) > 0 {
// keep going until outMetricC is flushed
continue
}
return
case m := <-outMetricC:
// TODO send metrics to aggregators (copy all)
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
}
}
}
}()

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached metrics before shutdown")
// wait for outMetricC to get flushed before flushing outputs
wg.Wait()
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
case metric := <-metricC:
mS := []telegraf.Metric{metric}
for _, processor := range a.Config.Processors {
mS = processor.Apply(mS...)
}
for _, m := range mS {
outMetricC <- m
}
}
}
Expand Down Expand Up @@ -353,9 +378,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
go func(in *models.RunningInput, interv time.Duration) {
defer wg.Done()
if err := a.gatherer(shutdown, in, interv, metricC); err != nil {
log.Printf(err.Error())
}
a.gatherer(shutdown, in, interv, metricC)
}(input, interval)
}

Expand Down
16 changes: 16 additions & 0 deletions aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package telegraf

type Aggregator interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string

// Description returns a one-sentence description on the Input
Description() string

// Apply the metric to the aggregator
Apply(in Metric)

// Start starts the service filter with the given accumulator
Start(acc Accumulator)
Stop()
}
2 changes: 2 additions & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/kardianos/service"
)

Expand Down
85 changes: 82 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"

"github.com/influxdata/config"
Expand Down Expand Up @@ -47,9 +48,10 @@ type Config struct {
InputFilters []string
OutputFilters []string

Agent *AgentConfig
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Agent *AgentConfig
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Processors []*models.RunningProcessor
}

func NewConfig() *Config {
Expand All @@ -64,6 +66,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Processors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
Expand Down Expand Up @@ -499,6 +502,7 @@ func (c *Config) LoadConfig(path string) error {
case "outputs":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [outputs.influxdb] support
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
Expand All @@ -517,6 +521,7 @@ func (c *Config) LoadConfig(path string) error {
case "inputs", "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [inputs.cpu] support
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
Expand All @@ -532,6 +537,35 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "filters":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
case "aggregators":
// TODO support building aggregator plugins
// for pluginName, pluginVal := range subTable.Fields {
// switch pluginSubTable := pluginVal.(type) {
// case []*ast.Table:
// for _, t := range pluginSubTable {
// if err = c.addProcessor(pluginName, t); err != nil {
// return fmt.Errorf("Error parsing %s, %s", path, err)
// }
// }
// default:
// return fmt.Errorf("Unsupported config format: %s, file %s",
// pluginName, path)
// }
// }
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
Expand Down Expand Up @@ -572,6 +606,32 @@ func parseFile(fpath string) (*ast.Table, error) {
return toml.Parse(contents)
}

func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
return fmt.Errorf("Undefined but requested processor: %s", name)
}
processor := creator()

processorConfig, err := buildProcessor(name, table)
if err != nil {
return err
}

if err := config.UnmarshalTable(table, processor); err != nil {
return err
}

rf := &models.RunningProcessor{
Name: name,
Processor: processor,
Config: processorConfig,
}

c.Processors = append(c.Processors, rf)
return nil
}

func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
Expand Down Expand Up @@ -652,6 +712,25 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return nil
}

// buildProcessor TODO doc
func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}
unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop",
"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
}
}

var err error
conf.Filter, err = buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}

// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the models.OutputConfig/models.InputConfig
Expand Down
37 changes: 37 additions & 0 deletions internal/models/running_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package models

import (
"github.com/influxdata/telegraf"
)

type RunningProcessor struct {
Name string
Processor telegraf.Processor
Config *ProcessorConfig
}

// FilterConfig containing a name and filter
type ProcessorConfig struct {
Name string
Filter Filter
}

func (rf *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
ret := []telegraf.Metric{}

for _, metric := range in {
if rf.Config.Filter.IsActive() {
// check if the filter should be applied to this metric
if ok := rf.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok {
// this means filter should not be applied
ret = append(ret, metric)
continue
}
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rf.Processor.Apply(metric)...)
}

return ret
}
Loading

0 comments on commit 41f55e5

Please sign in to comment.