Processor & Aggregator Plugin Support #1726

Closed
sparrc opened this issue Sep 8, 2016 · 14 comments · Fixed by #1777
Labels
feature request Requests for new plugin and for new features to existing plugins

@sparrc
Contributor

sparrc commented Sep 8, 2016

EDIT: the proposed Aggregator interface has changed, see #1726 (comment)

Proposal:

processor & aggregator plugins will be new types of plugins that sit in-between input and output plugins.

If there are processors or aggregators defined in the config, then all metrics will pass through them before being passed onto the output plugins.

processor plugins will generically support matching based on (with globbing):

  1. tag key/value
  2. measurement name
  3. field keys

aggregator plugins will generically support matching based on (with globbing):

  1. tag key/value
  2. measurement name
  3. field keys
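The matching criteria listed above could be sketched roughly as follows. This is only an illustration: `Metric` and `Matcher` are hypothetical stand-ins rather than Telegraf types, and the standard library's `path.Match` is used as a simple glob matcher (an assumption, since the proposal does not say which glob implementation would be used).

```go
package main

import (
	"fmt"
	"path"
)

// Metric is a simplified, hypothetical stand-in for telegraf.Metric.
type Metric struct {
	Name   string
	Tags   map[string]string
	Fields map[string]interface{}
}

// Matcher holds hypothetical glob patterns. An empty criterion matches everything.
type Matcher struct {
	NamePatterns  []string          // globs for the measurement name
	TagPatterns   map[string]string // tag key -> glob for the tag value
	FieldPatterns []string          // globs for field keys
}

// matchAny reports whether s matches at least one of the glob patterns.
func matchAny(patterns []string, s string) bool {
	for _, p := range patterns {
		if ok, err := path.Match(p, s); err == nil && ok {
			return true
		}
	}
	return false
}

// Matches reports whether m satisfies every configured criterion.
func (f Matcher) Matches(m Metric) bool {
	if len(f.NamePatterns) > 0 && !matchAny(f.NamePatterns, m.Name) {
		return false
	}
	for key, pattern := range f.TagPatterns {
		value, ok := m.Tags[key]
		if !ok {
			return false
		}
		if matched, err := path.Match(pattern, value); err != nil || !matched {
			return false
		}
	}
	if len(f.FieldPatterns) > 0 {
		found := false
		for key := range m.Fields {
			if matchAny(f.FieldPatterns, key) {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

func main() {
	m := Metric{
		Name:   "cpu_usage",
		Tags:   map[string]string{"host": "web01"},
		Fields: map[string]interface{}{"usage_idle": 91.5},
	}
	f := Matcher{
		NamePatterns:  []string{"cpu*"},
		TagPatterns:   map[string]string{"host": "web*"},
		FieldPatterns: []string{"usage_*"},
	}
	fmt.Println(f.Matches(m)) // prints "true"
}
```

Note that `path.Match` treats `/` specially, so a dedicated glob library may be a better fit for tag values that contain slashes.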

An initial implementation has been written by @alimousazy in this PR: #1364, but I would like to consider here the structure & interface of processor plugins independent of the histogram/aggregator feature.

My proposal for the processor interface differs a bit from that PR. While that PR presents an interesting way of streaming metrics through multiple channels, it also raises an important question of how large to make each channel, which could greatly increase the total possible buffer size of telegraf.

Channels are great for multiple processes to run concurrently and aggregate their work in one place, but this is not actually the workflow of a processor plugin. For each metric that comes from the input plugins, each processor will need to be applied, and after all processors are applied the metric(s) will be passed onto the aggregator plugin(s) & output plugin(s).

The original metric will therefore get sent directly to the output plugins, while the aggregator plugins are free to process the metric as they need, adding their metrics to their accumulator as they need.

type Processor interface {
    // SampleConfig returns the default configuration of the Processor
    SampleConfig() string

    // Description returns a one-sentence description of the Processor
    Description() string

    // Apply applies the processor to the given metrics and returns the
    // resulting metrics
    Apply(in ...telegraf.Metric) []telegraf.Metric
}

type Aggregator interface {
    // SampleConfig returns the default configuration of the Aggregator
    SampleConfig() string

    // Description returns a one-sentence description of the Aggregator
    Description() string

    // Apply the metric to the aggregator
    Apply(in telegraf.Metric)

    // Start starts the aggregator
    Start(acc telegraf.Accumulator)

    // Stop stops the aggregator
    Stop()
}
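To illustrate how the list-based `Apply` signature lets processors run one after another, and lets a processor return fewer metrics than it received, here is a minimal sketch. `Metric` is a simplified stand-in for `telegraf.Metric`, only the `Apply` method of the proposed interface is reproduced, and `tagger`, `dropper`, and `applyAll` are hypothetical names used for illustration.

```go
package main

import "fmt"

// Metric is a simplified, hypothetical stand-in for telegraf.Metric.
type Metric struct {
	Name string
	Tags map[string]string
}

// Processor mirrors only the Apply method of the proposed interface.
type Processor interface {
	Apply(in ...Metric) []Metric
}

// tagger adds a fixed tag to every metric it sees (hypothetical processor).
type tagger struct{ key, value string }

func (t tagger) Apply(in ...Metric) []Metric {
	// Index into the slice so that a freshly created Tags map is kept.
	for i := range in {
		if in[i].Tags == nil {
			in[i].Tags = map[string]string{}
		}
		in[i].Tags[t.key] = t.value
	}
	return in
}

// dropper removes metrics with a given measurement name (hypothetical processor).
type dropper struct{ name string }

func (d dropper) Apply(in ...Metric) []Metric {
	out := make([]Metric, 0, len(in))
	for _, m := range in {
		if m.Name != d.name {
			out = append(out, m)
		}
	}
	return out
}

// applyAll runs the processors in slice order, feeding each one the
// output of the previous one.
func applyAll(procs []Processor, in ...Metric) []Metric {
	out := in
	for _, p := range procs {
		out = p.Apply(out...)
	}
	return out
}

func main() {
	procs := []Processor{dropper{name: "debug"}, tagger{key: "dc", value: "us-east"}}
	out := applyAll(procs, Metric{Name: "cpu"}, Metric{Name: "debug"}, Metric{Name: "mem"})
	for _, m := range out {
		fmt.Println(m.Name, m.Tags)
	}
}
```

In this sketch the order of the `procs` slice determines the processing order, which is one possible answer to the ordering question raised in this proposal, not a decision the proposal has made.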

Use case: [Why is this important (helps with prioritizing requests)]

some of the uses of these plugins:

  1. dropping metrics
  2. aggregating metrics
  3. adding & removing tags
  4. adding & removing fields
  5. modifying fields, measurement names, tags, etc.

Open Questions:

  1. Ordering: how do we deal with ordering of processors? Do we need to support an argument for users to manually order the plugins, or can we rely on the configuration file to provide the order for us?
  2. Allocations: what effect are processor plugins going to have on allocations?
@sparrc sparrc added the feature request Requests for new plugin and for new features to existing plugins label Sep 8, 2016
@sparrc sparrc added this to the 1.1.0 milestone Sep 8, 2016
sparrc added a commit that referenced this issue Sep 8, 2016
@alimousazy
Contributor

Hi,

I totally agree that channels will increase complexity in terms of memory usage and execution, but I have a question about the proposed design. Some of these filters may need a trigger other than metric arrival; for example, a histogram may need to flush data every minute (aggregated data). How can we handle that? Another thing: a filter's mapping of input to output metrics is not always one-to-one. For example, histogram or dropping filters may decide not to pass a metric on. In another case, a filter that flushes metrics (e.g. a histogram) might return multiple metrics instead of one.

@sparrc
Contributor Author

sparrc commented Sep 9, 2016

Some of these filters may need a trigger other than metric arrival; for example, a histogram may need to flush data every minute (aggregated data). How can we handle that?

That's a good point, I think it might be necessary to define two types of plugins: filters and aggregators. Aggregators would behave sort of like a "service filter" where they have continuous access to an output channel.

I'll come up with a design overview for this soon.

Another thing: a filter's mapping of input to output metrics is not always one-to-one. For example, histogram or dropping filters may decide not to pass a metric on. In another case, a filter that flushes metrics (e.g. a histogram) might return multiple metrics instead of one.

agreed, I have updated the Apply function to reflect this (accepting and returning lists)

@sparrc
Contributor Author

sparrc commented Sep 9, 2016

Updated design, this is to take into account the need for two different types of plugins: filters & aggregators.

@alimousazy
Contributor

I do feel that this model will solve flushing metrics from an active-state component, but active-state filters should still be considered filters, and their output should go through the other filters based on order. In the suggested design, active-state filters that flush metrics separately will bypass the other filters and push metrics directly to the output plugins. It occurred to me that the Apply pattern we are using looks similar to a channel with no buffer, if you ignore the cost of creating the channel in terms of memory and functionality, though I still feel that a channel is overkill for such functionality.

@sparrc
Contributor Author

sparrc commented Sep 9, 2016

@alimousazy I don't quite understand what you're suggesting, just eliminating the channel directly before the outputs? That channel shouldn't need to have a large buffer as it will have a goroutine constantly reading off of it.

@alimousazy
Contributor

@sparrc what I meant is that since the histogram will emit metrics every minute, these metrics should also pass through the other filters, such as a drop filter. Based on my understanding of the latest design you proposed, histogram metrics will go directly to the output plugins.

@sparrc
Contributor Author

sparrc commented Sep 9, 2016

yes, correct, the metrics coming from the aggregators would have the same fields as the metrics they are aggregating

@alimousazy
Contributor

Nice ideas for filters https://hekad.readthedocs.io/en/v0.10.0/config/filters/index.html#config-filters.

I would like to work on a Lua sandbox for input plugins, filters and output plugins. This would make it easier for end users to load different kinds of plugins at run time without needing to implement them in Go.

@sparrc
Contributor Author

sparrc commented Sep 12, 2016

I actually don't like the concept of making filter plugins that are specific to any input or output plugin. The filter plugins should perform generic tasks on any metric passing through, rather than being specifically defined for only a single type of plugin.

If you'd like to work on a Lua sandbox please open a separate issue where we can discuss the design of that. I'm not 100% convinced it's necessary, to me it seems like we can serve non-Go needs with the exec plugin, but I'd like to discuss in case there is a compelling case for it.

sparrc added a commit that referenced this issue Sep 12, 2016
@sparrc
Contributor Author

sparrc commented Sep 12, 2016

@alimousazy Any more thoughts on the design I've outlined above? I know that you've raised the issue that aggregated metrics don't get passed onto the filter.

In my view, since aggregations will be "opt-in" metrics, there is no need to further pass them on to the filter plugins, it will be sufficient for the aggregators to create output metrics based on the incoming metrics that have already been filtered.

sparrc added a commit that referenced this issue Sep 12, 2016
@alimousazy
Contributor

I still feel that aggregated data should pass through the other filters. Let me give you some use cases:

1- A bandwidth limit filter should be able to limit the number of emitted metrics regardless of whether they are aggregated or not; therefore aggregated metrics should pass through that filter.

2- A metrics shaping filter should be able to rename metrics regardless of whether they come from an aggregator or a normal input plugin.

3- An anomaly detection filter should be able to work on aggregated data as well as normal data.

These are the use cases that I was able to recall.

@sparrc

sparrc added a commit that referenced this issue Sep 13, 2016
@sparrc
Contributor Author

sparrc commented Sep 13, 2016

fair enough, we'll probably need to make a separate AggregatorAccumulator for adding aggregated metrics to the first metric channel. That accumulator will have some way of marking a metric as an "aggregate", so that after it passes thru the filters it does not get re-sent to the aggregators, so flow would look like this:

 ┌───────────┐                                       ┌───────────┐                        ┌───────────┐
 │  inputs   │                                       │processors │                        │  outputs  │
 │   .cpu    │────┐                          ┌──────▶│  .tagger  │                     ┌─▶│ .influxdb │
 │           │    │                          │       │           │                     │  │           │
 └───────────┘    │                          │       └───────────┘                     │  └───────────┘
                  │    ┌─────────────┐       │             │           ┌─────────────┐ │
                  ├───▶│ metric chan │───────┘             │        ┌─▶│ metric chan │─┤
                  │    └─────────────┘                     ▼        │  └─────────────┘ │
 ┌───────────┐    │           ▲                      ┌───────────┐  │                  │  ┌───────────┐
 │  inputs   │    │           │                      │processors │  │                  │  │  outputs  │
 │   .mem    │────┘           │                      │ .renamer  │──┤                  └─▶│   .file   │
 │           │         original metric               │           │  │                     │           │
 └───────────┘       ┌─────not sent─────┐            └───────────┘  │                     └───────────┘
                     │                  │                           │
                     │                  │                           │
                     │                  │                   aggregate metrics
                     │                  │                       not sent
                     │                  │                           │
               ┌───────────┐      ┌───────────┐                     │
               │aggregators│      │aggregators│                     │
               │.histogram │      │   .min    │◀────────────────────┤
               │           │      │           │                     │
               └───────────┘      └───────────┘                     │
                     ▲                                              │
                     │                                              │
                     └──────────────────────────────────────────────┘                                                                    
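The routing rule in the diagram above can be sketched as follows, under some assumptions: `Metric`, its `IsAggregate` flag, the `AddAggregate` method, and the `route` helper are hypothetical names for illustration only; the comment above only says that the accumulator will have "some way" of marking a metric as an aggregate.

```go
package main

import "fmt"

// Metric is a simplified, hypothetical stand-in for telegraf.Metric.
// IsAggregate is an assumed way of marking aggregator output.
type Metric struct {
	Name        string
	IsAggregate bool
}

// AggregatorAccumulator is a hypothetical accumulator that aggregators
// would use to put their results back onto the first metric channel.
type AggregatorAccumulator struct {
	metrics chan<- Metric
}

// AddAggregate marks the metric as an aggregate before sending it on.
func (a AggregatorAccumulator) AddAggregate(name string) {
	a.metrics <- Metric{Name: name, IsAggregate: true}
}

// route decides where each metric goes after it has left the processors:
// everything reaches the outputs, but aggregate metrics are not re-sent
// to the aggregators.
func route(processed []Metric, toOutputs, toAggregators func(Metric)) {
	for _, m := range processed {
		toOutputs(m)
		if !m.IsAggregate {
			toAggregators(m)
		}
	}
}

func main() {
	// Buffered only so that this single-goroutine example does not block.
	metricChan := make(chan Metric, 4)
	metricChan <- Metric{Name: "cpu"} // from an input plugin
	acc := AggregatorAccumulator{metrics: metricChan}
	acc.AddAggregate("cpu_min") // from an aggregator
	close(metricChan)

	// Processors would run here; this sketch passes metrics through unchanged.
	var processed []Metric
	for m := range metricChan {
		processed = append(processed, m)
	}

	route(processed,
		func(m Metric) { fmt.Println("to outputs:    ", m.Name) },
		func(m Metric) { fmt.Println("to aggregators:", m.Name) },
	)
}
```

Both metrics reach the outputs in this sketch, while only `cpu` is handed to the aggregators, which avoids feeding aggregates back into themselves.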

sparrc added a commit that referenced this issue Sep 13, 2016
@sparrc sparrc changed the title Filter Plugin Support Processor & Aggregator Plugin Support Sep 13, 2016
sparrc added a commit that referenced this issue Sep 13, 2016
@sparrc
Contributor Author

sparrc commented Sep 13, 2016

@alimousazy fyi I've renamed "filters" to "processors" because it was a bit of name overload. Filters already refer to the metric filter options that users can apply to plugins (tagdrop, tagpass, tagexclude, taginclude, etc.).

sparrc added a commit that referenced this issue Sep 13, 2016
sparrc added a commit that referenced this issue Sep 16, 2016
sparrc added a commit that referenced this issue Sep 19, 2016
sparrc added a commit that referenced this issue Sep 20, 2016
sparrc added a commit that referenced this issue Sep 21, 2016
sparrc added a commit that referenced this issue Sep 23, 2016
sparrc added a commit that referenced this issue Sep 27, 2016
@sparrc
Contributor Author

sparrc commented Sep 27, 2016

Updating the Aggregator interface due to some internal discussions and implementation details that came up.

We have decided that it would be best if all aggregator plugins were required to have a period parameter which allows the user to configure how large the bucket of each aggregation is. This is going to be a required argument, with a default value of 30s.

Because of this, the flushing of aggregator plugins can be done outside of the plugin itself, and thus the interface will be simplified to not require Start/Stop functions. Instead, the Push function pushes the current aggregated metrics to the given accumulator (similar to the input plugin Gather(acc) function). The Reset() function resets the aggregator's internal buffers and starts counting new aggregations.

Because of the way the plugin is handled, no locking needs to be done: the Reset/Push/Add functions will never conflict with one another.

type Aggregator interface {
    // SampleConfig returns the default configuration of the Aggregator.
    SampleConfig() string

    // Description returns a one-sentence description of the Aggregator.
    Description() string

    // Add the metric to the aggregator.
    Add(in Metric)

    // Push pushes the current aggregates to the accumulator.
    Push(acc Accumulator)

    // Reset resets the aggregator's caches and aggregates.
    Reset()
}

sparrc added a commit that referenced this issue Sep 28, 2016
sparrc added a commit that referenced this issue Oct 3, 2016
sparrc added a commit that referenced this issue Oct 4, 2016
sparrc added a commit that referenced this issue Oct 5, 2016
sparrc added a commit that referenced this issue Oct 6, 2016
sparrc added a commit that referenced this issue Oct 7, 2016
sparrc added a commit that referenced this issue Oct 10, 2016
sparrc added a commit that referenced this issue Oct 12, 2016
2 participants