Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/stanza: Avoid writing test/mock consumers when not necessary #9802

Merged
merged 1 commit into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions internal/stanza/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -87,8 +88,8 @@ func BenchmarkEmitterToConsumer(b *testing.B) {

for _, wc := range workerCounts {
b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
consumer := &mockLogsConsumer{}
logsReceiver, err := createNoopReceiver(wc, consumer)
cl := &consumertest.LogsSink{}
logsReceiver, err := createNoopReceiver(wc, cl)
require.NoError(b, err)

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -97,7 +98,7 @@ func BenchmarkEmitterToConsumer(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
consumer.ResetReceivedCount()
cl.Reset()

go func() {
ctx := context.Background()
Expand All @@ -108,9 +109,9 @@ func BenchmarkEmitterToConsumer(b *testing.B) {

require.Eventually(b,
func() bool {
return consumer.Received() == entryCount
return cl.LogRecordCount() == entryCount
},
30*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", consumer.Received(),
30*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", cl.LogRecordCount(),
)
}
})
Expand All @@ -126,8 +127,8 @@ func TestEmitterToConsumer(t *testing.T) {

entries := complexEntriesForNDifferentHosts(entryCount, hostsCount)

consumer := &mockLogsConsumer{}
logsReceiver, err := createNoopReceiver(workerCount, consumer)
cl := &consumertest.LogsSink{}
logsReceiver, err := createNoopReceiver(workerCount, cl)
require.NoError(t, err)

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -142,13 +143,13 @@ func TestEmitterToConsumer(t *testing.T) {

require.Eventually(t,
func() bool {
return consumer.Received() == entryCount
return cl.LogRecordCount() == entryCount
},
5*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", consumer.Received(),
5*time.Second, 5*time.Millisecond, "Did not receive all logs (only received %d)", cl.LogRecordCount(),
)

// Wait for a small bit of time in order to let any potential extra entries drain out of the pipeline
<-time.After(500 * time.Millisecond)

require.Equal(t, entryCount, consumer.Received())
require.Equal(t, entryCount, cl.LogRecordCount())
}
38 changes: 3 additions & 35 deletions internal/stanza/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
"github.com/open-telemetry/opentelemetry-log-collection/operator/transformer/noop"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand Down Expand Up @@ -77,46 +76,15 @@ func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) e
return nil
}

type mockLogsConsumer struct {
received int32
}

func (m *mockLogsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockLogsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
atomic.AddInt32(&m.received, int32(ld.LogRecordCount()))
return nil
}

func (m *mockLogsConsumer) Received() int {
ret := atomic.LoadInt32(&m.received)
return int(ret)
}

func (m *mockLogsConsumer) ResetReceivedCount() {
atomic.StoreInt32(&m.received, 0)
}

type mockLogsRejecter struct {
rejected int32
}

func (m *mockLogsRejecter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
consumertest.LogsSink
}

func (m *mockLogsRejecter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
atomic.AddInt32(&m.rejected, 1)
_ = m.LogsSink.ConsumeLogs(ctx, ld)
return fmt.Errorf("no")
}

func (m *mockLogsRejecter) Rejected() int {
ret := atomic.LoadInt32(&m.rejected)
return int(ret)
}

const testType = "test"

type TestConfig struct {
Expand Down
19 changes: 10 additions & 9 deletions internal/stanza/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
)

func TestStart(t *testing.T) {
mockConsumer := mockLogsConsumer{}
mockConsumer := &consumertest.LogsSink{}

factory := NewFactory(TestReceiverType{})

logsReceiver, err := factory.CreateLogsReceiver(
context.Background(),
componenttest.NewNopReceiverCreateSettings(),
factory.CreateDefaultConfig(),
&mockConsumer,
mockConsumer,
)
require.NoError(t, err, "receiver should successfully build")

Expand All @@ -55,33 +56,33 @@ func TestStart(t *testing.T) {
// Eventually because of asynchronuous nature of the receiver.
require.Eventually(t,
func() bool {
return mockConsumer.Received() == 1
return mockConsumer.LogRecordCount() == 1
},
10*time.Second, 5*time.Millisecond, "one log entry expected",
)
logsReceiver.Shutdown(context.Background())
}

func TestHandleStartError(t *testing.T) {
mockConsumer := mockLogsConsumer{}
mockConsumer := &consumertest.LogsSink{}

factory := NewFactory(TestReceiverType{})

cfg := factory.CreateDefaultConfig().(*TestConfig)
cfg.Input = newUnstartableParams()

receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, &mockConsumer)
receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, mockConsumer)
require.NoError(t, err, "receiver should successfully build")

err = receiver.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err, "receiver fails to start under rare circumstances")
}

func TestHandleConsumeError(t *testing.T) {
mockConsumer := mockLogsRejecter{}
mockConsumer := &mockLogsRejecter{}
factory := NewFactory(TestReceiverType{})

logsReceiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), &mockConsumer)
logsReceiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), factory.CreateDefaultConfig(), mockConsumer)
require.NoError(t, err, "receiver should successfully build")

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
Expand All @@ -93,11 +94,11 @@ func TestHandleConsumeError(t *testing.T) {
// Eventually because of asynchronuous nature of the receiver.
require.Eventually(t,
func() bool {
return mockConsumer.Rejected() == 1
return mockConsumer.LogRecordCount() == 1
},
10*time.Second, 5*time.Millisecond, "one log entry expected",
)
logsReceiver.Shutdown(context.Background())
require.NoError(t, logsReceiver.Shutdown(context.Background()))
}

func BenchmarkReadLine(b *testing.B) {
Expand Down
4 changes: 2 additions & 2 deletions internal/stanza/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/otel/metric/nonrecording"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -98,15 +99,14 @@ func createReceiver(t *testing.T) *receiver {
MetricsLevel: configtelemetry.LevelNone,
},
}
mockConsumer := mockLogsConsumer{}

factory := NewFactory(TestReceiverType{})

logsReceiver, err := factory.CreateLogsReceiver(
context.Background(),
params,
factory.CreateDefaultConfig(),
&mockConsumer,
consumertest.NewNop(),
)
require.NoError(t, err, "receiver should successfully build")

Expand Down