From e2a4f8538e39bb63239d89b885db4affff194c77 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Mon, 4 Sep 2023 16:02:50 +0700 Subject: [PATCH] fix: and optimising fetching membership events --- .../rln/group_manager/dynamic/web3.go | 125 ++++++++---------- 1 file changed, 55 insertions(+), 70 deletions(-) diff --git a/waku/v2/protocol/rln/group_manager/dynamic/web3.go b/waku/v2/protocol/rln/group_manager/dynamic/web3.go index 4e8509f04..93ea936e4 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/web3.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/web3.go @@ -38,7 +38,12 @@ func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler R gm.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock)) } - err = gm.loadOldEvents(ctx, gm.rlnContract, fromBlock, handler) + latestBlockNumber, err := gm.latestBlockNumber(ctx) + if err != nil { + return err + } + // + err = gm.loadOldEvents(ctx, gm.rlnContract, fromBlock, latestBlockNumber, handler) if err != nil { return err } @@ -46,19 +51,32 @@ func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler R errCh := make(chan error) gm.wg.Add(1) - go gm.watchNewEvents(ctx, gm.rlnContract, handler, gm.log, errCh) + go gm.watchNewEvents(ctx, latestBlockNumber+1, handler, errCh) // we have already fetched the events for latestBlocNumber in oldEvents return <-errCh } -func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, fromBlock uint64, handler RegistrationEventHandler) error { - events, err := gm.getEvents(ctx, fromBlock, nil) +func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, fromBlock, toBlock uint64, handler RegistrationEventHandler) error { + var results []*contracts.RLNMemberRegistered + for ; fromBlock+maxBatchSize < toBlock; fromBlock += maxBatchSize + 1 { // check if the end of the batch is within the toBlock range + events, err := gm.getEvents(ctx, fromBlock, fromBlock+maxBatchSize) + if err != nil { + return err + } + results = append(results, events...) + } + + // + events, err := gm.getEvents(ctx, fromBlock, toBlock) if err != nil { return err } - return handler(gm, events) + results = append(results, events...) + // + // process all the fetched events + return handler(gm, results) } -func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { +func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) { defer gm.wg.Done() // Watch for new events @@ -70,13 +88,13 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract * if err == rpc.ErrNotificationsUnsupported { err = errors.New("notifications not supported. The node must support websockets") } - if firstErr { - errCh <- err - } gm.log.Error("subscribing to rln events", zap.Error(err)) } - firstErr = false - close(errCh) + if firstErr { // errCh can be closed only once + errCh <- err + close(errCh) + firstErr = false + } return s, err }) @@ -86,11 +104,13 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract * for { select { case h := <-headerCh: - blk := h.Number.Uint64() - events, err := gm.getEvents(ctx, blk, &blk) + toBlock := h.Number.Uint64() + events, err := gm.getEvents(ctx, fromBlock, toBlock) if err != nil { gm.log.Error("obtaining rln events", zap.Error(err)) } + // update the last processed block + fromBlock = toBlock + 1 err = handler(gm, events) if err != nil { @@ -108,78 +128,43 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract * } const maxBatchSize = uint64(5000) -const additiveFactorMultiplier = 0.10 -const multiplicativeDecreaseDivisor = 2 func tooMuchDataRequestedError(err error) bool { // this error is only infura specific (other providers might have different error messages) return err.Error() == "query returned more than 10000 results" } -func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { - var results []*contracts.RLNMemberRegistered - - // Adapted from prysm logic for fetching historical logs - - toBlock := to - if to == nil { - block, err := gm.ethClient.BlockByNumber(ctx, nil) - if err != nil { - return nil, err - } - - blockNumber := block.Number().Uint64() - toBlock = &blockNumber - } - - if from == *toBlock { // Only loading a single block - return gm.fetchEvents(ctx, from, toBlock) +func (gm *DynamicGroupManager) latestBlockNumber(ctx context.Context) (uint64, error) { + block, err := gm.ethClient.BlockByNumber(ctx, nil) + if err != nil { + return 0, err } - // Fetching blocks in batches - batchSize := maxBatchSize - additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier) - - currentBlockNum := from - for currentBlockNum < *toBlock { - start := currentBlockNum - end := currentBlockNum + batchSize - if end > *toBlock { - end = *toBlock - } + return block.Number().Uint64(), nil +} - evts, err := gm.fetchEvents(ctx, start, &end) - if err != nil { - if tooMuchDataRequestedError(err) { - if batchSize == 0 { - return nil, errors.New("batch size is zero") - } - - // multiplicative decrease - batchSize = batchSize / multiplicativeDecreaseDivisor - continue +func (gm *DynamicGroupManager) getEvents(ctx context.Context, fromBlock uint64, toBlock uint64) ([]*contracts.RLNMemberRegistered, error) { + evts, err := gm.fetchEvents(ctx, fromBlock, toBlock) + if err != nil { + if tooMuchDataRequestedError(err) { // divide the range and try again + mid := (fromBlock + toBlock) / 2 + firstHalfEvents, err := gm.getEvents(ctx, fromBlock, mid) + if err != nil { + return nil, err } - return nil, err - } - - results = append(results, evts...) - - currentBlockNum = end - - if batchSize < maxBatchSize { - // update the batchSize with additive increase - batchSize = batchSize + additiveFactor - if batchSize > maxBatchSize { - batchSize = maxBatchSize + secondHalfEvents, err := gm.getEvents(ctx, mid+1, toBlock) + if err != nil { + return nil, err } + return append(firstHalfEvents, secondHalfEvents...), nil } + return nil, err } - - return results, nil + return evts, nil } -func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { - logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx}) +func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) { + logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx}) if err != nil { return nil, err }