From a99e167f25bdd809f6e7bb4272f3ce995a2979c3 Mon Sep 17 00:00:00 2001 From: Kamil Molendys Date: Mon, 22 Apr 2024 01:51:10 +0200 Subject: [PATCH] Process multiple blocks in parallel --- README.md | 5 +- parser/options.go | 8 ++- parser/options_test.go | 10 +++- parser/parser.go | 125 +++++++++++++++++------------------------ parser/parser_test.go | 45 ++------------- parser/processor.go | 84 +++++++++++++++++++++++++++ 6 files changed, 156 insertions(+), 121 deletions(-) diff --git a/README.md b/README.md index ab93bee..21cb35e 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,7 @@ I had a lot of fun working on this project. My approach was to create a parser that could be easily extended with new repositories and ethereum clients. Most of the code is tested with unit tests and I tried to keep it as clean as possible. -The most interesting parts are commented in the code so that reviewers can understand my thought process. The parser processes -blocks one by one (for simplicity) as this is not a perfect production ready service. In a prod environment I would -extend it to process multiple blocks at once by adding multiple workers. This way, the caller could control the batch size -and the parser could be more efficient. +The most interesting parts are commented in the code so that reviewers can understand my thought process. ## Packages diff --git a/parser/options.go b/parser/options.go index 94e40a4..cba5153 100644 --- a/parser/options.go +++ b/parser/options.go @@ -10,7 +10,7 @@ type Option func(p *Parser) func WithBlockProcessTimeout(timeout time.Duration) Option { return func(p *Parser) { - p.blockProcessTimeout = timeout + p.blocksProcessTimeout = timeout } } @@ -43,3 +43,9 @@ func WithNoNewBlocksPause(duration time.Duration) Option { p.noNewBlocksPause = duration } } + +func WithMaxBlocksToProcess(maxBlocks int) Option { + return func(p *Parser) { + p.maxNumberOfBlocksToProcess = maxBlocks + } +} diff --git a/parser/options_test.go b/parser/options_test.go index bfc8e6e..2388054 100644 --- a/parser/options_test.go +++ b/parser/options_test.go @@ -13,7 +13,7 @@ func TestWithBlockProcessTimeout(t *testing.T) { t.Run("set block process timeout opt", func(t *testing.T) { p, err := NewParser(endpoint, nil, WithBlockProcessTimeout(2000)) require.NoError(t, err) - require.Equal(t, time.Duration(2000), p.blockProcessTimeout) + require.Equal(t, time.Duration(2000), p.blocksProcessTimeout) }) } @@ -68,3 +68,11 @@ func TestWithNoNewBlocksPause(t *testing.T) { require.Equal(t, time.Duration(2000), p.noNewBlocksPause) }) } + +func TestWithMaxNumberOfBlocksToProcess(t *testing.T) { + t.Run("set max blocks to process opt", func(t *testing.T) { + p, err := NewParser(endpoint, nil, WithMaxBlocksToProcess(22)) + require.NoError(t, err) + require.Equal(t, 22, p.maxNumberOfBlocksToProcess) + }) +} diff --git a/parser/parser.go b/parser/parser.go index f48acc8..f47e773 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -13,21 +13,24 @@ import ( ) const ( - defaultBlockProcessTimeout = 5 * time.Second - defaultNoNewBlocksPause = 10 * time.Second // eth new block appears every ~12 seconds + defaultBlocksProcessTimeout = 5 * time.Second + defaultNoNewBlocksPause = 10 * time.Second // eth new block appears every ~12 seconds + defaultMaxNumberOfBlocksToProcess = 10 ) type Parser struct { - blockProcessTimeout time.Duration - ethClient EthereumClient - lastProcessedBlock uint64 - logger types.Logger - noNewBlocksPause time.Duration - transactionsRepo TransactionsRepository - addressesRepository AddressesRepository - running bool - singleWorkerChannel chan struct{} + blocksProcessTimeout time.Duration + ethClient EthereumClient + lastProcessedBlock uint64 + logger types.Logger + noNewBlocksPause time.Duration + transactionsRepo TransactionsRepository + addressesRepository AddressesRepository + running bool + batchesWorker chan struct{} sync.RWMutex + maxNumberOfBlocksToProcess int + processingErrs []error } func NewParser( @@ -36,12 +39,14 @@ func NewParser( opts ...Option, ) (*Parser, error) { p := &Parser{ - blockProcessTimeout: defaultBlockProcessTimeout, - logger: logger, - noNewBlocksPause: defaultNoNewBlocksPause, - transactionsRepo: storage.NewTransactionRepository(), - addressesRepository: storage.NewAddressesRepository(), - singleWorkerChannel: make(chan struct{}, 1), + blocksProcessTimeout: defaultBlocksProcessTimeout, + logger: logger, + noNewBlocksPause: defaultNoNewBlocksPause, + transactionsRepo: storage.NewTransactionRepository(), + addressesRepository: storage.NewAddressesRepository(), + batchesWorker: make(chan struct{}, 1), + maxNumberOfBlocksToProcess: defaultMaxNumberOfBlocksToProcess, + processingErrs: make([]error, 0), } for _, opt := range opts { @@ -57,7 +62,7 @@ func NewParser( p.ethClient = ethClient } - p.singleWorkerChannel <- struct{}{} + p.batchesWorker <- struct{}{} return p, nil } @@ -105,6 +110,30 @@ func (p *Parser) GetTransactions(address string) []types.Transaction { return transactions } +// getNumberOfBlocksToProcess calculates the number of blocks that the parser should process in the next iteration. +func (p *Parser) getNumberOfBlocksToProcess(ctx context.Context) (int, uint64, error) { + p.RLock() + defer p.RUnlock() + + lastBlockNumber, err := p.ethClient.GetMostRecentBlockNumber(ctx) + if err != nil { + return 0, 0, fmt.Errorf("could not get most recent block: %w", err) + } + + blocksToProcessCount := int(lastBlockNumber - uint64(p.GetCurrentBlock())) + + if blocksToProcessCount > p.maxNumberOfBlocksToProcess { + blocksToProcessCount = p.maxNumberOfBlocksToProcess + } + + lastBlockOfTheSequence := p.GetCurrentBlock() + blocksToProcessCount + + p.logger.Info("calculated blocks to process", + "blocks", blocksToProcessCount, "lastBlockOfTheSequence", lastBlockOfTheSequence) + + return blocksToProcessCount, uint64(lastBlockOfTheSequence), nil +} + // Run starts the parser and listens for new blocks. // This method is not specified in the task `Parser` interface, but I added it to start the // parser explicitly (not in the constructor). @@ -134,24 +163,12 @@ func (p *Parser) Run(ctx context.Context) error { case <-ctx.Done(): p.logger.Info("stopping parser") return nil - case <-p.singleWorkerChannel: - ctx, cancel := context.WithTimeout(ctx, defaultBlockProcessTimeout) - - p.logger.Info("fetching and parsing block") - - if err := p.fetchAndParseBlock(ctx); err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - p.logger.Error("could not fetch and parse because the context expired") - } else { - p.logger.Error("could not fetch and parse block", "error", err) - } - } - if err == nil { - p.logger.Info("block fetched and parsed", "block", p.lastProcessedBlock) + case <-p.batchesWorker: + if err := p.processBlocks(ctx); err != nil { + p.logger.Error("could not process blocks", "error", err) } - p.singleWorkerChannel <- struct{}{} - cancel() + p.batchesWorker <- struct{}{} } } } @@ -170,46 +187,6 @@ func (p *Parser) setIsRunning(running bool) { p.running = running } -func (p *Parser) fetchAndParseBlock(ctx context.Context) error { - lastBlockNumber, err := p.ethClient.GetMostRecentBlockNumber(ctx) - if err != nil { - return fmt.Errorf("could not get most recent block: %w", err) - } - - // Check if there are new blocks to process. If not, sleep for a while to avoid spamming the node. - if !p.shouldProcessBlock(lastBlockNumber) { - p.logger.Info("no new blocks, sleeping to avoid spamming the node") - time.Sleep(p.noNewBlocksPause) - return nil - } - - // Get the next block to process in the sequence. - block, err := p.ethClient.GetBlockByNumber(ctx, p.lastProcessedBlock+1) - if err != nil { - return fmt.Errorf("could not get block by number: %w", err) - } - - // Process the block. - if err = p.processBlock(ctx, block); err != nil { - return fmt.Errorf("could not process block: %w", err) - } - - // Save the last processed block. - if err = p.transactionsRepo.SaveLastProcessedBlock(ctx, block.Number); err != nil { - return fmt.Errorf("could not save last processed block: %w", err) - } - - // Move to sequence to the next block. - p.setLastProcessedBlock(block.Number) - - return nil -} - -// shouldProcessBlock checks if there are new blocks to process. -func (p *Parser) shouldProcessBlock(lastBlockNumber uint64) bool { - return p.lastProcessedBlock < lastBlockNumber -} - // setLastProcessedBlock sets the last processed block number. func (p *Parser) setLastProcessedBlock(blockNumber uint64) { p.Lock() diff --git a/parser/parser_test.go b/parser/parser_test.go index 0bc90c1..d8ab299 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "math/big" - "strings" "sync" "testing" "time" @@ -109,7 +108,7 @@ func TestParser(t *testing.T) { require.Equal(t, "could not get transactions", log.GotErrors()[0]) }) - t.Run("parser should error because of context timeout", func(t *testing.T) { + t.Run("parser could not start - should error because of context timeout", func(t *testing.T) { log := &mock.Logger{} p, err := NewParser( @@ -126,43 +125,7 @@ func TestParser(t *testing.T) { require.ErrorIs(t, err, context.DeadlineExceeded) }) - t.Run("parser should log timeout", func(t *testing.T) { - log := &mock.Logger{} - ethMock := mock.EthereumClient{ - MostRecentBlock: 2, - BlockByNumber: types.Block{}, - WithError: context.DeadlineExceeded, - } - - p, err := NewParser( - endpoint, - log, - WithNoNewBlocksPause(noNewBlockPauseDuration), - WithEthereumClient(ethMock), - ) - require.NoError(t, err) - require.NotNil(t, p) - - ctx, cancel := context.WithCancel(context.TODO()) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - err := p.Run(ctx) - require.NoError(t, err) - wg.Done() - }() - - require.Eventually(t, func() bool { - return p.isRunning() && len(log.GotErrors()) > 0 && - strings.Contains(log.GotErrors()[0], "could not fetch and parse because the context expired") - }, time.Second*2, time.Millisecond*100) - - cancel() - wg.Wait() - }) - - t.Run("parser should log error during fetchAndParseBlock", func(t *testing.T) { + t.Run("parser should log error during processing", func(t *testing.T) { log := &mock.Logger{} ethMock := mock.EthereumClient{ MostRecentBlock: 2, @@ -190,9 +153,9 @@ func TestParser(t *testing.T) { }() require.Eventually(t, func() bool { - return p.isRunning() && len(log.GotErrors()) > 0 && - strings.Contains(log.GotErrors()[0], "could not fetch and parse block") + return p.isRunning() && len(log.GotErrors()) > 10 }, time.Second*2, time.Millisecond*100) + require.Contains(t, log.GotErrors(), "could not process blocks") cancel() wg.Wait() diff --git a/parser/processor.go b/parser/processor.go index 37ed2f7..feae963 100644 --- a/parser/processor.go +++ b/parser/processor.go @@ -3,10 +3,72 @@ package parser import ( "context" "fmt" + "sync" + "time" "github.com/ilkamo/ethparser-go/types" ) +// processBlocks processes the blocks in batches. It gets the number of blocks to process, then processes them in parallel. +// It waits for processed blocks and updates the `last processed block indicator` only if all the blocks were processed successfully. +// If an error occurs during processing, the batch will be retried again in the next iteration. This assumes that +// parser repositories are idempotent. It is an all or nothing approach that works well if the num +func (p *Parser) processBlocks(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, defaultBlocksProcessTimeout) + defer cancel() + + blocksToProcess, lastBlockOfTheSequence, err := p.getNumberOfBlocksToProcess(ctx) + if err != nil { + return fmt.Errorf("could not get number of blocks to process: %w", err) + } + + if blocksToProcess == 0 { + p.logger.Info("no new blocks, sleeping to avoid spamming the node") + time.Sleep(p.noNewBlocksPause) + return nil + } + + wg := sync.WaitGroup{} + for i := 0; i < blocksToProcess; i++ { + wg.Add(1) + + go func(blockNumber uint64) { + defer wg.Done() + + block, err := p.ethClient.GetBlockByNumber(ctx, blockNumber) + if err != nil { + p.logger.Error("could not get block by number", "block", blockNumber, "error", err) + p.setProcessingError(err) + return + } + + if err := p.processBlock(ctx, block); err != nil { + p.logger.Error("could not process block", "block", block.Number, "error", err) + p.setProcessingError(err) + } + }(p.lastProcessedBlock + uint64(i) + 1) + } + wg.Wait() + + if len(p.getProcessingErrors()) > 0 { + return fmt.Errorf("errors occurred during block processing: %v", p.getProcessingErrors()) + } + + // Clear the processing errors for the next iteration. + p.clearProcessingErrors() + + // Save the last processed block of the sequence. + if err = p.transactionsRepo.SaveLastProcessedBlock(ctx, lastBlockOfTheSequence); err != nil { + return fmt.Errorf("could not save last processed block of the sequence: %w", err) + } + + // Move the sequence forward. + p.setLastProcessedBlock(lastBlockOfTheSequence) + + return nil +} + +// processBlock processes the block by filtering out observed transactions and saving them to the repository. func (p *Parser) processBlock(ctx context.Context, block types.Block) error { p.logger.Info("processing block", "block", block.Number, "transactions", len(block.Transactions)) @@ -24,6 +86,7 @@ func (p *Parser) processBlock(ctx context.Context, block types.Block) error { return nil } +// processAndFilterObservedTransactions filters out transactions that involve observed addresses. func (p *Parser) processAndFilterObservedTransactions( ctx context.Context, transactions []types.Transaction, @@ -48,3 +111,24 @@ func (p *Parser) processAndFilterObservedTransactions( return filtered, nil } + +func (p *Parser) setProcessingError(err error) { + p.Lock() + defer p.Unlock() + + p.processingErrs = append(p.processingErrs, err) +} + +func (p *Parser) getProcessingErrors() []error { + p.RLock() + defer p.RUnlock() + + return p.processingErrs +} + +func (p *Parser) clearProcessingErrors() { + p.Lock() + defer p.Unlock() + + p.processingErrs = nil +}