diff --git a/processor/routingprocessor/processor_test.go b/processor/routingprocessor/processor_test.go index a37ed5ddb536..898874f62aac 100644 --- a/processor/routingprocessor/processor_test.go +++ b/processor/routingprocessor/processor_test.go @@ -18,7 +18,6 @@ package routingprocessor import ( "context" "errors" - "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -27,7 +26,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -221,18 +220,18 @@ func TestTraces_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { // The numbers below stem from the fact that data is routed and grouped // per resource attribute which is used for routing. - // Hence the first 2 traces are grouped together under one plog.Logs. - assert.Equal(t, 1, defaultExp.getTraceCount(), - "one log should be routed to default exporter", + // Hence the first 2 traces are grouped together under one ptrace.Traces. + assert.Len(t, defaultExp.AllTraces(), 1, + "one trace should be routed to default exporter", ) - assert.Equal(t, 1, tExp.getTraceCount(), - "one log should be routed to non default exporter", + assert.Len(t, tExp.AllTraces(), 1, + "one trace should be routed to non default exporter", ) } func TestTraces_RoutingWorks_Context(t *testing.T) { defaultExp := &mockTracesExporter{} - lExp := &mockTracesExporter{} + tExp := &mockTracesExporter{} host := &mockHost{ Host: componenttest.NewNopHost(), @@ -240,7 +239,7 @@ func TestTraces_RoutingWorks_Context(t *testing.T) { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.TracesDataType: { config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): lExp, + config.NewComponentID("otlp/2"): tExp, }, } }, @@ -270,10 +269,10 @@ func TestTraces_RoutingWorks_Context(t *testing.T) { })), tr, )) - assert.Equal(t, 0, defaultExp.getTraceCount(), + assert.Len(t, defaultExp.AllTraces(), 0, "trace should not be routed to default exporter", ) - assert.Equal(t, 1, lExp.getTraceCount(), + assert.Len(t, tExp.AllTraces(), 1, "trace should be routed to non default exporter", ) }) @@ -285,10 +284,10 @@ func TestTraces_RoutingWorks_Context(t *testing.T) { })), tr, )) - assert.Equal(t, 1, defaultExp.getTraceCount(), + assert.Len(t, defaultExp.AllTraces(), 1, "trace should be routed to default exporter", ) - assert.Equal(t, 1, lExp.getTraceCount(), + assert.Len(t, tExp.AllTraces(), 1, "trace should not be routed to non default exporter", ) }) @@ -296,7 +295,7 @@ func TestTraces_RoutingWorks_Context(t *testing.T) { func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) { defaultExp := &mockTracesExporter{} - mExp := &mockTracesExporter{} + tExp := &mockTracesExporter{} host := &mockHost{ Host: componenttest.NewNopHost(), @@ -304,7 +303,7 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) { return map[config.DataType]map[config.ComponentID]component.Exporter{ config.TracesDataType: { config.NewComponentID("otlp"): defaultExp, - config.NewComponentID("otlp/2"): mExp, + config.NewComponentID("otlp/2"): tExp, }, } }, @@ -329,10 +328,10 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) { rs.Resource().Attributes().InsertString("X-Tenant", "acme") assert.NoError(t, exp.ConsumeTraces(context.Background(), tr)) - assert.Equal(t, 0, defaultExp.getTraceCount(), + assert.Len(t, defaultExp.AllTraces(), 0, "trace should not be routed to default exporter", ) - assert.Equal(t, 1, mExp.getTraceCount(), + assert.Len(t, tExp.AllTraces(), 1, "trace should be routed to non default exporter", ) }) @@ -343,10 +342,10 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) { rs.Resource().Attributes().InsertString("X-Tenant", "some-custom-value") assert.NoError(t, exp.ConsumeTraces(context.Background(), tr)) - assert.Equal(t, 1, defaultExp.getTraceCount(), + assert.Len(t, defaultExp.AllTraces(), 1, "trace should be routed to default exporter", ) - assert.Equal(t, 1, mExp.getTraceCount(), + assert.Len(t, tExp.AllTraces(), 1, "trace should not be routed to non default exporter", ) }) @@ -388,12 +387,12 @@ func TestTraces_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing. rm.Resource().Attributes().InsertString("attr", "acme") assert.NoError(t, exp.ConsumeTraces(context.Background(), tr)) - assert.Equal(t, 1, tExp.getTraceCount(), + traces := tExp.AllTraces() + require.Len(t, traces, 1, "trace should be routed to non default exporter", ) - require.Len(t, tExp.traces, 1) - require.Equal(t, 1, tExp.traces[0].ResourceSpans().Len()) - attrs := tExp.traces[0].ResourceSpans().At(0).Resource().Attributes() + require.Equal(t, 1, traces[0].ResourceSpans().Len()) + attrs := traces[0].ResourceSpans().At(0).Resource().Attributes() _, ok := attrs.Get("X-Tenant") assert.False(t, ok, "routing attribute should have been dropped") v, ok := attrs.Get("attr") @@ -474,10 +473,10 @@ func TestMetrics_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { // The numbers below stem from the fact that data is routed and grouped // per resource attribute which is used for routing. // Hence the first 2 metrics are grouped together under one pmetric.Metrics. - assert.Equal(t, 1, defaultExp.getMetricCount(), + assert.Len(t, defaultExp.AllMetrics(), 1, "one metric should be routed to default exporter", ) - assert.Equal(t, 1, mExp.getMetricCount(), + assert.Len(t, mExp.AllMetrics(), 1, "one metric should be routed to non default exporter", ) } @@ -522,10 +521,10 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) { })), m, )) - assert.Equal(t, 0, defaultExp.getMetricCount(), + assert.Len(t, defaultExp.AllMetrics(), 0, "metric should not be routed to default exporter", ) - assert.Equal(t, 1, mExp.getMetricCount(), + assert.Len(t, mExp.AllMetrics(), 1, "metric should be routed to non default exporter", ) }) @@ -537,10 +536,10 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) { })), m, )) - assert.Equal(t, 1, defaultExp.getMetricCount(), + assert.Len(t, defaultExp.AllMetrics(), 1, "metric should be routed to default exporter", ) - assert.Equal(t, 1, mExp.getMetricCount(), + assert.Len(t, mExp.AllMetrics(), 1, "metric should not be routed to non default exporter", ) }) @@ -581,10 +580,10 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) { rm.Resource().Attributes().InsertString("X-Tenant", "acme") assert.NoError(t, exp.ConsumeMetrics(context.Background(), m)) - assert.Equal(t, 0, defaultExp.getMetricCount(), + assert.Len(t, defaultExp.AllMetrics(), 0, "metric should not be routed to default exporter", ) - assert.Equal(t, 1, mExp.getMetricCount(), + assert.Len(t, mExp.AllMetrics(), 1, "metric should be routed to non default exporter", ) }) @@ -595,10 +594,10 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) { rm.Resource().Attributes().InsertString("X-Tenant", "some-custom-value") assert.NoError(t, exp.ConsumeMetrics(context.Background(), m)) - assert.Equal(t, 1, defaultExp.getMetricCount(), + assert.Len(t, defaultExp.AllMetrics(), 1, "metric should be routed to default exporter", ) - assert.Equal(t, 1, mExp.getMetricCount(), + assert.Len(t, mExp.AllMetrics(), 1, "metric should not be routed to non default exporter", ) }) @@ -640,12 +639,10 @@ func TestMetrics_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing rm.Resource().Attributes().InsertString("attr", "acme") assert.NoError(t, exp.ConsumeMetrics(context.Background(), m)) - assert.Equal(t, 1, mExp.getMetricCount(), - "metric should be routed to non default exporter", - ) - require.Len(t, mExp.metrics, 1) - require.Equal(t, 1, mExp.metrics[0].ResourceMetrics().Len()) - attrs := mExp.metrics[0].ResourceMetrics().At(0).Resource().Attributes() + metrics := mExp.AllMetrics() + require.Len(t, metrics, 1, "metric should be routed to non default exporter") + require.Equal(t, 1, metrics[0].ResourceMetrics().Len()) + attrs := metrics[0].ResourceMetrics().At(0).Resource().Attributes() _, ok := attrs.Get("X-Tenant") assert.False(t, ok, "routing attribute should have been dropped") v, ok := attrs.Get("attr") @@ -693,10 +690,10 @@ func TestLogs_RoutingWorks_Context(t *testing.T) { })), l, )) - assert.Equal(t, 0, defaultExp.getLogCount(), + assert.Len(t, defaultExp.AllLogs(), 0, "log should not be routed to default exporter", ) - assert.Equal(t, 1, lExp.getLogCount(), + assert.Len(t, lExp.AllLogs(), 1, "log should be routed to non default exporter", ) }) @@ -708,10 +705,10 @@ func TestLogs_RoutingWorks_Context(t *testing.T) { })), l, )) - assert.Equal(t, 1, defaultExp.getLogCount(), + assert.Len(t, defaultExp.AllLogs(), 1, "log should be routed to default exporter", ) - assert.Equal(t, 1, lExp.getLogCount(), + assert.Len(t, lExp.AllLogs(), 1, "log should not be routed to non default exporter", ) }) @@ -752,10 +749,10 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) { rl.Resource().Attributes().InsertString("X-Tenant", "acme") assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) - assert.Equal(t, 0, defaultExp.getLogCount(), + assert.Len(t, defaultExp.AllLogs(), 0, "log should not be routed to default exporter", ) - assert.Equal(t, 1, lExp.getLogCount(), + assert.Len(t, lExp.AllLogs(), 1, "log should be routed to non default exporter", ) }) @@ -766,10 +763,10 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) { rl.Resource().Attributes().InsertString("X-Tenant", "some-custom-value") assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) - assert.Equal(t, 1, defaultExp.getLogCount(), + assert.Len(t, defaultExp.AllLogs(), 1, "log should be routed to default exporter", ) - assert.Equal(t, 1, lExp.getLogCount(), + assert.Len(t, lExp.AllLogs(), 1, "log should not be routed to non default exporter", ) }) @@ -811,12 +808,10 @@ func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) rm.Resource().Attributes().InsertString("attr", "acme") assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) - assert.Equal(t, 1, lExp.getLogCount(), - "log should be routed to non-default exporter", - ) - require.Len(t, lExp.logs, 1) - require.Equal(t, 1, lExp.logs[0].ResourceLogs().Len()) - attrs := lExp.logs[0].ResourceLogs().At(0).Resource().Attributes() + logs := lExp.AllLogs() + require.Len(t, logs, 1, "log should be routed to non-default exporter") + require.Equal(t, 1, logs[0].ResourceLogs().Len()) + attrs := logs[0].ResourceLogs().At(0).Resource().Attributes() _, ok := attrs.Get("X-Tenant") assert.False(t, ok, "routing attribute should have been dropped") v, ok := attrs.Get("attr") @@ -873,10 +868,10 @@ func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) { // The numbers below stem from the fact that data is routed and grouped // per resource attribute which is used for routing. // Hence the first 2 metrics are grouped together under one plog.Logs. - assert.Equal(t, 1, defaultExp.getLogCount(), + assert.Len(t, defaultExp.AllLogs(), 1, "one log should be routed to default exporter", ) - assert.Equal(t, 1, lExp.getLogCount(), + assert.Len(t, lExp.AllLogs(), 1, "one log should be routed to non default exporter", ) } @@ -941,72 +936,22 @@ func (m *mockHost) GetExporters() map[config.DataType]map[config.ComponentID]com return m.Host.GetExporters() } -type mockComponent struct{} - -func (m *mockComponent) Start(context.Context, component.Host) error { - return nil -} - -func (m *mockComponent) Shutdown(context.Context) error { - return nil +type mockComponent struct { + component.StartFunc + component.ShutdownFunc } type mockMetricsExporter struct { mockComponent - metricCount int32 - metrics []pmetric.Metrics -} - -func (m *mockMetricsExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, metrics pmetric.Metrics) error { - atomic.AddInt32(&m.metricCount, 1) - m.metrics = append(m.metrics, metrics) - return nil -} - -func (m *mockMetricsExporter) getMetricCount() int { - return int(atomic.LoadInt32(&m.metricCount)) + consumertest.MetricsSink } type mockLogsExporter struct { mockComponent - logCount int32 - logs []plog.Logs -} - -func (m *mockLogsExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (m *mockLogsExporter) ConsumeLogs(_ context.Context, logs plog.Logs) error { - atomic.AddInt32(&m.logCount, 1) - m.logs = append(m.logs, logs) - return nil -} - -func (m *mockLogsExporter) getLogCount() int { - return int(atomic.LoadInt32(&m.logCount)) + consumertest.LogsSink } type mockTracesExporter struct { mockComponent - traceCount int32 - traces []ptrace.Traces -} - -func (m *mockTracesExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (m *mockTracesExporter) ConsumeTraces(_ context.Context, traces ptrace.Traces) error { - atomic.AddInt32(&m.traceCount, 1) - m.traces = append(m.traces, traces) - return nil -} - -func (m *mockTracesExporter) getTraceCount() int { - return int(atomic.LoadInt32(&m.traceCount)) + consumertest.TracesSink }