diff --git a/.changeset/funny-poets-sneeze.md b/.changeset/funny-poets-sneeze.md new file mode 100644 index 0000000000..214ba4504a --- /dev/null +++ b/.changeset/funny-poets-sneeze.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Remove LogPoller filters for outdated Functions coordinator contracts diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 471f18b4b0..4e37770f90 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -410,6 +410,7 @@ func (l *logPollerWrapper) handleRouteUpdate(ctx context.Context, activeCoordina } l.lggr.Debugw("LogPollerWrapper: new routes", "activeCoordinator", activeCoordinator.Hex(), "proposedCoordinator", proposedCoordinator.Hex()) + l.activeCoordinator = activeCoordinator l.proposedCoordinator = proposedCoordinator @@ -419,10 +420,28 @@ func (l *logPollerWrapper) handleRouteUpdate(ctx context.Context, activeCoordina l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err) } } + + filters := l.logPoller.GetFilters() + for _, filter := range filters { + if filter.Name[:len(l.filterPrefix())] != l.filterPrefix() { + continue + } + if filter.Name == l.filterName(l.activeCoordinator) || filter.Name == l.filterName(l.proposedCoordinator) { + continue + } + if err := l.logPoller.UnregisterFilter(ctx, filter.Name); err != nil { + l.lggr.Errorw("LogPollerWrapper: Failed to unregister filter", "filterName", filter.Name, "err", err) + } + l.lggr.Debugw("LogPollerWrapper: Successfully unregistered filter", "filterName", filter.Name) + } +} + +func (l *logPollerWrapper) filterPrefix() string { + return "FunctionsLogPollerWrapper:" + l.pluginConfig.DONID } -func filterName(addr common.Address) string { - return logpoller.FilterName("FunctionsLogPollerWrapper", addr.String()) +func (l *logPollerWrapper) filterName(addr common.Address) string { + return logpoller.FilterName(l.filterPrefix(), addr.String()) } func (l *logPollerWrapper) registerFilters(ctx context.Context, coordinatorAddress common.Address) error { @@ -432,7 +451,7 @@ func (l *logPollerWrapper) registerFilters(ctx context.Context, coordinatorAddre return l.logPoller.RegisterFilter( ctx, logpoller.Filter{ - Name: filterName(coordinatorAddress), + Name: l.filterName(coordinatorAddress), EventSigs: []common.Hash{ functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index b9a1684050..583e661741 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -95,6 +95,7 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents(t *testing.T) { lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "01"), nil) lp.On("RegisterFilter", mock.Anything, mock.Anything).Return(nil) + lp.On("GetFilters").Return(map[string]logpoller.Filter{}, nil) subscriber := newSubscriber(1) lpWrapper.SubscribeToUpdates(ctx, "mock_subscriber", subscriber) @@ -127,6 +128,8 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandling(t *testing.T) { lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "01"), nil) lp.On("RegisterFilter", mock.Anything, mock.Anything).Return(nil) + lp.On("GetFilters").Return(map[string]logpoller.Filter{}, nil) + subscriber := newSubscriber(1) lpWrapper.SubscribeToUpdates(ctx, "mock_subscriber", subscriber) mockedLog := getMockedRequestLog(t) @@ -213,3 +216,34 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetect assert.Equal(t, 0, len(mockedDetectedEvents.detectedEventsOrdered)) assert.Equal(t, 0, len(mockedDetectedEvents.isPreviouslyDetected)) } + +func TestLogPollerWrapper_UnregisterOldFiltersOnRouteUpgrade(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + lp, lpWrapper, _ := setUp(t, 100_000) // check only once + wrapper := lpWrapper.(*logPollerWrapper) + + activeCoord := common.HexToAddress("0x1") + proposedCoord := common.HexToAddress("0x2") + newActiveCoord := proposedCoord + newProposedCoord := common.HexToAddress("0x3") + + wrapper.activeCoordinator = activeCoord + wrapper.proposedCoordinator = proposedCoord + activeCoordFilterName := wrapper.filterName(activeCoord) + proposedCoordFilterName := wrapper.filterName(proposedCoord) + newProposedCoordFilterName := wrapper.filterName(newProposedCoord) + + lp.On("RegisterFilter", ctx, mock.Anything).Return(nil) + existingFilters := map[string]logpoller.Filter{ + activeCoordFilterName: {Name: activeCoordFilterName}, + proposedCoordFilterName: {Name: proposedCoordFilterName}, + newProposedCoordFilterName: {Name: newProposedCoordFilterName}, + } + lp.On("GetFilters").Return(existingFilters, nil) + lp.On("UnregisterFilter", ctx, activeCoordFilterName).Return(nil) + + wrapper.handleRouteUpdate(ctx, newActiveCoord, newProposedCoord) + + lp.AssertCalled(t, "UnregisterFilter", ctx, activeCoordFilterName) +}