From 275accd1d50a656bcb393c8bd9fe417501114b67 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Fri, 6 May 2022 13:03:52 -0400 Subject: [PATCH 1/6] Re-enable flaky test and capture errors better for logstransformprocessor --- processor/logstransformprocessor/processor.go | 24 +++++++++++++------ .../logstransformprocessor/processor_test.go | 6 ----- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/processor/logstransformprocessor/processor.go b/processor/logstransformprocessor/processor.go index 1844a703b131..710794445b39 100644 --- a/processor/logstransformprocessor/processor.go +++ b/processor/logstransformprocessor/processor.go @@ -17,6 +17,7 @@ package logstransformprocessor // import "github.com/open-telemetry/opentelemetr import ( "context" "errors" + "fmt" "math" "runtime" "sync" @@ -30,6 +31,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza" ) +type outputType struct { + logs pdata.Logs + err error +} + type logsTransformProcessor struct { logger *zap.Logger config *Config @@ -40,7 +46,7 @@ type logsTransformProcessor struct { converter *stanza.Converter fromConverter *stanza.FromPdataConverter wg sync.WaitGroup - outputChannel chan pdata.Logs + outputChannel chan outputType } func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error { @@ -104,7 +110,7 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos ltp.fromConverter = stanza.NewFromPdataConverter(wkrCount, ltp.logger) ltp.fromConverter.Start() - ltp.outputChannel = make(chan pdata.Logs) + ltp.outputChannel = make(chan outputType) // Below we're starting 3 loops: // * first which reads all the logs translated by the fromConverter and then forwards @@ -141,12 +147,15 @@ func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld pdata.Log case <-doneChan: ltp.logger.Debug("loop stopped") return ld, errors.New("processor interrupted") - case pLogs, ok := <-ltp.outputChannel: + case output, ok := <-ltp.outputChannel: if !ok { return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline") } + if output.err != nil { + return ld, err + } - return pLogs, nil + return output.logs, nil } } } @@ -171,7 +180,8 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) { for _, e := range entries { // Add item to the first operator of the pipeline manually if err := ltp.pipe.Operators()[0].Process(ctx, e); err != nil { - ltp.logger.Error("unexpected error encountered adding entries to pipeline", zap.Error(err)) + ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the pipeline: %w", err)} + break } } } @@ -196,7 +206,7 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) { } if err := ltp.converter.Batch(e); err != nil { - ltp.logger.Error("unexpected error encountered batching logs to converter", zap.Error(err)) + ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the converter: %w", err)} } } } @@ -218,7 +228,7 @@ func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) { continue } - ltp.outputChannel <- pLogs + ltp.outputChannel <- outputType{logs: pLogs, err: nil} } } } diff --git a/processor/logstransformprocessor/processor_test.go b/processor/logstransformprocessor/processor_test.go index 9df5b72b52a6..2c62ef265c63 100644 --- a/processor/logstransformprocessor/processor_test.go +++ b/processor/logstransformprocessor/processor_test.go @@ -74,13 +74,7 @@ type testLogMessage struct { attributes *map[string]pdata.Value } -// Skips test without applying unused rule: https://github.com/dominikh/go-tools/issues/633#issuecomment-606560616 -var skip = func(t *testing.T) { - t.Skip("Flaky test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9761") -} - func TestLogsTransformProcessor(t *testing.T) { - skip(t) baseMessage := pcommon.NewValueString("2022-01-01 INFO this is a test") spanID := pcommon.NewSpanID([8]byte{0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff}) traceID := pcommon.NewTraceID([16]byte{0x48, 0x01, 0x40, 0xf3, 0xd7, 0x70, 0xa5, 0xae, 0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff}) From 74e99292980108552521f423f685f27652c5d4fe Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Fri, 6 May 2022 14:27:56 -0400 Subject: [PATCH 2/6] Add additional log to processor simple test --- processor/logstransformprocessor/processor.go | 1 - .../logstransformprocessor/processor_test.go | 30 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/processor/logstransformprocessor/processor.go b/processor/logstransformprocessor/processor.go index 710794445b39..5d45f2b47250 100644 --- a/processor/logstransformprocessor/processor.go +++ b/processor/logstransformprocessor/processor.go @@ -198,7 +198,6 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) { case <-ctx.Done(): ltp.logger.Debug("emitter loop stopped") return - case e, ok := <-ltp.emitter.OutChannel(): if !ok { ltp.logger.Debug("emitter channel got closed") diff --git a/processor/logstransformprocessor/processor_test.go b/processor/logstransformprocessor/processor_test.go index 2c62ef265c63..70aff57f19b1 100644 --- a/processor/logstransformprocessor/processor_test.go +++ b/processor/logstransformprocessor/processor_test.go @@ -97,6 +97,13 @@ func TestLogsTransformProcessor(t *testing.T) { flags: uint32(0x01), observedTime: parseTime("2006-01-02", "2022-01-02"), }, + { + body: &baseMessage, + spanID: &spanID, + traceID: &traceID, + flags: uint32(0x02), + observedTime: parseTime("2006-01-02", "2022-01-03"), + }, }, parsedMessages: []testLogMessage{ { @@ -114,6 +121,21 @@ func TestLogsTransformProcessor(t *testing.T) { observedTime: parseTime("2006-01-02", "2022-01-02"), time: parseTime("2006-01-02", "2022-01-01"), }, + { + body: &baseMessage, + severity: plog.SeverityNumberINFO, + severityText: &infoSeverityText, + attributes: &map[string]pdata.Value{ + "msg": pcommon.NewValueString("this is a test"), + "time": pcommon.NewValueString("2022-01-01"), + "sev": pcommon.NewValueString("INFO"), + }, + spanID: &spanID, + traceID: &traceID, + flags: uint32(0x02), + observedTime: parseTime("2006-01-02", "2022-01-03"), + time: parseTime("2006-01-02", "2022-01-01"), + }, }, }, } @@ -136,7 +158,10 @@ func TestLogsTransformProcessor(t *testing.T) { logs := tln.AllLogs() require.Len(t, logs, 1) - logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Sort() + //logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Sort() + for i := 0; i < logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len(); i++ { + logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Attributes().Sort() + } assert.EqualValues(t, wantLogData, logs[0]) }) } @@ -144,8 +169,9 @@ func TestLogsTransformProcessor(t *testing.T) { func generateLogData(messages []testLogMessage) pdata.Logs { ld := testdata.GenerateLogsOneEmptyResourceLogs() + scope := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty() for _, content := range messages { - log := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + log := scope.LogRecords().AppendEmpty() if content.body != nil { content.body.CopyTo(log.Body()) } From 0533d0013f25e6d232ada11503a1ba2ff66bf0fd Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Fri, 6 May 2022 14:58:50 -0400 Subject: [PATCH 3/6] Cleanup & lint --- processor/logstransformprocessor/config_test.go | 8 ++++---- .../logstransformprocessor/processor_test.go | 15 +++++++-------- .../logstransformprocessor/testdata/config.yaml | 8 ++++---- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/processor/logstransformprocessor/config_test.go b/processor/logstransformprocessor/config_test.go index 1b51e76fcb5d..314abd288768 100644 --- a/processor/logstransformprocessor/config_test.go +++ b/processor/logstransformprocessor/config_test.go @@ -44,13 +44,13 @@ func TestLoadConfig(t *testing.T) { Operators: stanza.OperatorConfigs{ map[string]interface{}{ "type": "regex_parser", - "regex": "^(?P