Skip to content

Commit

Permalink
[processor/transform] Wire up logs processing (open-telemetry#9368)
Browse files Browse the repository at this point in the history
* [processor/transform] Wire up logs processing

* changelog

Co-authored-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
2 people authored and djaglowski committed May 10, 2022
1 parent 1849cf8 commit d3bdfc6
Show file tree
Hide file tree
Showing 15 changed files with 529 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- `cumulativetodeltaprocessor`: add new include/exclude configuration options with regex support (#8952)
- `cmd/mdatagen`: Update generated functions to have simple parse function to handle string parsing consistently and limit code duplication across receivers (#7574)
- `attributesprocessor`: Support filter by severity (#9132)
- `processor/transform`: Add transformation of logs (#9368)

### 🧰 Bug fixes 🧰

Expand Down
21 changes: 19 additions & 2 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Transform Processor

Supported pipeline types: traces
Supported pipeline types: logs, traces

The transform processor modifies telemetry based on configuration using the Telemetry Query Language.
It takes a list of queries which are performed in the order specified in the config.
Expand Down Expand Up @@ -38,20 +38,37 @@ exporters:

processors:
transform:
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(body, attributes["http.route"])
traces:
queries:
- set(status.code, 1) where attributes["http.path"] == "/health"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(name, attributes["http.route"])
service:
pipelines:
logs:
receivers: [otlp]
processors: [transform]
exporters: [nop]
traces:
receivers: [otlp]
processors: [transform]
exporters: [nop]
```
This processor will perform the operations in order for all spans
This processor will perform the operations in order for
All logs
1) Set severity text to FAIL if the body contains a string text "request failed"
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `body` to the `http.route` attribute if it is set

All spans

1) Set status code to OK for all spans with a path `/health`
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
Expand Down
22 changes: 20 additions & 2 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

type LogsConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for logs processing.
functions map[string]interface{} `mapstructure:"-"`
}

type TracesConfig struct {
Queries []string `mapstructure:"queries"`

Expand All @@ -31,12 +40,21 @@ type TracesConfig struct {
type Config struct {
config.ProcessorSettings `mapstructure:",squash"`

Logs LogsConfig `mapstructure:"logs"`
Traces TracesConfig `mapstructure:"traces"`
}

var _ config.Processor = (*Config)(nil)

func (c *Config) Validate() error {
_, err := common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
return err
var errors error
_, err := common.ParseQueries(c.Logs.Queries, c.Logs.functions, logs.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
return errors
}
25 changes: 21 additions & 4 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

func TestLoadingConfig(t *testing.T) {
Expand All @@ -40,13 +41,21 @@ func TestLoadingConfig(t *testing.T) {
p0 := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, p0, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Queries: []string{
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: common.DefaultFunctions(),
functions: traces.DefaultFunctions(),
},
})
}
Expand All @@ -58,11 +67,19 @@ func TestLoadInvalidConfig(t *testing.T) {
factory := NewFactory()
factories.Processors[typeStr] = factory

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax.yaml"), factories)
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)
}
26 changes: 26 additions & 0 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

Expand All @@ -36,13 +37,19 @@ func NewFactory() component.ProcessorFactory {
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
component.WithLogsProcessor(createLogsProcessor),
component.WithTracesProcessor(createTracesProcessor),
)
}

func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Queries: []string{},

Expand All @@ -51,6 +58,25 @@ func createDefaultConfig() config.Processor {
}
}

func createLogsProcessor(
_ context.Context,
settings component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.Logs.Queries, oCfg.Logs.functions, settings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
proc.ProcessLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(
_ context.Context,
settings component.ProcessorCreateSettings,
Expand Down
36 changes: 34 additions & 2 deletions processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

func TestFactory_Type(t *testing.T) {
Expand All @@ -38,10 +40,15 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Queries: []string{},

functions: common.DefaultFunctions(),
functions: traces.DefaultFunctions(),
},
})
assert.NoError(t, configtest.CheckConfigStruct(cfg))
Expand All @@ -64,6 +71,31 @@ func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) {
assert.Nil(t, ap)
}

func TestFactoryCreateLogsProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Logs.Queries = []string{`set(attributes["test"], "pass") where body == "operationA"`}

lp, err := factory.CreateLogsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.NotNil(t, lp)
assert.NoError(t, err)

ld := plog.NewLogs()
log := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log.Body().SetStringVal("operationA")

_, ok := log.Attributes().Get("test")
assert.False(t, ok)

err = lp.ConsumeLogs(context.Background(), ld)
assert.NoError(t, err)

val, ok := log.Attributes().Get("test")
assert.True(t, ok)
assert.Equal(t, "pass", val.StringVal())
}

func TestFactoryCreateTracesProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
Expand Down
24 changes: 24 additions & 0 deletions processor/transformprocessor/internal/logs/functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func DefaultFunctions() map[string]interface{} {
// No logs-only functions yet.
return common.DefaultFunctions()
}
Loading

0 comments on commit d3bdfc6

Please sign in to comment.