Feature/optimizations (#663)
* sync optimizations

* topics optimization + logs

* Sync unit tests

* linter

* fix gha
ARR552 committed Aug 13, 2024
1 parent ec3d79a commit 170ba3c
Showing 4 changed files with 196 additions and 93 deletions.
39 changes: 20 additions & 19 deletions etherman/etherman.go
@@ -221,6 +221,7 @@ func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock
query := ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(fromBlock),
Addresses: etherMan.SCAddresses,
Topics: [][]common.Hash{{updateGlobalExitRootSignatureHash, updateL1InfoTreeSignatureHash, depositEventSignatureHash, claimEventSignatureHash, oldClaimEventSignatureHash, newWrappedTokenEventSignatureHash, verifyBatchesTrustedAggregatorSignatureHash, rollupManagerVerifyBatchesSignatureHash, addExistingRollupSignatureHash, createNewRollupSignatureHash}},
}
if toBlock != nil {
query.ToBlock = new(big.Int).SetUint64(*toBlock)
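The added Topics filter asks the node to return only logs whose first topic matches one of the tracked event signature hashes, instead of every log emitted by the bridge and rollup-manager contracts in the range. Below is a minimal go-ethereum sketch of the same idea, not code from this repository; the RPC endpoint, contract address, and event signature are illustrative placeholders (in etherman the inner slice holds the ten signature hashes listed in the hunk above).

```go
// Hedged sketch of server-side log filtering by topic with go-ethereum.
// Endpoint, address, and event signature below are placeholders.
package main

import (
	"context"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("https://example-rpc.invalid") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}

	// topic[0] of a log is the keccak256 hash of the event signature.
	eventSig := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) // illustrative signature

	query := ethereum.FilterQuery{
		FromBlock: new(big.Int).SetUint64(19_000_000),
		ToBlock:   new(big.Int).SetUint64(19_000_100),
		Addresses: []common.Address{common.HexToAddress("0x0000000000000000000000000000000000000000")}, // placeholder
		// Only logs whose first topic matches one of these hashes are returned,
		// so unrelated events from the same contracts never leave the node.
		Topics: [][]common.Hash{{eventSig}},
	}

	logs, err := client.FilterLogs(context.Background(), query)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("fetched %d matching logs", len(logs))
}
```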
@@ -411,10 +412,6 @@ func (etherMan *Client) updateL1InfoTreeEvent(ctx context.Context, vLog types.Lo
}

func (etherMan *Client) processUpdateGlobalExitRootEvent(ctx context.Context, mainnetExitRoot, rollupExitRoot common.Hash, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
var gExitRoot GlobalExitRoot
gExitRoot.ExitRoots = make([]common.Hash, 0)
gExitRoot.ExitRoots = append(gExitRoot.ExitRoots, mainnetExitRoot)
@@ -423,7 +420,11 @@ func (etherMan *Client) processUpdateGlobalExitRootEvent(ctx context.Context, ma
gExitRoot.BlockNumber = vLog.BlockNumber

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
t := time.Unix(int64(fullBlock.Time()), 0)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
t := time.Unix(int64(fullBlock.Time), 0)
block := prepareBlock(vLog, t, fullBlock)
block.GlobalExitRoots = append(block.GlobalExitRoots, gExitRoot)
*blocks = append(*blocks, block)
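This handler, like the others in the file, groups decoded events by the L1 block that emitted them: the event is appended to the last Block in the slice when hash and number match, and a new Block is started otherwise, now fetching only the header. The sketch below restates that pattern with assumed types; it is not an export of this repository.

```go
// Minimal sketch (types and names are assumptions) of the grouping pattern
// used by the event handlers above.
package sketch

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// Block mirrors only the fields these handlers actually use.
type Block struct {
	BlockNumber uint64
	BlockHash   common.Hash
	ParentHash  common.Hash
	ReceivedAt  time.Time
}

func appendToBlocks(ctx context.Context, client *ethclient.Client,
	vLog types.Log, blocks *[]Block, attach func(*Block)) error {
	n := len(*blocks)
	sameBlock := n > 0 &&
		(*blocks)[n-1].BlockHash == vLog.BlockHash &&
		(*blocks)[n-1].BlockNumber == vLog.BlockNumber
	if !sameBlock {
		// HeaderByHash avoids downloading transaction bodies just to read
		// the timestamp and parent hash, which is all the Block needs.
		header, err := client.HeaderByHash(ctx, vLog.BlockHash)
		if err != nil {
			return fmt.Errorf("error getting header. BlockNumber: %d. Error: %w", vLog.BlockNumber, err)
		}
		*blocks = append(*blocks, Block{
			BlockNumber: vLog.BlockNumber,
			BlockHash:   vLog.BlockHash,
			ParentHash:  header.ParentHash,
			ReceivedAt:  time.Unix(int64(header.Time), 0),
		})
	}
	attach(&(*blocks)[len(*blocks)-1]) // append the decoded event to that block
	return nil
}
```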
@@ -460,11 +461,11 @@ func (etherMan *Client) depositEvent(ctx context.Context, vLog types.Log, blocks
deposit.LeafType = d.LeafType

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time()), 0), fullBlock)
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time), 0), fullBlock)
block.Deposits = append(block.Deposits, deposit)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
@@ -516,11 +517,11 @@ func (etherMan *Client) claimEvent(ctx context.Context, vLog types.Log, blocks *
claim.MainnetFlag = mainnetFlag

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time()), 0), fullBlock)
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time), 0), fullBlock)
block.Claims = append(block.Claims, claim)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
@@ -550,11 +551,11 @@ func (etherMan *Client) tokenWrappedEvent(ctx context.Context, vLog types.Log, b
tokenWrapped.BlockNumber = vLog.BlockNumber

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time()), 0), fullBlock)
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time), 0), fullBlock)
block.Tokens = append(block.Tokens, tokenWrapped)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
@@ -571,11 +572,11 @@ func (etherMan *Client) tokenWrappedEvent(ctx context.Context, vLog types.Log, b
return nil
}

func prepareBlock(vLog types.Log, t time.Time, fullBlock *types.Block) Block {
func prepareBlock(vLog types.Log, t time.Time, fullBlock *types.Header) Block {
var block Block
block.BlockNumber = vLog.BlockNumber
block.BlockHash = vLog.BlockHash
block.ParentHash = fullBlock.ParentHash()
block.ParentHash = fullBlock.ParentHash
block.ReceivedAt = t
return block
}
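Changing prepareBlock to accept *types.Header is what lets every handler switch from BlockByHash to HeaderByHash: the timestamp and parent hash now come from header fields rather than full-block methods. As a rough illustration of why this is cheaper, the hedged snippet below contrasts the two ethclient calls; the endpoint and block hash are placeholders, not values from this commit.

```go
// Illustrative comparison: BlockByHash pulls the full block including
// transaction bodies, while HeaderByHash returns only the compact header,
// which already carries Time and ParentHash.
package main

import (
	"context"
	"log"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("https://example-rpc.invalid") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}
	var hash common.Hash // placeholder: set to the block hash you need

	// Heavy call: header plus every transaction in the block.
	block, err := client.BlockByHash(context.Background(), hash)
	if err == nil {
		log.Printf("full block: %d txs, time=%d, parent=%s",
			len(block.Transactions()), block.Time(), block.ParentHash())
	}

	// Light call: header only; Time and ParentHash are plain struct fields.
	header, err := client.HeaderByHash(context.Background(), hash)
	if err == nil {
		log.Printf("header only: time=%d, parent=%s", header.Time, header.ParentHash)
	}
}
```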
@@ -648,11 +649,11 @@ func (etherMan *Client) verifyBatches(ctx context.Context, vLog types.Log, block
verifyBatch.Aggregator = aggregator

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time()), 0), fullBlock)
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time), 0), fullBlock)
block.VerifiedBatches = append(block.VerifiedBatches, verifyBatch)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
@@ -715,11 +716,11 @@ func (etherMan *Client) createNewRollupEvent(ctx context.Context, vLog types.Log
}

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time()), 0), fullBlock)
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time), 0), fullBlock)
block.ActivateEtrog = append(block.ActivateEtrog, true)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
@@ -747,11 +748,11 @@ func (etherMan *Client) AddExistingRollupEvent(ctx context.Context, vLog types.L
}

if len(*blocks) == 0 || ((*blocks)[len(*blocks)-1].BlockHash != vLog.BlockHash || (*blocks)[len(*blocks)-1].BlockNumber != vLog.BlockNumber) {
fullBlock, err := etherMan.EtherClient.BlockByHash(ctx, vLog.BlockHash)
fullBlock, err := etherMan.EtherClient.HeaderByHash(ctx, vLog.BlockHash)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %v", vLog.BlockNumber, err)
}
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time()), 0), fullBlock)
block := prepareBlock(vLog, time.Unix(int64(fullBlock.Time), 0), fullBlock)
block.ActivateEtrog = append(block.ActivateEtrog, true)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockHash == vLog.BlockHash && (*blocks)[len(*blocks)-1].BlockNumber == vLog.BlockNumber {
112 changes: 63 additions & 49 deletions synchronizer/synchronizer.go
@@ -182,7 +182,7 @@ func (s *ClientSynchronizer) Sync() error {

// Stop function stops the synchronizer
func (s *ClientSynchronizer) Stop() {
log.Info("Stopping synchronizer and cancelling context")
log.Infof("NetworkID: %d, Stopping synchronizer and cancelling context", s.networkID)
s.cancelCtx()
}

@@ -247,19 +247,25 @@ func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*ether
}
log.Debugf("NetworkID: %d, after checkReorg: no reorg detected", s.networkID)

var fromBlock uint64
if lastBlockSynced.BlockNumber > 0 {
fromBlock := lastBlockSynced.BlockNumber + 1
if s.synced {
fromBlock = lastBlockSynced.BlockNumber
}
toBlock := fromBlock + s.cfg.SyncChunkSize

for {
if toBlock > lastKnownBlock.Uint64() {
log.Debug("Setting toBlock to the lastKnownBlock: ", lastKnownBlock)
log.Debugf("NetworkID: %d, Setting toBlock to the lastKnownBlock: %s", s.networkID, lastKnownBlock.String())
toBlock = lastKnownBlock.Uint64()
if !s.synced {
log.Infof("NetworkID %d Synced!", s.networkID)
waitDuration = s.cfg.SyncInterval.Duration
s.synced = true
s.chSynced <- s.networkID
}
}
if fromBlock > toBlock {
log.Debug("FromBlock is higher than toBlock. Skipping...")
log.Debugf("NetworkID: %d, FromBlock is higher than toBlock. Skipping...", s.networkID)
return lastBlockSynced, nil
}
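When toBlock would pass the last known L1 block, the hunk above caps it there; the first time that happens the synchronizer marks itself synced, relaxes its polling interval, and signals the chSynced channel once. The self-contained restatement below uses assumed field names, not the repository's exact struct.

```go
// Condensed restatement (field names are assumptions) of the head-of-chain
// handling shown above.
package sketch

import "time"

type syncState struct {
	networkID    uint32
	synced       bool
	syncInterval time.Duration
	chSynced     chan uint32
}

func capWindow(toBlock, lastKnownBlock uint64, s *syncState, waitDuration *time.Duration) uint64 {
	if toBlock > lastKnownBlock {
		toBlock = lastKnownBlock
		if !s.synced {
			*waitDuration = s.syncInterval // slow down to steady-state polling
			s.synced = true
			s.chSynced <- s.networkID // tell listeners this network caught up
		}
	}
	return toBlock
}
```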

@@ -279,54 +285,56 @@ func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*ether
blocks = append([]etherman.Block{{}}, blocks...)
}
} else if fromBlock < s.genBlockNumber {
err := fmt.Errorf("NetworkID: %d. fromBlock %d is lower than the genesisBlockNumber %d", s.networkID, fromBlock, s.genBlockNumber)
err := fmt.Errorf("networkID: %d. fromBlock %d is lower than the genesisBlockNumber %d", s.networkID, fromBlock, s.genBlockNumber)
log.Warn(err)
return lastBlockSynced, err
}
var initBlockReceived *etherman.Block
if len(blocks) != 0 {
initBlockReceived = &blocks[0]
// First position of the array must be deleted
blocks = removeBlockElement(blocks, 0)
} else {
// Reorg detected
log.Infof("NetworkID: %d, reorg detected in block %d while querying GetRollupInfoByBlockRange. Rolling back to at least the previous block", s.networkID, fromBlock)
prevBlock, err := s.storage.GetPreviousBlock(s.ctx, s.networkID, 1, nil)
if errors.Is(err, gerror.ErrStorageNotFound) {
log.Warnf("networkID: %d, error checking reorg: previous block not found in db: %v", s.networkID, err)
prevBlock = &etherman.Block{}
} else if err != nil {
log.Errorf("networkID: %d, error getting previousBlock from db. Error: %v", s.networkID, err)
return lastBlockSynced, err
}
blockReorged, err := s.checkReorg(prevBlock, nil)
if err != nil {
log.Errorf("networkID: %d, error checking reorgs in previous blocks. Error: %v", s.networkID, err)
return lastBlockSynced, err
}
if blockReorged == nil {
blockReorged = prevBlock
if s.synced {
var initBlockReceived *etherman.Block
if len(blocks) != 0 {
initBlockReceived = &blocks[0]
// First position of the array must be deleted
blocks = removeBlockElement(blocks, 0)
} else {
// Reorg detected
log.Infof("NetworkID: %d, reorg detected in block %d while querying GetRollupInfoByBlockRange. Rolling back to at least the previous block", s.networkID, fromBlock)
prevBlock, err := s.storage.GetPreviousBlock(s.ctx, s.networkID, 1, nil)
if errors.Is(err, gerror.ErrStorageNotFound) {
log.Warnf("networkID: %d, error checking reorg: previous block not found in db: %v", s.networkID, err)
prevBlock = &etherman.Block{}
} else if err != nil {
log.Errorf("networkID: %d, error getting previousBlock from db. Error: %v", s.networkID, err)
return lastBlockSynced, err
}
blockReorged, err := s.checkReorg(prevBlock, nil)
if err != nil {
log.Errorf("networkID: %d, error checking reorgs in previous blocks. Error: %v", s.networkID, err)
return lastBlockSynced, err
}
if blockReorged == nil {
blockReorged = prevBlock
}
err = s.resetState(blockReorged.BlockNumber)
if err != nil {
log.Errorf("networkID: %d, error resetting the state to a previous block. Retrying... Err: %v", s.networkID, err)
return lastBlockSynced, fmt.Errorf("error resetting the state to a previous block")
}
return blockReorged, nil
}
err = s.resetState(blockReorged.BlockNumber)
// Check reorg again to be sure that the chain has not changed between the previous checkReorg and the call GetRollupInfoByBlockRange
block, err := s.checkReorg(lastBlockSynced, initBlockReceived)
if err != nil {
log.Errorf("networkID: %d, error resetting the state to a previous block. Retrying... Err: %v", s.networkID, err)
return lastBlockSynced, fmt.Errorf("error resetting the state to a previous block")
log.Errorf("networkID: %d, error checking reorgs. Retrying... Err: %v", s.networkID, err)
return lastBlockSynced, fmt.Errorf("networkID: %d, error checking reorgs", s.networkID)
}
return blockReorged, nil
}
// Check reorg again to be sure that the chain has not changed between the previous checkReorg and the call GetRollupInfoByBlockRange
block, err := s.checkReorg(lastBlockSynced, initBlockReceived)
if err != nil {
log.Errorf("networkID: %d, error checking reorgs. Retrying... Err: %v", s.networkID, err)
return lastBlockSynced, fmt.Errorf("networkID: %d, error checking reorgs", s.networkID)
}
if block != nil {
err = s.resetState(block.BlockNumber)
if err != nil {
log.Errorf("networkID: %d, error resetting the state to a previous block. Retrying... Err: %v", s.networkID, err)
return lastBlockSynced, fmt.Errorf("networkID: %d, error resetting the state to a previous block", s.networkID)
if block != nil {
err = s.resetState(block.BlockNumber)
if err != nil {
log.Errorf("networkID: %d, error resetting the state to a previous block. Retrying... Err: %v", s.networkID, err)
return lastBlockSynced, fmt.Errorf("networkID: %d, error resetting the state to a previous block", s.networkID)
}
return block, nil
}
return block, nil
}

err = s.processBlockRange(blocks, order)
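In the hunk above, an empty result from the filtered query while the node is synced is treated as a possible reorg: the previous stored block is re-checked and the state is reset to the last block the chain still agrees on. The sketch below only illustrates the underlying hash-comparison idea, with an assumed storage shape; it is not the repository's checkReorg implementation.

```go
// Hedged sketch (storage shape assumed) of reorg detection by hash comparison:
// walk back through locally stored blocks and return the newest one whose hash
// still matches the canonical header at the same height.
package sketch

import (
	"context"
	"errors"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient"
)

type storedBlock struct {
	Number uint64
	Hash   common.Hash
}

func lastCommonAncestor(ctx context.Context, client *ethclient.Client, stored []storedBlock) (*storedBlock, error) {
	for i := len(stored) - 1; i >= 0; i-- {
		header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(stored[i].Number))
		if err != nil {
			return nil, err
		}
		if header.Hash() == stored[i].Hash {
			// The chain still contains this block: everything newer that we
			// stored must be rolled back, everything up to here is safe.
			return &stored[i], nil
		}
		// Hash mismatch: this block was reorged out; keep walking back.
	}
	return nil, errors.New("no stored block is on the canonical chain")
}
```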
@@ -348,9 +356,15 @@ func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*ether
s.chSynced <- s.networkID
}
break
} else if !s.synced {
fromBlock = toBlock + 1
toBlock = fromBlock + s.cfg.SyncChunkSize
log.Debugf("NetworkID: %d, not synced yet. Avoid check the same interval. New interval: from block %d, to block %d", s.networkID, fromBlock, toBlock)
} else {
fromBlock = lastBlockSynced.BlockNumber
toBlock = toBlock + s.cfg.SyncChunkSize
log.Debugf("NetworkID: %d, synced!. New interval: from block %d, to block %d", s.networkID, fromBlock, toBlock)
}
fromBlock = lastBlockSynced.BlockNumber
toBlock = toBlock + s.cfg.SyncChunkSize
}

return lastBlockSynced, nil
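The tail of the loop above now advances the scan window differently depending on whether the network has caught up. The following condensed restatement (names are mine, not the repo's) captures that rule: while catching up, the next window starts right after the range just processed; once synced, it stays anchored on the last synced block and simply extends forward.

```go
// Condensed restatement of the window-advance logic visible just above.
package sketch

func nextWindow(toBlock, lastSyncedBlock, chunkSize uint64, synced bool) (fromBlock, newToBlock uint64) {
	if !synced {
		fromBlock = toBlock + 1 // never rescan the chunk we just processed
		return fromBlock, fromBlock + chunkSize
	}
	return lastSyncedBlock, toBlock + chunkSize // synced: extend the same window
}
```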
@@ -417,7 +431,7 @@ func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order ma
}
case etherman.ActivateEtrogOrder:
// this is activated when the bridge detects the CreateNewRollup or the AddExistingRollup event from the rollupManager
log.Info("Event received. Activating LxLyEtrog...")
log.Infof("NetworkID: %d, Event received. Activating LxLyEtrog...", s.networkID)
}
}
err = s.storage.Commit(s.ctx, dbTx)