Skip to content

Commit

Permalink
Change plugin config to be specified as a list
Browse files Browse the repository at this point in the history
This makes plugin configuration more similar to output configuration,
where we can specify multiple plugins as a list. The idea behind this is
that the Telegraf agent can handle the multi-processing and error
handling better than each plugin handling that internally. This will
also allow for having different plugin configurations for different
instances of the same type of plugin.
  • Loading branch information
sparrc committed Nov 20, 2015
1 parent 970bfce commit 6f7001b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
21 changes: 12 additions & 9 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ type runningOutput struct {
}

type runningPlugin struct {
name string
plugin plugins.Plugin
config *ConfiguredPlugin
name string
filtername string
plugin plugins.Plugin
config *ConfiguredPlugin
}

// Agent runs telegraf and collects data based on the given config
Expand Down Expand Up @@ -176,9 +177,11 @@ func (a *Agent) LoadPlugins(filters []string, config *Config) ([]string, error)
var names []string

for name, plugin := range config.PluginsDeclared() {
if sliceContains(name, filters) || len(filters) == 0 {
// Trim the ID off the output name for filtering
filtername := strings.TrimRight(name, "-0123456789")
if sliceContains(filtername, filters) || len(filters) == 0 {
config := config.GetPluginConfig(name)
a.plugins = append(a.plugins, &runningPlugin{name, plugin, config})
a.plugins = append(a.plugins, &runningPlugin{name, filtername, plugin, config})
names = append(names, name)
}
}
Expand Down Expand Up @@ -207,7 +210,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.name + "_")
acc.SetPrefix(plugin.filtername + "_")
acc.SetDefaultTags(a.Tags)

if err := plugin.plugin.Gather(acc); err != nil {
Expand Down Expand Up @@ -240,7 +243,7 @@ func (a *Agent) gatherSeparate(

acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.name + "_")
acc.SetPrefix(plugin.filtername + "_")
acc.SetDefaultTags(a.Tags)

if err := plugin.plugin.Gather(acc); err != nil {
Expand Down Expand Up @@ -286,7 +289,7 @@ func (a *Agent) Test() error {
for _, plugin := range a.plugins {
acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(true)
acc.SetPrefix(plugin.name + "_")
acc.SetPrefix(plugin.filtername + "_")

fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name)
if plugin.config.Interval != 0 {
Expand All @@ -299,7 +302,7 @@ func (a *Agent) Test() error {

// Special instructions for some plugins. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch plugin.name {
switch plugin.filtername {
case "cpu", "mongodb":
time.Sleep(500 * time.Millisecond)
fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name)
Expand Down
43 changes: 36 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ var pluginHeader = `
###############################################################################
# PLUGINS #
###############################################################################
[plugins]
`

var servicePluginHeader = `
Expand Down Expand Up @@ -311,7 +313,7 @@ type printer interface {
}

func printConfig(name string, p printer) {
fmt.Printf("\n# %s\n[%s]", p.Description(), name)
fmt.Printf("\n# %s\n[[plugins.%s]]", p.Description(), name)
config := p.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
Expand Down Expand Up @@ -540,8 +542,34 @@ func LoadConfig(path string) (*Config, error) {
outputName)
}
}
case "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
err = c.parsePlugin(pluginName, pluginSubTable, 0)
if err != nil {
log.Printf("Could not parse config for plugin: %s\n",
pluginName)
return nil, err
}
case []*ast.Table:
for id, t := range pluginSubTable {
err = c.parsePlugin(pluginName, t, id)
if err != nil {
log.Printf("Could not parse config for plugin: %s\n",
pluginName)
return nil, err
}
}
default:
return nil, fmt.Errorf("Unsupported config format: %s",
pluginName)
}
}
// Assume it's a plugin for legacy config file support if no other
// identifiers are present
default:
err = c.parsePlugin(name, subTable)
err = c.parsePlugin(name, subTable, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -590,7 +618,7 @@ func (c *Config) parseOutput(name string, outputAst *ast.Table, id int) error {
}

// Parse a plugin config, plus plugin meta-config, out of the given *ast.Table.
func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
func (c *Config) parsePlugin(name string, pluginAst *ast.Table, id int) error {
creator, ok := plugins.Plugins[name]
if !ok {
return fmt.Errorf("Undefined but requested plugin: %s", name)
Expand Down Expand Up @@ -682,13 +710,14 @@ func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
delete(pluginAst.Fields, "interval")
delete(pluginAst.Fields, "tagdrop")
delete(pluginAst.Fields, "tagpass")
c.pluginFieldsSet[name] = extractFieldNames(pluginAst)
c.pluginConfigurationFieldsSet[name] = cpFields
nameID := fmt.Sprintf("%s-%d", name, id)
c.pluginFieldsSet[nameID] = extractFieldNames(pluginAst)
c.pluginConfigurationFieldsSet[nameID] = cpFields
err := toml.UnmarshalTable(pluginAst, plugin)
if err != nil {
return err
}
c.plugins[name] = plugin
c.pluginConfigurations[name] = cp
c.plugins[nameID] = plugin
c.pluginConfigurations[nameID] = cp
return nil
}

0 comments on commit 6f7001b

Please sign in to comment.