Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/logstransformprocessor] Re-enable flaky test and capture errors better #9776

Merged
merged 8 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- `prometheusreceiver`: Handle the condition where `up` metric value is NaN (#9253)
- `tanzuobservabilityexporter`: Make metrics stanza in config be optional (#9098)
- `filelogreceiver`: Update Kubernetes examples to fix native OTel logs collection issue where 0 length logs cause errors (#9754)
- `logstransformprocessor`: Resolve node ordering to fix intermittent failures (#9761)

## v0.50.0

Expand Down
8 changes: 4 additions & 4 deletions processor/logstransformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func TestLoadConfig(t *testing.T) {
Operators: stanza.OperatorConfigs{
map[string]interface{}{
"type": "regex_parser",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"severity": map[string]interface{}{
"parse_from": "body.sev",
"parse_from": "attributes.sev",
},
"timestamp": map[string]interface{}{
"layout": "%Y-%m-%d",
"parse_from": "body.time",
"layout": "%Y-%m-%d %H:%M:%S",
"parse_from": "attributes.time",
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion processor/logstransformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/zap v1.21.0
gonum.org/v1/gonum v0.11.0
)

require (
Expand All @@ -36,7 +37,6 @@ require (
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
Expand Down
47 changes: 34 additions & 13 deletions processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,39 @@ package logstransformprocessor // import "github.com/open-telemetry/opentelemetr
import (
"context"
"errors"
"fmt"
"math"
"runtime"
"sync"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph/topo"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza"
)

type outputType struct {
logs pdata.Logs
err error
}

type logsTransformProcessor struct {
logger *zap.Logger
config *Config
id config.ComponentID

pipe pipeline.Pipeline
pipe *pipeline.DirectedPipeline
firstOperator operator.Operator
emitter *stanza.LogEmitter
converter *stanza.Converter
fromConverter *stanza.FromPdataConverter
wg sync.WaitGroup
outputChannel chan pdata.Logs
outputChannel chan outputType
}

func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -90,6 +99,15 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos

ltp.pipe = pipe

orderedNodes, err := topo.Sort(pipe.Graph)
if err != nil {
return err
}
if len(orderedNodes) == 0 {
return errors.New("processor requires at least one operator to be configured")
}
ltp.firstOperator = orderedNodes[0].(pipeline.OperatorNode).Operator()

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
if baseCfg.Converter.WorkerCount > 0 {
wkrCount = baseCfg.Converter.WorkerCount
Expand All @@ -104,7 +122,7 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos
ltp.fromConverter = stanza.NewFromPdataConverter(wkrCount, ltp.logger)
ltp.fromConverter.Start()

ltp.outputChannel = make(chan pdata.Logs)
ltp.outputChannel = make(chan outputType)

// Below we're starting 3 loops:
// * first which reads all the logs translated by the fromConverter and then forwards
Expand Down Expand Up @@ -141,12 +159,15 @@ func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld pdata.Log
case <-doneChan:
ltp.logger.Debug("loop stopped")
return ld, errors.New("processor interrupted")
case pLogs, ok := <-ltp.outputChannel:
case output, ok := <-ltp.outputChannel:
if !ok {
return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
}
if output.err != nil {
return ld, output.err
}

return pLogs, nil
return output.logs, nil
}
}
}
Expand All @@ -165,13 +186,14 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {
case entries, ok := <-ltp.fromConverter.OutChannel():
if !ok {
ltp.logger.Debug("fromConverter channel got closed")
continue
return
}

for _, e := range entries {
// Add item to the first operator of the pipeline manually
if err := ltp.pipe.Operators()[0].Process(ctx, e); err != nil {
ltp.logger.Error("unexpected error encountered adding entries to pipeline", zap.Error(err))
if err := ltp.firstOperator.Process(ctx, e); err != nil {
ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the pipeline: %w", err)}
break
}
}
}
Expand All @@ -188,15 +210,14 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
case <-ctx.Done():
ltp.logger.Debug("emitter loop stopped")
return

case e, ok := <-ltp.emitter.OutChannel():
if !ok {
ltp.logger.Debug("emitter channel got closed")
continue
return
}

if err := ltp.converter.Batch(e); err != nil {
ltp.logger.Error("unexpected error encountered batching logs to converter", zap.Error(err))
ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the converter: %w", err)}
}
}
}
Expand All @@ -215,10 +236,10 @@ func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) {
case pLogs, ok := <-ltp.converter.OutChannel():
if !ok {
ltp.logger.Debug("converter channel got closed")
continue
return
}

ltp.outputChannel <- pLogs
ltp.outputChannel <- outputType{logs: pLogs, err: nil}
}
}
}
47 changes: 33 additions & 14 deletions processor/logstransformprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ var (
Operators: stanza.OperatorConfigs{
map[string]interface{}{
"type": "regex_parser",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"regex": "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$",
"severity": map[string]interface{}{
"parse_from": "attributes.sev",
},
"timestamp": map[string]interface{}{
"layout": "%Y-%m-%d",
"layout": "%Y-%m-%d %H:%M:%S",
"parse_from": "attributes.time",
},
},
Expand Down Expand Up @@ -74,14 +74,8 @@ type testLogMessage struct {
attributes *map[string]pdata.Value
}

// Skips test without applying unused rule: https://github.com/dominikh/go-tools/issues/633#issuecomment-606560616
var skip = func(t *testing.T) {
t.Skip("Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9761")
}

func TestLogsTransformProcessor(t *testing.T) {
skip(t)
baseMessage := pcommon.NewValueString("2022-01-01 INFO this is a test")
baseMessage := pcommon.NewValueString("2022-01-01 01:02:03 INFO this is a test message")
spanID := pcommon.NewSpanID([8]byte{0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff})
traceID := pcommon.NewTraceID([16]byte{0x48, 0x01, 0x40, 0xf3, 0xd7, 0x70, 0xa5, 0xae, 0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff})
infoSeverityText := "Info"
Expand All @@ -103,22 +97,44 @@ func TestLogsTransformProcessor(t *testing.T) {
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
},
{
body: &baseMessage,
spanID: &spanID,
traceID: &traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
},
},
parsedMessages: []testLogMessage{
{
body: &baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
"msg": pcommon.NewValueString("this is a test"),
"time": pcommon.NewValueString("2022-01-01"),
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
time: parseTime("2006-01-02", "2022-01-01"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
},
{
body: &baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
},
},
},
Expand All @@ -142,16 +158,19 @@ func TestLogsTransformProcessor(t *testing.T) {
logs := tln.AllLogs()
require.Len(t, logs, 1)

logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Sort()
for i := 0; i < logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(); i++ {
logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().Sort()
}
assert.EqualValues(t, wantLogData, logs[0])
})
}
}

func generateLogData(messages []testLogMessage) pdata.Logs {
ld := testdata.GenerateLogsOneEmptyResourceLogs()
scope := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
for _, content := range messages {
log := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log := scope.LogRecords().AppendEmpty()
if content.body != nil {
content.body.CopyTo(log.Body())
}
Expand Down
8 changes: 4 additions & 4 deletions processor/logstransformprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ processors:
logstransform:
operators:
- type: regex_parser
regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
regex: '^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
timestamp:
parse_from: body.time
layout: '%Y-%m-%d'
parse_from: attributes.time
layout: '%Y-%m-%d %H:%M:%S'
severity:
parse_from: body.sev
parse_from: attributes.sev


exporters:
Expand Down