Skip to content

Commit

Permalink
Process multiple blocks in parallel (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilkamo committed Apr 21, 2024
1 parent b18745f commit ad530c4
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 121 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

I had a lot of fun working on this project. My approach was to create a parser that could be easily extended with new
repositories and ethereum clients. Most of the code is tested with unit tests and I tried to keep it as clean as possible.
The most interesting parts are commented in the code so that reviewers can understand my thought process. The parser processes
blocks one by one (for simplicity) as this is not a perfect production ready service. In a prod environment I would
extend it to process multiple blocks at once by adding multiple workers. This way, the caller could control the batch size
and the parser could be more efficient.
The most interesting parts are commented in the code so that reviewers can understand my thought process.

## Packages

Expand Down
8 changes: 7 additions & 1 deletion parser/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Option func(p *Parser)

func WithBlockProcessTimeout(timeout time.Duration) Option {
return func(p *Parser) {
p.blockProcessTimeout = timeout
p.blocksProcessTimeout = timeout
}
}

Expand Down Expand Up @@ -43,3 +43,9 @@ func WithNoNewBlocksPause(duration time.Duration) Option {
p.noNewBlocksPause = duration
}
}

func WithMaxBlocksToProcess(maxBlocks int) Option {
return func(p *Parser) {
p.maxNumberOfBlocksToProcess = maxBlocks
}
}
10 changes: 9 additions & 1 deletion parser/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestWithBlockProcessTimeout(t *testing.T) {
t.Run("set block process timeout opt", func(t *testing.T) {
p, err := NewParser(endpoint, nil, WithBlockProcessTimeout(2000))
require.NoError(t, err)
require.Equal(t, time.Duration(2000), p.blockProcessTimeout)
require.Equal(t, time.Duration(2000), p.blocksProcessTimeout)
})
}

Expand Down Expand Up @@ -68,3 +68,11 @@ func TestWithNoNewBlocksPause(t *testing.T) {
require.Equal(t, time.Duration(2000), p.noNewBlocksPause)
})
}

func TestWithMaxNumberOfBlocksToProcess(t *testing.T) {
t.Run("set max blocks to process opt", func(t *testing.T) {
p, err := NewParser(endpoint, nil, WithMaxBlocksToProcess(22))
require.NoError(t, err)
require.Equal(t, 22, p.maxNumberOfBlocksToProcess)
})
}
125 changes: 51 additions & 74 deletions parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ import (
)

const (
defaultBlockProcessTimeout = 5 * time.Second
defaultNoNewBlocksPause = 10 * time.Second // eth new block appears every ~12 seconds
defaultBlocksProcessTimeout = 5 * time.Second
defaultNoNewBlocksPause = 10 * time.Second // eth new block appears every ~12 seconds
defaultMaxNumberOfBlocksToProcess = 10
)

type Parser struct {
blockProcessTimeout time.Duration
ethClient EthereumClient
lastProcessedBlock uint64
logger types.Logger
noNewBlocksPause time.Duration
transactionsRepo TransactionsRepository
addressesRepository AddressesRepository
running bool
singleWorkerChannel chan struct{}
blocksProcessTimeout time.Duration
ethClient EthereumClient
lastProcessedBlock uint64
logger types.Logger
noNewBlocksPause time.Duration
transactionsRepo TransactionsRepository
addressesRepository AddressesRepository
running bool
batchesWorker chan struct{}
sync.RWMutex
maxNumberOfBlocksToProcess int
processingErrs []error
}

func NewParser(
Expand All @@ -36,12 +39,14 @@ func NewParser(
opts ...Option,
) (*Parser, error) {
p := &Parser{
blockProcessTimeout: defaultBlockProcessTimeout,
logger: logger,
noNewBlocksPause: defaultNoNewBlocksPause,
transactionsRepo: storage.NewTransactionRepository(),
addressesRepository: storage.NewAddressesRepository(),
singleWorkerChannel: make(chan struct{}, 1),
blocksProcessTimeout: defaultBlocksProcessTimeout,
logger: logger,
noNewBlocksPause: defaultNoNewBlocksPause,
transactionsRepo: storage.NewTransactionRepository(),
addressesRepository: storage.NewAddressesRepository(),
batchesWorker: make(chan struct{}, 1),
maxNumberOfBlocksToProcess: defaultMaxNumberOfBlocksToProcess,
processingErrs: make([]error, 0),
}

for _, opt := range opts {
Expand All @@ -57,7 +62,7 @@ func NewParser(
p.ethClient = ethClient
}

p.singleWorkerChannel <- struct{}{}
p.batchesWorker <- struct{}{}

return p, nil
}
Expand Down Expand Up @@ -105,6 +110,30 @@ func (p *Parser) GetTransactions(address string) []types.Transaction {
return transactions
}

// getNumberOfBlocksToProcess calculates the number of blocks that the parser should process in the next iteration.
func (p *Parser) getNumberOfBlocksToProcess(ctx context.Context) (int, uint64, error) {
p.RLock()
defer p.RUnlock()

lastBlockNumber, err := p.ethClient.GetMostRecentBlockNumber(ctx)
if err != nil {
return 0, 0, fmt.Errorf("could not get most recent block: %w", err)
}

blocksToProcessCount := int(lastBlockNumber - uint64(p.GetCurrentBlock()))

if blocksToProcessCount > p.maxNumberOfBlocksToProcess {
blocksToProcessCount = p.maxNumberOfBlocksToProcess
}

lastBlockOfTheSequence := p.GetCurrentBlock() + blocksToProcessCount

p.logger.Info("calculated blocks to process",
"blocks", blocksToProcessCount, "lastBlockOfTheSequence", lastBlockOfTheSequence)

return blocksToProcessCount, uint64(lastBlockOfTheSequence), nil
}

// Run starts the parser and listens for new blocks.
// This method is not specified in the task `Parser` interface, but I added it to start the
// parser explicitly (not in the constructor).
Expand Down Expand Up @@ -134,24 +163,12 @@ func (p *Parser) Run(ctx context.Context) error {
case <-ctx.Done():
p.logger.Info("stopping parser")
return nil
case <-p.singleWorkerChannel:
ctx, cancel := context.WithTimeout(ctx, defaultBlockProcessTimeout)

p.logger.Info("fetching and parsing block")

if err := p.fetchAndParseBlock(ctx); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
p.logger.Error("could not fetch and parse because the context expired")
} else {
p.logger.Error("could not fetch and parse block", "error", err)
}
}
if err == nil {
p.logger.Info("block fetched and parsed", "block", p.lastProcessedBlock)
case <-p.batchesWorker:
if err := p.processBlocks(ctx); err != nil {
p.logger.Error("could not process blocks", "error", err)
}

p.singleWorkerChannel <- struct{}{}
cancel()
p.batchesWorker <- struct{}{}
}
}
}
Expand All @@ -170,46 +187,6 @@ func (p *Parser) setIsRunning(running bool) {
p.running = running
}

