diff --git a/internal/e2e/status_test.go b/internal/e2e/status_test.go index 25e0c52a3e3..3d886d5c999 100644 --- a/internal/e2e/status_test.go +++ b/internal/e2e/status_test.go @@ -6,6 +6,8 @@ package e2e import ( "context" "errors" + "fmt" + "sync" "testing" "time" @@ -116,31 +118,40 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) { err = s.Shutdown(context.Background()) require.NoError(t, err) - assert.Equal(t, 5, len(eventsReceived)) + require.Equal(t, 2, len(eventsReceived)) for instanceID, events := range eventsReceived { - if instanceID.ComponentID() == component.NewID(component.MustNewType("test")) { - for i, e := range events { - if i == 0 { - assert.Equal(t, componentstatus.StatusStarting, e.Status()) - } - if i == 1 { - assert.Equal(t, componentstatus.StatusRecoverableError, e.Status()) - } - if i == 2 { - assert.Equal(t, componentstatus.StatusOK, e.Status()) - } - if i == 3 { - assert.Equal(t, componentstatus.StatusStopping, e.Status()) - } - if i == 4 { - assert.Equal(t, componentstatus.StatusStopped, e.Status()) - } - if i >= 5 { - assert.Fail(t, "received too many events") - } + pipelineIDs := "" + instanceID.AllPipelineIDs(func(id component.ID) bool { + pipelineIDs += id.String() + "," + return true + }) + + t.Logf("checking errors for %v - %v - %v", pipelineIDs, instanceID.Kind().String(), instanceID.ComponentID().String()) + + eventStr := "" + for i, e := range events { + eventStr += fmt.Sprintf("%v,", e.Status()) + if i == 0 { + assert.Equal(t, componentstatus.StatusStarting, e.Status()) + } + if i == 1 { + assert.Equal(t, componentstatus.StatusRecoverableError, e.Status()) + } + if i == 2 { + assert.Equal(t, componentstatus.StatusOK, e.Status()) + } + if i == 3 { + assert.Equal(t, componentstatus.StatusStopping, e.Status()) + } + if i == 4 { + assert.Equal(t, componentstatus.StatusStopped, e.Status()) + } + if i >= 5 { + assert.Fail(t, "received too many events") } } + t.Logf("events received: %v", eventStr) } } @@ -156,12 +167,10 @@ func newReceiverFactory() receiver.Factory { type testReceiver struct{} func (t *testReceiver) Start(_ context.Context, host component.Host) error { - if statusReporter, ok := host.(componentstatus.Reporter); ok { - statusReporter.Report(componentstatus.NewRecoverableErrorEvent(errors.New("test recoverable error"))) - go func() { - statusReporter.Report(componentstatus.NewEvent(componentstatus.StatusOK)) - }() - } + componentstatus.ReportStatus(host, componentstatus.NewRecoverableErrorEvent(errors.New("test recoverable error"))) + go func() { + componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusOK)) + }() return nil } @@ -237,6 +246,7 @@ func createExtension(_ context.Context, _ extension.Settings, cfg component.Conf type testExtension struct { eventsReceived map[*componentstatus.InstanceID][]*componentstatus.Event + lock sync.Mutex } type extensionConfig struct { @@ -262,7 +272,11 @@ func (t *testExtension) ComponentStatusChanged( source *componentstatus.InstanceID, event *componentstatus.Event, ) { - t.eventsReceived[source] = append(t.eventsReceived[source], event) + t.lock.Lock() + defer t.lock.Unlock() + if source.ComponentID() == component.NewID(component.MustNewType("test")) { + t.eventsReceived[source] = append(t.eventsReceived[source], event) + } } // NotifyConfig implements the extension.ConfigWatcher interface.