From 26e03f27880a76d76849c6763ee8fbd94af92020 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 17 Oct 2023 12:51:38 +0200 Subject: [PATCH 1/5] + add logs to batches --- synchronizer/synchronizer.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 91e0b32299..2172dcf1c2 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -354,7 +354,7 @@ func (s *ClientSynchronizer) syncBlocksParallel(lastEthBlockSynced *state.Block) log.Infof("producer is not running. Resetting the state to start from block %v (last on DB)", lastEthBlockSynced.BlockNumber) s.l1SyncOrchestration.producer.Reset(lastEthBlockSynced.BlockNumber) } - log.Infof("Starting L1 sync orchestrator in parallel") + log.Infof("Starting L1 sync orchestrator in parallel block: %d", lastEthBlockSynced.BlockNumber) return s.l1SyncOrchestration.start(lastEthBlockSynced) } @@ -944,6 +944,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman. if errors.Is(err, state.ErrNotFound) { log.Debugf("BatchNumber: %d, not found in trusted state. Storing it...", batch.BatchNumber) // If it is not found, store batch + log.Infof("processSequenceBatches: (not found batch) ProcessAndStoreClosedBatch . BatchNumber: %d, BlockNumber: %d", processCtx.BatchNumber, blockNumber) newStateRoot, flushID, proverID, err := s.state.ProcessAndStoreClosedBatch(s.ctx, processCtx, batch.BatchL2Data, dbTx, stateMetrics.SynchronizerCallerLabel) if err != nil { log.Errorf("error storing trustedBatch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err) @@ -1024,6 +1025,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman. } else { log.Warnf("missmatch in trusted state detected, discarding batches until batchNum %d", previousBatchNumber) } + log.Infof("ResetTrustedState: Resetting trusted state. delete batch > %d, ", previousBatchNumber) err = s.state.ResetTrustedState(s.ctx, previousBatchNumber, dbTx) // This method has to reset the forced batches deleting the batchNumber for higher batchNumbers if err != nil { log.Errorf("error resetting trusted state. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err) @@ -1035,6 +1037,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman. log.Errorf("error resetting trusted state. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err) return err } + log.Infof("processSequenceBatches: (deleted previous) ProcessAndStoreClosedBatch . BatchNumber: %d, BlockNumber: %d", processCtx.BatchNumber, blockNumber) _, flushID, proverID, err := s.state.ProcessAndStoreClosedBatch(s.ctx, processCtx, batch.BatchL2Data, dbTx, stateMetrics.SynchronizerCallerLabel) if err != nil { log.Errorf("error storing trustedBatch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, blockNumber, err) @@ -1050,6 +1053,7 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman. } // Store virtualBatch + log.Infof("processSequenceBatches: Storing virtualBatch. BatchNumber: %d, BlockNumber: %d", virtualBatch.BatchNumber, blockNumber) err = s.state.AddVirtualBatch(s.ctx, &virtualBatch, dbTx) if err != nil { log.Errorf("error storing virtualBatch. BatchNumber: %d, BlockNumber: %d, error: %v", virtualBatch.BatchNumber, blockNumber, err) @@ -1104,6 +1108,7 @@ func (s *ClientSynchronizer) processSequenceForceBatch(sequenceForceBatch []ethe s.trustedState.lastStateRoot = nil // Reset trusted state + log.Infof("ResetTrustedState: processSequenceForceBatch: Resetting trusted state. delete batch > (lastVirtualizedBatchNumber)%d, ", lastVirtualizedBatchNumber) err = s.state.ResetTrustedState(s.ctx, lastVirtualizedBatchNumber, dbTx) // This method has to reset the forced batches deleting the batchNumber for higher batchNumbers if err != nil { log.Errorf("error resetting trusted state. BatchNumber: %d, BlockNumber: %d, error: %v", lastVirtualizedBatchNumber, block.BlockNumber, err) @@ -1166,6 +1171,7 @@ func (s *ClientSynchronizer) processSequenceForceBatch(sequenceForceBatch []ethe BatchL2Data: &forcedBatches[i].RawTxsData, } // Process batch + log.Infof("processSequenceFoceBatches: ProcessAndStoreClosedBatch . BatchNumber: %d, BlockNumber: %d", batch.BatchNumber, block.BlockNumber) _, flushID, proverID, err := s.state.ProcessAndStoreClosedBatch(s.ctx, batch, forcedBatches[i].RawTxsData, dbTx, stateMetrics.SynchronizerCallerLabel) if err != nil { log.Errorf("error processing batch in processSequenceForceBatch. BatchNumber: %d, BlockNumber: %d, error: %v", batch.BatchNumber, block.BlockNumber, err) @@ -1180,6 +1186,7 @@ func (s *ClientSynchronizer) processSequenceForceBatch(sequenceForceBatch []ethe s.pendingFlushID(flushID, proverID) // Store virtualBatch + log.Infof("processSequenceFoceBatches: Storing virtualBatch. BatchNumber: %d, BlockNumber: %d", virtualBatch.BatchNumber, block.BlockNumber) err = s.state.AddVirtualBatch(s.ctx, &virtualBatch, dbTx) if err != nil { log.Errorf("error storing virtualBatch in processSequenceForceBatch. BatchNumber: %d, BlockNumber: %d, error: %v", virtualBatch.BatchNumber, block.BlockNumber, err) @@ -1221,6 +1228,7 @@ func (s *ClientSynchronizer) processForcedBatch(forcedBatch etherman.ForcedBatch RawTxsData: forcedBatch.RawTxsData, ForcedAt: forcedBatch.ForcedAt, } + log.Infof("processForcedBatch: Storing forcedBatch. BatchNumber: %d BlockNumber: %d", forcedBatch.ForcedBatchNumber, forcedBatch.BlockNumber) err := s.state.AddForcedBatch(s.ctx, &forcedB, dbTx) if err != nil { log.Errorf("error storing the forcedBatch in processForcedBatch. BlockNumber: %d", forcedBatch.BlockNumber) @@ -1306,6 +1314,7 @@ func (s *ClientSynchronizer) processTrustedVerifyBatches(lastVerifiedBatch ether TxHash: lastVerifiedBatch.TxHash, IsTrusted: true, } + log.Infof("processTrustedVerifyBatches: Storing verifiedB. BlockNumber: %d, BatchNumber: %d", verifiedB.BlockNumber, verifiedB.BatchNumber) err = s.state.AddVerifiedBatch(s.ctx, &verifiedB, dbTx) if err != nil { log.Errorf("error storing the verifiedB in processTrustedVerifyBatches. verifiedBatch: %+v, lastVerifiedBatch: %+v", verifiedB, lastVerifiedBatch) @@ -1372,7 +1381,7 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx // Find txs to be processed and included in the trusted state if *s.trustedState.lastStateRoot == batches[1].StateRoot { prevBatch := uint64(trustedBatch.Number) - 1 - log.Info("Cleaning state until batch: ", prevBatch) + log.Infof("ResetTrustedState: processTrustedBatch: %d Cleaning state until batch:%d ", trustedBatch.Number, prevBatch) // Delete txs that were stored before restart. We need to reprocess all txs because the intermediary stateRoot is only stored in memory err := s.state.ResetTrustedState(s.ctx, prevBatch, dbTx) if err != nil { From b383d9e07c3bd7d1a0f05fe2b1b024a1cab619cb Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 17 Oct 2023 14:44:04 +0200 Subject: [PATCH 2/5] + consumer ignore no sequential blocks --- synchronizer/l1_rollup_info_consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/synchronizer/l1_rollup_info_consumer.go b/synchronizer/l1_rollup_info_consumer.go index 4856ce81cd..c9c0e1af04 100644 --- a/synchronizer/l1_rollup_info_consumer.go +++ b/synchronizer/l1_rollup_info_consumer.go @@ -128,6 +128,7 @@ func (l *l1RollupInfoConsumer) processIncommingRollupInfoData(rollupInfo rollupI if (l.lastEthBlockSynced != nil) && (l.lastEthBlockSynced.BlockNumber+1 != rollupInfo.blockRange.fromBlock) { log.Infof("consumer: received a rollupInfo with a wrong block range. Ignoring it. Last block synced: %d. RollupInfo block range: %s", l.lastEthBlockSynced.BlockNumber, rollupInfo.blockRange.String()) + return nil } // Uncommented that line to produce a infinite loop of errors, and resets! (just for develop) //return errors.New("forcing an continuous error!") From 82d5de111491eb115bfe9e577bc67d1e44d70d0a Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 17 Oct 2023 18:02:06 +0200 Subject: [PATCH 3/5] + fix consumer tracking blocks --- synchronizer/l1_rollup_info_consumer.go | 16 +++++++++++++--- synchronizer/l1_rollup_info_producer.go | 3 +++ test/Makefile | 16 ++++++++++++++-- test/config/test.node.config.toml | 4 ++-- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/synchronizer/l1_rollup_info_consumer.go b/synchronizer/l1_rollup_info_consumer.go index c9c0e1af04..67a0992fad 100644 --- a/synchronizer/l1_rollup_info_consumer.go +++ b/synchronizer/l1_rollup_info_consumer.go @@ -44,6 +44,7 @@ type l1RollupInfoConsumer struct { ctx context.Context statistics l1RollupInfoConsumerStatistics lastEthBlockSynced *state.Block + highestBlockProcessed uint64 } func newL1RollupInfoConsumer(cfg configConsumer, @@ -62,12 +63,20 @@ func newL1RollupInfoConsumer(cfg configConsumer, startTime: time.Now(), cfg: cfg, }, + highestBlockProcessed: 0, } } func (l *l1RollupInfoConsumer) Start(ctx context.Context, lastEthBlockSynced *state.Block) error { l.ctx = ctx l.lastEthBlockSynced = lastEthBlockSynced + if l.highestBlockProcessed == 0 && lastEthBlockSynced != nil { + l.highestBlockProcessed = lastEthBlockSynced.BlockNumber + } + if lastEthBlockSynced == nil { + log.Infof("consumer: lastEthBlockSynced is nil, so resetting highestBlockProcessed to 0") + l.highestBlockProcessed = 0 + } l.statistics.onStart() err := l.step() for ; err == nil; err = l.step() { @@ -125,11 +134,12 @@ func (l *l1RollupInfoConsumer) processIncommingRollupInfoData(rollupInfo rollupI l.mutex.Lock() defer l.mutex.Unlock() var err error - if (l.lastEthBlockSynced != nil) && (l.lastEthBlockSynced.BlockNumber+1 != rollupInfo.blockRange.fromBlock) { - log.Infof("consumer: received a rollupInfo with a wrong block range. Ignoring it. Last block synced: %d. RollupInfo block range: %s", - l.lastEthBlockSynced.BlockNumber, rollupInfo.blockRange.String()) + if (l.highestBlockProcessed > 0) && (l.highestBlockProcessed+1 != rollupInfo.blockRange.fromBlock) { + log.Infof("consumer: received a rollupInfo with a wrong block range. Ignoring it. Highest block synced: %d. RollupInfo block range: %s", + l.highestBlockProcessed, rollupInfo.blockRange.String()) return nil } + l.highestBlockProcessed = rollupInfo.blockRange.toBlock // Uncommented that line to produce a infinite loop of errors, and resets! (just for develop) //return errors.New("forcing an continuous error!") statisticsMsg := l.statistics.onStartProcessIncommingRollupInfoData(rollupInfo) diff --git a/synchronizer/l1_rollup_info_producer.go b/synchronizer/l1_rollup_info_producer.go index 2a5f673b3e..98e0f04ff6 100644 --- a/synchronizer/l1_rollup_info_producer.go +++ b/synchronizer/l1_rollup_info_producer.go @@ -356,6 +356,9 @@ func (l *l1RollupInfoProducer) step(waitDuration *time.Duration) bool { // If after asking for a new lastBlockOnL1 we are still synchronized then we are synchronized if l.syncStatus.isNodeFullySynchronizedWithL1() { l.setStatus(producerSynchronized) + } else { + log.Infof("producer: producerWorking: still not synchronized with the new block range launch workers again") + l.launchWork() } case producerSynchronized: // renew last block on L1 if needed diff --git a/test/Makefile b/test/Makefile index 79bd1364e6..3100fda176 100644 --- a/test/Makefile +++ b/test/Makefile @@ -432,14 +432,26 @@ stop-grafana: ## Stops the grafana service $(STOPGRAFANA) .PHONY: run-permissionless -run-permissionless: run-node ## Runs the trusted and permissionless node +run-permissionless: run-node run-permissionless-dependencies ## Runs the trusted and permissionless node $(RUNPERMISSIONLESSDB) sleep 3 $(RUNPERMISSIONLESSZKPROVER) $(RUNPERMISSIONLESSNODE) .PHONY: stop-permissionless -stop-permissionless: stop-node## Stops the permissionless node +stop-permissionless: stop-node stop-permissionless-dependencies ## Stops the permissionless node + $(STOPPERMISSIONLESSNODE) + + +PHONY: run-permissionless-dependencies +run-permissionless-dependencies: ## Runs the permissionless dependencies (db + prover) without the node + $(RUNPERMISSIONLESSDB) + sleep 3 + $(RUNPERMISSIONLESSZKPROVER) + + +PHONY: stop-permissionless-dependencies +stop-permissionless-dependencies: ## Stop the permissionless dependencies (db + prover) without the node $(STOPPERMISSIONLESSNODE) $(STOPPERMISSIONLESSZKPROVER) $(STOPPERMISSIONLESSDB) diff --git a/test/config/test.node.config.toml b/test/config/test.node.config.toml index b1be8dfdc5..9b1327babe 100644 --- a/test/config/test.node.config.toml +++ b/test/config/test.node.config.toml @@ -34,7 +34,7 @@ IntervalToRefreshBlockedAddresses = "5m" IntervalToRefreshGasPrices = "5s" MaxTxBytesSize=100132 MaxTxDataBytesSize=100000 -DefaultMinGasPriceAllowed = 1000000000 +DefaultMinGasPriceAllowed = 0 MinAllowedGasPriceInterval = "5m" PollMinAllowedGasPriceInterval = "15s" [Pool.DB] @@ -60,7 +60,7 @@ ReadTimeout = "60s" WriteTimeout = "60s" MaxRequestsPerIPAndSecond = 5000 SequencerNodeURI = "" -EnableL2SuggestedGasPricePolling = true +EnableL2SuggestedGasPricePolling = false [RPC.WebSockets] Enabled = true Port = 8133 From 36f062ea284f0bfe0d8fc2f92955f15bfc6d7dd9 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 17 Oct 2023 22:14:36 +0200 Subject: [PATCH 4/5] + add logs and unittest --- synchronizer/l1_rollup_info_consumer.go | 49 ++++---- .../l1_rollup_info_consumer_statistics.go | 35 ++++-- ...l1_rollup_info_consumer_statistics_test.go | 117 ++++++++++++++++++ synchronizer/l1_rollup_info_consumer_test.go | 81 ++++++++++++ synchronizer/l1_sync_orchestration.go | 5 +- .../mock_l1_rollup_consumer_interface.go | 5 + 6 files changed, 262 insertions(+), 30 deletions(-) create mode 100644 synchronizer/l1_rollup_info_consumer_statistics_test.go diff --git a/synchronizer/l1_rollup_info_consumer.go b/synchronizer/l1_rollup_info_consumer.go index 67a0992fad..c5364bc75e 100644 --- a/synchronizer/l1_rollup_info_consumer.go +++ b/synchronizer/l1_rollup_info_consumer.go @@ -63,20 +63,18 @@ func newL1RollupInfoConsumer(cfg configConsumer, startTime: time.Now(), cfg: cfg, }, - highestBlockProcessed: 0, + highestBlockProcessed: invalidBlockNumber, } } func (l *l1RollupInfoConsumer) Start(ctx context.Context, lastEthBlockSynced *state.Block) error { l.ctx = ctx l.lastEthBlockSynced = lastEthBlockSynced - if l.highestBlockProcessed == 0 && lastEthBlockSynced != nil { + if l.highestBlockProcessed == invalidBlockNumber && lastEthBlockSynced != nil { + log.Infof("consumer: Starting consumer. setting HighestBlockProcessed: %d (lastEthBlockSynced)", lastEthBlockSynced.BlockNumber) l.highestBlockProcessed = lastEthBlockSynced.BlockNumber } - if lastEthBlockSynced == nil { - log.Infof("consumer: lastEthBlockSynced is nil, so resetting highestBlockProcessed to 0") - l.highestBlockProcessed = 0 - } + log.Infof("consumer: Starting consumer. HighestBlockProcessed: %d", l.highestBlockProcessed) l.statistics.onStart() err := l.step() for ; err == nil; err = l.step() { @@ -84,9 +82,18 @@ func (l *l1RollupInfoConsumer) Start(ctx context.Context, lastEthBlockSynced *st if err != errConsumerStopped && err != errConsumerStoppedBecauseIsSynchronized { return err } - // The errConsumerStopped is not an error, so we return nil meaning that the process finished in a normal way + // The errConsumerStopped||errConsumerStoppedBecauseIsSynchronized are not an error, so we return nil meaning that the process finished in a normal way return nil } + +func (l *l1RollupInfoConsumer) Reset(startingBlockNumber uint64) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.highestBlockProcessed = startingBlockNumber + l.lastEthBlockSynced = nil + l.statistics.onReset() +} + func (l *l1RollupInfoConsumer) step() error { l.statistics.onStartStep() var err error @@ -134,8 +141,8 @@ func (l *l1RollupInfoConsumer) processIncommingRollupInfoData(rollupInfo rollupI l.mutex.Lock() defer l.mutex.Unlock() var err error - if (l.highestBlockProcessed > 0) && (l.highestBlockProcessed+1 != rollupInfo.blockRange.fromBlock) { - log.Infof("consumer: received a rollupInfo with a wrong block range. Ignoring it. Highest block synced: %d. RollupInfo block range: %s", + if (l.highestBlockProcessed != invalidBlockNumber) && (l.highestBlockProcessed+1 != rollupInfo.blockRange.fromBlock) { + log.Warnf("consumer: received a rollupInfo with a wrong block range. Ignoring it. Highest block synced: %d. RollupInfo block range: %s", l.highestBlockProcessed, rollupInfo.blockRange.String()) return nil } @@ -148,7 +155,7 @@ func (l *l1RollupInfoConsumer) processIncommingRollupInfoData(rollupInfo rollupI l.lastEthBlockSynced, err = l.processUnsafe(rollupInfo) l.statistics.onFinishProcessIncommingRollupInfoData(rollupInfo, time.Since(timeProcessingStart), err) if err != nil { - log.Info("consumer: error processing rollupInfo. Error: ", err) + log.Infof("consumer: error processing rollupInfo %s. Error: %s", rollupInfo.blockRange.String(), err.Error()) return err } l.statistics.numProcessedBlocks += uint64(len(rollupInfo.blocks)) @@ -179,16 +186,7 @@ func (l *l1RollupInfoConsumer) processUnsafe(rollupInfo rollupInfoByBlockRangeRe blocks := rollupInfo.blocks order := rollupInfo.order var lastEthBlockSynced *state.Block - err := l.synchronizer.processBlockRange(blocks, order) - if err != nil { - log.Info("consumer: Error processing block range: ", rollupInfo.blockRange, " err:", err) - return nil, err - } - if len(blocks) > 0 { - tmpStateBlock := convertEthmanBlockToStateBlock(&blocks[len(blocks)-1]) - lastEthBlockSynced = &tmpStateBlock - logBlocks(blocks) - } + if len(blocks) == 0 { lb := rollupInfo.lastBlockOfRange if lb == nil { @@ -196,7 +194,7 @@ func (l *l1RollupInfoConsumer) processUnsafe(rollupInfo rollupInfoByBlockRangeRe return nil, errMissingLastBlock } b := convertL1BlockToEthBlock(lb) - err = l.synchronizer.processBlockRange([]etherman.Block{b}, order) + err := l.synchronizer.processBlockRange([]etherman.Block{b}, order) if err != nil { log.Error("consumer: Error processing last block of range: ", rollupInfo.blockRange, " err:", err) return nil, err @@ -204,6 +202,15 @@ func (l *l1RollupInfoConsumer) processUnsafe(rollupInfo rollupInfoByBlockRangeRe block := convertL1BlockToStateBlock(lb) lastEthBlockSynced = &block log.Debug("consumer: Storing empty block. BlockNumber: ", b.BlockNumber, ". BlockHash: ", b.BlockHash) + } else { + tmpStateBlock := convertEthmanBlockToStateBlock(&blocks[len(blocks)-1]) + lastEthBlockSynced = &tmpStateBlock + logBlocks(blocks) + err := l.synchronizer.processBlockRange(blocks, order) + if err != nil { + log.Info("consumer: Error processing block range: ", rollupInfo.blockRange, " err:", err) + return nil, err + } } return lastEthBlockSynced, nil } diff --git a/synchronizer/l1_rollup_info_consumer_statistics.go b/synchronizer/l1_rollup_info_consumer_statistics.go index 9e71059541..176cbe1991 100644 --- a/synchronizer/l1_rollup_info_consumer_statistics.go +++ b/synchronizer/l1_rollup_info_consumer_statistics.go @@ -9,35 +9,54 @@ import ( ) type l1RollupInfoConsumerStatistics struct { - numProcessedRollupInfo uint64 - numProcessedBlocks uint64 - startTime time.Time - timePreviousProcessingDuration time.Duration - startStepTime time.Time - cfg configConsumer + numProcessedRollupInfo uint64 + numProcessedRollupInfoForCheckTime uint64 + numProcessedBlocks uint64 + startTime time.Time + timePreviousProcessingDuration time.Duration + startStepTime time.Time + cfg configConsumer } func (l *l1RollupInfoConsumerStatistics) onStart() { l.startTime = time.Now() l.startStepTime = time.Time{} + l.numProcessedRollupInfoForCheckTime = 0 } func (l *l1RollupInfoConsumerStatistics) onStartStep() { l.startStepTime = time.Now() } +func (l *l1RollupInfoConsumerStatistics) onReset() { + l.numProcessedRollupInfoForCheckTime = 0 + l.startStepTime = time.Time{} +} + func (l *l1RollupInfoConsumerStatistics) onStartProcessIncommingRollupInfoData(rollupInfo rollupInfoByBlockRangeResult) string { now := time.Now() // Time have have been blocked in the select statement waitingTimeForData := now.Sub(l.startStepTime) blocksPerSecond := float64(l.numProcessedBlocks) / time.Since(l.startTime).Seconds() - if l.numProcessedRollupInfo > uint64(l.cfg.numIterationsBeforeStartCheckingTimeWaitingForNewRollupInfoData) && waitingTimeForData > l.cfg.acceptableTimeWaitingForNewRollupInfoData { + generatedWarning := false + if l.numProcessedRollupInfoForCheckTime > uint64(l.cfg.numIterationsBeforeStartCheckingTimeWaitingForNewRollupInfoData) && waitingTimeForData > l.cfg.acceptableTimeWaitingForNewRollupInfoData { msg := fmt.Sprintf("wasted waiting for new rollupInfo from L1: %s last_process: %s new range: %s block_per_second: %f", waitingTimeForData, l.timePreviousProcessingDuration, rollupInfo.blockRange.String(), blocksPerSecond) log.Warnf("consumer:: Too much wasted time (waiting to receive a new data):%s", msg) + generatedWarning = true } l.numProcessedRollupInfo++ - msg := fmt.Sprintf("wasted_time_waiting_for_data [%s] last_process_time [%s] block_per_second [%f]", waitingTimeForData.Round(time.Second).String(), l.timePreviousProcessingDuration, blocksPerSecond) + l.numProcessedRollupInfoForCheckTime++ + msg := fmt.Sprintf("wasted_time_waiting_for_data [%s] last_process_time [%s] block_per_second [%f]", + waitingTimeForData.Round(time.Second).String(), + l.timePreviousProcessingDuration, + blocksPerSecond) + if waitingTimeForData > l.cfg.acceptableTimeWaitingForNewRollupInfoData { + msg = msg + " WASTED_TIME_EXCEED " + } + if generatedWarning { + msg = msg + " WARNING_WASTED_TIME " + } return msg } diff --git a/synchronizer/l1_rollup_info_consumer_statistics_test.go b/synchronizer/l1_rollup_info_consumer_statistics_test.go new file mode 100644 index 0000000000..c0acc01aac --- /dev/null +++ b/synchronizer/l1_rollup_info_consumer_statistics_test.go @@ -0,0 +1,117 @@ +package synchronizer + +import ( + "testing" + "time" + + "github.com/0xPolygonHermez/zkevm-node/etherman" + "github.com/stretchr/testify/assert" +) + +func TestL1RollupInfoConsumerStatistics(t *testing.T) { + cfg := configConsumer{ + numIterationsBeforeStartCheckingTimeWaitingForNewRollupInfoData: 10, + acceptableTimeWaitingForNewRollupInfoData: 5 * time.Second, + } + stats := l1RollupInfoConsumerStatistics{ + cfg: cfg, + } + + stats.onStart() + stats.onStartStep() + + // Test onFinishProcessIncommingRollupInfoData + rollupInfo := rollupInfoByBlockRangeResult{ + blockRange: blockRange{ + fromBlock: 1, + toBlock: 10, + }, + blocks: []etherman.Block{}, + } + executionTime := 2 * time.Second + stats.onStartProcessIncommingRollupInfoData(rollupInfo) + stats.onFinishProcessIncommingRollupInfoData(rollupInfo, executionTime, error(nil)) + assert.Equal(t, stats.timePreviousProcessingDuration, executionTime) + assert.Equal(t, stats.numProcessedRollupInfo, uint64(1)) + assert.Equal(t, stats.numProcessedBlocks, uint64(len(rollupInfo.blocks))) + + stats.onStart() + stats.onStartStep() + + msg := stats.onStartProcessIncommingRollupInfoData(rollupInfo) + assert.Contains(t, msg, "wasted_time_waiting_for_data") + assert.Contains(t, msg, "last_process_time") + assert.Contains(t, msg, "block_per_second") + assert.NotContains(t, msg, "WASTED_TIME_EXCEED") + assert.NotContains(t, msg, "WARNING_WASTED_TIME") +} + +func TestL1RollupInfoConsumerStatisticsWithExceedTimeButNoWarningGenerated(t *testing.T) { + cfg := configConsumer{ + numIterationsBeforeStartCheckingTimeWaitingForNewRollupInfoData: 10, + acceptableTimeWaitingForNewRollupInfoData: 0 * time.Second, + } + stats := l1RollupInfoConsumerStatistics{ + cfg: cfg, + } + + stats.onStart() + stats.onStartStep() + + // Test onFinishProcessIncommingRollupInfoData + rollupInfo := rollupInfoByBlockRangeResult{ + blockRange: blockRange{ + fromBlock: 1, + toBlock: 10, + }, + blocks: []etherman.Block{}, + } + executionTime := 2 * time.Second + err := error(nil) + stats.onStartProcessIncommingRollupInfoData(rollupInfo) + stats.onFinishProcessIncommingRollupInfoData(rollupInfo, executionTime, err) + + stats.onStartStep() + msg := stats.onStartProcessIncommingRollupInfoData(rollupInfo) + assert.Contains(t, msg, "wasted_time_waiting_for_data") + assert.Contains(t, msg, "last_process_time") + assert.Contains(t, msg, "block_per_second") + assert.Contains(t, msg, "WASTED_TIME_EXCEED") + assert.NotContains(t, msg, "WARNING_WASTED_TIME") +} + +func TestL1RollupInfoConsumerStatisticsWithExceedTimeButAndWarningGenerated(t *testing.T) { + cfg := configConsumer{ + numIterationsBeforeStartCheckingTimeWaitingForNewRollupInfoData: 1, + acceptableTimeWaitingForNewRollupInfoData: 0 * time.Second, + } + stats := l1RollupInfoConsumerStatistics{ + cfg: cfg, + } + + stats.onStart() + stats.onStartStep() + + // Test onFinishProcessIncommingRollupInfoData + rollupInfo := rollupInfoByBlockRangeResult{ + blockRange: blockRange{ + fromBlock: 1, + toBlock: 10, + }, + blocks: []etherman.Block{}, + } + executionTime := 2 * time.Second + err := error(nil) + stats.onStartProcessIncommingRollupInfoData(rollupInfo) + stats.onFinishProcessIncommingRollupInfoData(rollupInfo, executionTime, err) + stats.onStartProcessIncommingRollupInfoData(rollupInfo) + stats.onFinishProcessIncommingRollupInfoData(rollupInfo, executionTime, err) + + stats.onStartStep() + msg := stats.onStartProcessIncommingRollupInfoData(rollupInfo) + assert.Contains(t, msg, "wasted_time_waiting_for_data") + assert.Contains(t, msg, "last_process_time") + assert.Contains(t, msg, "block_per_second") + assert.Contains(t, msg, "WASTED_TIME_EXCEED") + assert.Contains(t, msg, "WARNING_WASTED_TIME") +} diff --git a/synchronizer/l1_rollup_info_consumer_test.go b/synchronizer/l1_rollup_info_consumer_test.go index 44209729ad..fa02f85561 100644 --- a/synchronizer/l1_rollup_info_consumer_test.go +++ b/synchronizer/l1_rollup_info_consumer_test.go @@ -3,11 +3,13 @@ package synchronizer import ( "context" "errors" + "math/big" "testing" "time" "github.com/0xPolygonHermez/zkevm-node/etherman" "github.com/ethereum/go-ethereum/common" + types "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -63,6 +65,85 @@ func TestGivenConsumerWhenFailsToProcessRollupThenDontKnownLastEthBlock(t *testi require.False(t, ok) } +func TestGivenConsumerWhenReceiveNoNextBlockThenDoNothing(t *testing.T) { + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + data := setupConsumerTest(t) + defer cancel() + responseRollupInfoByBlockRange := rollupInfoByBlockRangeResult{ + blockRange: blockRange{ + fromBlock: 100, + toBlock: 200, + }, + blocks: []etherman.Block{}, + order: map[common.Hash][]etherman.Order{}, + lastBlockOfRange: nil, + } + // Is not going to call processBlockRange + data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) + data.ch <- *newL1SyncMessageControl(eventProducerIsFullySynced) + data.sut.Reset(1234) + err := data.sut.Start(ctxTimeout, nil) + require.NoError(t, err) + _, ok := data.sut.GetLastEthBlockSynced() + require.False(t, ok) +} + +func TestGivenConsumerWhenNextBlockNumberIsNoSetThenAcceptAnythingAndProcess(t *testing.T) { + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + data := setupConsumerTest(t) + defer cancel() + responseRollupInfoByBlockRange := rollupInfoByBlockRangeResult{ + blockRange: blockRange{ + fromBlock: 100, + toBlock: 200, + }, + blocks: []etherman.Block{}, + order: map[common.Hash][]etherman.Order{}, + lastBlockOfRange: types.NewBlock(&types.Header{Number: big.NewInt(123)}, nil, nil, nil, nil), + } + + data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) + data.ch <- *newL1SyncMessageControl(eventProducerIsFullySynced) + data.syncMock. + On("processBlockRange", mock.Anything, mock.Anything). + Return(nil). + Once() + err := data.sut.Start(ctxTimeout, nil) + require.NoError(t, err) + resultBlock, ok := data.sut.GetLastEthBlockSynced() + require.True(t, ok) + require.Equal(t, uint64(123), resultBlock.BlockNumber) +} + +func TestGivenConsumerWhenNextBlockNumberIsNoSetThenFirstRollupInfoSetIt(t *testing.T) { + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + data := setupConsumerTest(t) + defer cancel() + responseRollupInfoByBlockRange := rollupInfoByBlockRangeResult{ + blockRange: blockRange{ + fromBlock: 100, + toBlock: 200, + }, + blocks: []etherman.Block{}, + order: map[common.Hash][]etherman.Order{}, + lastBlockOfRange: types.NewBlock(&types.Header{Number: big.NewInt(123)}, nil, nil, nil, nil), + } + // Fist package set highestBlockProcessed + data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) + // The repeated package is ignored because is not the next BlockRange + data.ch <- *newL1SyncMessageData(&responseRollupInfoByBlockRange) + data.ch <- *newL1SyncMessageControl(eventProducerIsFullySynced) + data.syncMock. + On("processBlockRange", mock.Anything, mock.Anything). + Return(nil). + Once() + err := data.sut.Start(ctxTimeout, nil) + require.NoError(t, err) + resultBlock, ok := data.sut.GetLastEthBlockSynced() + require.True(t, ok) + require.Equal(t, uint64(123), resultBlock.BlockNumber) +} + func setupConsumerTest(t *testing.T) consumerTestData { syncMock := newSynchronizerProcessBlockRangeMock(t) ch := make(chan l1SyncMessage, 10) diff --git a/synchronizer/l1_sync_orchestration.go b/synchronizer/l1_sync_orchestration.go index 365d46c3ac..1972d7681b 100644 --- a/synchronizer/l1_sync_orchestration.go +++ b/synchronizer/l1_sync_orchestration.go @@ -17,7 +17,7 @@ type l1RollupProducerInterface interface { Start(ctx context.Context) error // Stop cancel current process Stop() - // ResetAndStop set a new starting point and cancel current process if any + // Reset set a new starting point and cancel current process if any Reset(startingBlockNumber uint64) } @@ -25,6 +25,8 @@ type l1RollupConsumerInterface interface { Start(ctx context.Context, lastEthBlockSynced *state.Block) error StopAfterProcessChannelQueue() GetLastEthBlockSynced() (state.Block, bool) + // Reset set a new starting point + Reset(startingBlockNumber uint64) } type l1SyncOrchestration struct { @@ -63,6 +65,7 @@ func (l *l1SyncOrchestration) reset(startingBlockNumber uint64) { if l.isRunning { log.Infof("orchestration: reset(%d) is going to reset producer", startingBlockNumber) } + l.consumer.Reset(startingBlockNumber) l.producer.Reset(startingBlockNumber) // If orchestrator is running then producer is going to be started by orchestrate() select function when detects that producer has finished } diff --git a/synchronizer/mock_l1_rollup_consumer_interface.go b/synchronizer/mock_l1_rollup_consumer_interface.go index f7b62cd908..99a4c62cb0 100644 --- a/synchronizer/mock_l1_rollup_consumer_interface.go +++ b/synchronizer/mock_l1_rollup_consumer_interface.go @@ -38,6 +38,11 @@ func (_m *l1RollupConsumerInterfaceMock) GetLastEthBlockSynced() (state.Block, b return r0, r1 } +// Reset provides a mock function with given fields: startingBlockNumber +func (_m *l1RollupConsumerInterfaceMock) Reset(startingBlockNumber uint64) { + _m.Called(startingBlockNumber) +} + // Start provides a mock function with given fields: ctx, lastEthBlockSynced func (_m *l1RollupConsumerInterfaceMock) Start(ctx context.Context, lastEthBlockSynced *state.Block) error { ret := _m.Called(ctx, lastEthBlockSynced) From 95e78a4f2c6effa9fc521fe4acdd0059dc436eaa Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 18 Oct 2023 07:33:07 +0200 Subject: [PATCH 5/5] + fix unittest --- synchronizer/l1_rollup_info_consumer_test.go | 2 +- synchronizer/l1_sync_orchestration_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/synchronizer/l1_rollup_info_consumer_test.go b/synchronizer/l1_rollup_info_consumer_test.go index fa02f85561..80a4912af8 100644 --- a/synchronizer/l1_rollup_info_consumer_test.go +++ b/synchronizer/l1_rollup_info_consumer_test.go @@ -51,7 +51,7 @@ func TestGivenConsumerWhenFailsToProcessRollupThenDontKnownLastEthBlock(t *testi }, blocks: []etherman.Block{}, order: map[common.Hash][]etherman.Order{}, - lastBlockOfRange: nil, + lastBlockOfRange: types.NewBlock(&types.Header{Number: big.NewInt(123)}, nil, nil, nil, nil), } data.syncMock. On("processBlockRange", mock.Anything, mock.Anything). diff --git a/synchronizer/l1_sync_orchestration_test.go b/synchronizer/l1_sync_orchestration_test.go index 03a0ad35b4..28f03802ff 100644 --- a/synchronizer/l1_sync_orchestration_test.go +++ b/synchronizer/l1_sync_orchestration_test.go @@ -25,6 +25,7 @@ func TestGivenOrquestrationWhenHappyPathThenReturnsBlockAndNoErrorAndProducerIsR return nil }) block := state.Block{} + mocks.consumer.On("Reset", mock.Anything).Return() mocks.consumer.On("GetLastEthBlockSynced").Return(block, true) mocks.consumer.On("Start", mock.Anything, mock.Anything).Return(func(context.Context, *state.Block) error { time.Sleep(time.Millisecond * 100)