func (p *Parser) fetchAndParseBlock(ctx context.Context) error {
lastBlockNumber, err := p.ethClient.GetMostRecentBlockNumber(ctx)
if err != nil {
return fmt.Errorf("could not get most recent block: %w", err)
}

// Check if there are new blocks to process. If not, sleep for a while to avoid spamming the node.
if !p.shouldProcessBlock(lastBlockNumber) {
p.logger.Info("no new blocks, sleeping to avoid spamming the node")
time.Sleep(p.noNewBlocksPause)
return nil
}

// Get the next block to process in the sequence.
block, err := p.ethClient.GetBlockByNumber(ctx, p.lastProcessedBlock+1)
if err != nil {
return fmt.Errorf("could not get block by number: %w", err)
}

// Process the block.
if err = p.processBlock(ctx, block); err != nil {
return fmt.Errorf("could not process block: %w", err)
}

// Save the last processed block.
if err = p.transactionsRepo.SaveLastProcessedBlock(ctx, block.Number); err != nil {
return fmt.Errorf("could not save last processed block: %w", err)
}

// Move to sequence to the next block.
p.setLastProcessedBlock(block.Number)

return nil
}

// shouldProcessBlock checks if there are new blocks to process.
func (p *Parser) shouldProcessBlock(lastBlockNumber uint64) bool {
return p.lastProcessedBlock < lastBlockNumber
}

// setLastProcessedBlock sets the last processed block number.
func (p *Parser) setLastProcessedBlock(blockNumber uint64) {
p.Lock()
Expand Down
45 changes: 4 additions & 41 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"math/big"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -109,7 +108,7 @@ func TestParser(t *testing.T) {
require.Equal(t, "could not get transactions", log.GotErrors()[0])
})

t.Run("parser should error because of context timeout", func(t *testing.T) {
t.Run("parser could not start - should error because of context timeout", func(t *testing.T) {
log := &mock.Logger{}

p, err := NewParser(
Expand All @@ -126,43 +125,7 @@ func TestParser(t *testing.T) {
require.ErrorIs(t, err, context.DeadlineExceeded)
})

t.Run("parser should log timeout", func(t *testing.T) {
log := &mock.Logger{}
ethMock := mock.EthereumClient{
MostRecentBlock: 2,
BlockByNumber: types.Block{},
WithError: context.DeadlineExceeded,
}

p, err := NewParser(
endpoint,
log,
WithNoNewBlocksPause(noNewBlockPauseDuration),
WithEthereumClient(ethMock),
)
require.NoError(t, err)
require.NotNil(t, p)

ctx, cancel := context.WithCancel(context.TODO())

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err := p.Run(ctx)
require.NoError(t, err)
wg.Done()
}()

require.Eventually(t, func() bool {
return p.isRunning() && len(log.GotErrors()) > 0 &&
strings.Contains(log.GotErrors()[0], "could not fetch and parse because the context expired")
}, time.Second*2, time.Millisecond*100)

cancel()
wg.Wait()
})

t.Run("parser should log error during fetchAndParseBlock", func(t *testing.T) {
t.Run("parser should log error during processing", func(t *testing.T) {
log := &mock.Logger{}
ethMock := mock.EthereumClient{
MostRecentBlock: 2,
Expand Down Expand Up @@ -190,9 +153,9 @@ func TestParser(t *testing.T) {
}()

require.Eventually(t, func() bool {
return p.isRunning() && len(log.GotErrors()) > 0 &&
strings.Contains(log.GotErrors()[0], "could not fetch and parse block")
return p.isRunning() && len(log.GotErrors()) > 10
}, time.Second*2, time.Millisecond*100)
require.Contains(t, log.GotErrors(), "could not process blocks")

cancel()
wg.Wait()
Expand Down
Loading

0 comments on commit ad530c4

Please sign in to comment.