Skip to content

Commit

Permalink
fix: and optimising fetching membership events
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Sep 4, 2023
1 parent e8bd38a commit e2a4f85
Showing 1 changed file with 55 additions and 70 deletions.
125 changes: 55 additions & 70 deletions waku/v2/protocol/rln/group_manager/dynamic/web3.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,45 @@ func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler R
gm.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock))
}

err = gm.loadOldEvents(ctx, gm.rlnContract, fromBlock, handler)
latestBlockNumber, err := gm.latestBlockNumber(ctx)
if err != nil {
return err
}
//
err = gm.loadOldEvents(ctx, gm.rlnContract, fromBlock, latestBlockNumber, handler)
if err != nil {
return err
}

errCh := make(chan error)

gm.wg.Add(1)
go gm.watchNewEvents(ctx, gm.rlnContract, handler, gm.log, errCh)
go gm.watchNewEvents(ctx, latestBlockNumber+1, handler, errCh) // we have already fetched the events for latestBlocNumber in oldEvents
return <-errCh
}

func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, fromBlock uint64, handler RegistrationEventHandler) error {
events, err := gm.getEvents(ctx, fromBlock, nil)
func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, fromBlock, toBlock uint64, handler RegistrationEventHandler) error {
var results []*contracts.RLNMemberRegistered
for ; fromBlock+maxBatchSize < toBlock; fromBlock += maxBatchSize + 1 { // check if the end of the batch is within the toBlock range
events, err := gm.getEvents(ctx, fromBlock, fromBlock+maxBatchSize)
if err != nil {
return err
}
results = append(results, events...)
}

//
events, err := gm.getEvents(ctx, fromBlock, toBlock)
if err != nil {
return err
}
return handler(gm, events)
results = append(results, events...)
//
// process all the fetched events
return handler(gm, results)
}

func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) {
func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) {
defer gm.wg.Done()

// Watch for new events
Expand All @@ -70,13 +88,13 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *
if err == rpc.ErrNotificationsUnsupported {
err = errors.New("notifications not supported. The node must support websockets")
}
if firstErr {
errCh <- err
}
gm.log.Error("subscribing to rln events", zap.Error(err))
}
firstErr = false
close(errCh)
if firstErr { // errCh can be closed only once
errCh <- err
close(errCh)
firstErr = false
}
return s, err
})

Expand All @@ -86,11 +104,13 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *
for {
select {
case h := <-headerCh:
blk := h.Number.Uint64()
events, err := gm.getEvents(ctx, blk, &blk)
toBlock := h.Number.Uint64()
events, err := gm.getEvents(ctx, fromBlock, toBlock)
if err != nil {
gm.log.Error("obtaining rln events", zap.Error(err))
}
// update the last processed block
fromBlock = toBlock + 1

err = handler(gm, events)
if err != nil {
Expand All @@ -108,78 +128,43 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *
}

const maxBatchSize = uint64(5000)
const additiveFactorMultiplier = 0.10
const multiplicativeDecreaseDivisor = 2

func tooMuchDataRequestedError(err error) bool {
// this error is only infura specific (other providers might have different error messages)
return err.Error() == "query returned more than 10000 results"
}

func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
var results []*contracts.RLNMemberRegistered

// Adapted from prysm logic for fetching historical logs

toBlock := to
if to == nil {
block, err := gm.ethClient.BlockByNumber(ctx, nil)
if err != nil {
return nil, err
}

blockNumber := block.Number().Uint64()
toBlock = &blockNumber
}

if from == *toBlock { // Only loading a single block
return gm.fetchEvents(ctx, from, toBlock)
func (gm *DynamicGroupManager) latestBlockNumber(ctx context.Context) (uint64, error) {
block, err := gm.ethClient.BlockByNumber(ctx, nil)
if err != nil {
return 0, err
}

// Fetching blocks in batches
batchSize := maxBatchSize
additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier)

currentBlockNum := from
for currentBlockNum < *toBlock {
start := currentBlockNum
end := currentBlockNum + batchSize
if end > *toBlock {
end = *toBlock
}
return block.Number().Uint64(), nil
}

evts, err := gm.fetchEvents(ctx, start, &end)
if err != nil {
if tooMuchDataRequestedError(err) {
if batchSize == 0 {
return nil, errors.New("batch size is zero")
}

// multiplicative decrease
batchSize = batchSize / multiplicativeDecreaseDivisor
continue
func (gm *DynamicGroupManager) getEvents(ctx context.Context, fromBlock uint64, toBlock uint64) ([]*contracts.RLNMemberRegistered, error) {
evts, err := gm.fetchEvents(ctx, fromBlock, toBlock)
if err != nil {
if tooMuchDataRequestedError(err) { // divide the range and try again
mid := (fromBlock + toBlock) / 2
firstHalfEvents, err := gm.getEvents(ctx, fromBlock, mid)
if err != nil {
return nil, err
}
return nil, err
}

results = append(results, evts...)

currentBlockNum = end

if batchSize < maxBatchSize {
// update the batchSize with additive increase
batchSize = batchSize + additiveFactor
if batchSize > maxBatchSize {
batchSize = maxBatchSize
secondHalfEvents, err := gm.getEvents(ctx, mid+1, toBlock)
if err != nil {
return nil, err
}
return append(firstHalfEvents, secondHalfEvents...), nil
}
return nil, err
}

return results, nil
return evts, nil
}

func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx})
func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) {
logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e2a4f85

Please sign in to comment.