diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 580fa48..db77148 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -172,9 +172,14 @@ mod test { block_number += 1; } + // Fetch the highest block for the batch hash and verify number. + let highest_block_info = + db.get_highest_block_for_batch_hash(batch_info.hash).await.unwrap().unwrap(); + assert_eq!(highest_block_info.number, block_number - 1); + // Fetch the highest block for the batch and verify number. let highest_block_info = - db.get_highest_block_for_batch(batch_info.hash).await.unwrap().unwrap(); + db.get_highest_block_for_batch_index(batch_info.index).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); } @@ -211,9 +216,14 @@ mod test { block_number += 1; } - // Fetch the highest block for the batch and verify number. + // Fetch the highest block for the batch hash and verify number. let highest_block_info = - db.get_highest_block_for_batch(second_batch_info.hash).await.unwrap().unwrap(); + db.get_highest_block_for_batch_hash(second_batch_info.hash).await.unwrap().unwrap(); + assert_eq!(highest_block_info.number, block_number - 1); + + // Fetch the highest block for the batch index and verify number. + let highest_block_info = + db.get_highest_block_for_batch_index(second_batch_info.index).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); } @@ -480,7 +490,7 @@ mod test { } #[tokio::test] - async fn test_delete_l2_blocks_gt() { + async fn test_delete_l2_blocks_gt_block_number() { // Set up the test database. let db = setup_test_db().await; @@ -499,7 +509,7 @@ mod test { } // Delete blocks with number > 405 - let deleted_count = db.delete_l2_blocks_gt(405).await.unwrap(); + let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap(); assert_eq!(deleted_count, 4); // Blocks 406, 407, 408, 409 // Verify remaining blocks still exist @@ -515,6 +525,68 @@ mod test { } } + #[tokio::test] + async fn test_delete_l2_blocks_gt_batch_index() { + // Set up the test database. + let db = setup_test_db().await; + + // Generate unstructured bytes. 
+        let mut bytes = [0u8; 1024];
+        rand::rng().fill(bytes.as_mut_slice());
+        let mut u = Unstructured::new(&bytes);
+
+        // Insert multiple batches
+        for i in 100..110 {
+            let batch_data = BatchCommitData {
+                index: i,
+                calldata: Arc::new(vec![].into()),
+                ..Arbitrary::arbitrary(&mut u).unwrap()
+            };
+            db.insert_batch(batch_data).await.unwrap();
+        }
+
+        // Insert L2 blocks with different batch indices
+        for i in 100..110 {
+            let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
+            let batch_info: BatchInfo = batch_data.into();
+
+            let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
+            let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] };
+
+            db.insert_block(l2_block, Some(batch_info)).await.unwrap();
+        }
+
+        // Insert some blocks without batch index (should not be deleted)
+        for i in 0..3 {
+            let block_info = BlockInfo { number: 600 + i, hash: B256::arbitrary(&mut u).unwrap() };
+            let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] };
+
+            db.insert_block(l2_block, None).await.unwrap();
+        }
+
+        // Delete L2 blocks with batch index > 105
+        let deleted_count = db.delete_l2_blocks_gt_batch_index(105).await.unwrap();
+        assert_eq!(deleted_count, 4); // Blocks with batch indices 106, 107, 108, 109
+
+        // Verify remaining blocks with batch index <= 105 still exist
+        for i in 100..=105 {
+            let block = db.get_l2_block_info_by_number(500 + i).await.unwrap();
+            assert!(block.is_some());
+        }
+
+        // Verify deleted blocks with batch index > 105 no longer exist
+        for i in 106..110 {
+            let block = db.get_l2_block_info_by_number(500 + i).await.unwrap();
+            assert!(block.is_none());
+        }
+
+        // Verify blocks without batch index are still there (not affected by batch index filter)
+        for i in 0..3 {
+            let block = db.get_l2_block_info_by_number(600 + i).await.unwrap();
+            assert!(block.is_some());
+        }
+    }
+
     #[tokio::test]
     async fn test_insert_block_with_l1_messages() {
         // Set up the test database.
diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs
index 854b194..65fb9a5 100644
--- a/crates/database/db/src/operations.rs
+++ b/crates/database/db/src/operations.rs
@@ -1,5 +1,6 @@
 use super::{models, DatabaseError};
 use crate::DatabaseConnectionProvider;
+
 use alloy_primitives::B256;
 use futures::{Stream, StreamExt};
 use rollup_node_primitives::{
@@ -26,6 +27,8 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
                     models::batch_commit::Column::Hash,
                     models::batch_commit::Column::BlockNumber,
                     models::batch_commit::Column::BlockTimestamp,
+                    models::batch_commit::Column::Calldata,
+                    models::batch_commit::Column::BlobHash,
                     models::batch_commit::Column::FinalizedBlockNumber,
                 ])
                 .to_owned(),
@@ -132,7 +135,10 @@
     }
 
     /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
-    async fn delete_batches_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
+    async fn delete_batches_gt_block_number(
+        &self,
+        block_number: u64,
+    ) -> Result<u64, DatabaseError> {
         tracing::trace!(target: "scroll::db", block_number, "Deleting batch inputs greater than block number.");
         Ok(models::batch_commit::Entity::delete_many()
             .filter(models::batch_commit::Column::BlockNumber.gt(block_number as i64))
             .exec(self.get_connection())
             .await
             .map(|x| x.rows_affected)?)
     }
 
+    /// Delete all [`BatchCommitData`]s with a batch index greater than the provided index.
+    async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError> {
+        tracing::trace!(target: "scroll::db", batch_index, "Deleting batch inputs greater than batch index.");
+        Ok(models::batch_commit::Entity::delete_many()
+            .filter(models::batch_commit::Column::Index.gt(batch_index as i64))
+            .exec(self.get_connection())
+            .await
+            .map(|x| x.rows_affected)?)
+    }
+
     /// Get an iterator over all [`BatchCommitData`]s in the database.
     async fn get_batches<'a>(
         &'a self,
@@ -327,7 +343,7 @@
             .get_batch_by_index(batch_info.index - 1)
             .await?
             .expect("Batch info must be present due to database query arguments");
-        let l2_block = self.get_highest_block_for_batch(previous_batch.hash).await?;
+        let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?;
         (l2_block, Some(batch.block_number))
     } else {
         (None, None)
@@ -337,7 +353,10 @@
     }
 
     /// Delete all L2 blocks with a block number greater than the provided block number.
-    async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
+    async fn delete_l2_blocks_gt_block_number(
+        &self,
+        block_number: u64,
+    ) -> Result<u64, DatabaseError> {
         tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
         Ok(models::l2_block::Entity::delete_many()
             .filter(models::l2_block::Column::BlockNumber.gt(block_number as i64))
@@ -346,6 +365,23 @@
             .map(|x| x.rows_affected)?)
     }
 
+    /// Delete all L2 blocks with a batch index greater than the provided batch index.
+    async fn delete_l2_blocks_gt_batch_index(
+        &self,
+        batch_index: u64,
+    ) -> Result<u64, DatabaseError> {
+        tracing::trace!(target: "scroll::db", batch_index, "Deleting L2 blocks greater than provided batch index.");
+        Ok(models::l2_block::Entity::delete_many()
+            .filter(
+                Condition::all()
+                    .add(models::l2_block::Column::BatchIndex.is_not_null())
+                    .add(models::l2_block::Column::BatchIndex.gt(batch_index as i64)),
+            )
+            .exec(self.get_connection())
+            .await
+            .map(|x| x.rows_affected)?)
+    }
+
     /// Insert a new block in the database.
     async fn insert_block(
         &self,
@@ -365,6 +401,7 @@
             .on_conflict(
                 OnConflict::column(models::l2_block::Column::BlockNumber)
                     .update_columns([
+                        models::l2_block::Column::BlockHash,
                         models::l2_block::Column::BatchHash,
                         models::l2_block::Column::BatchIndex,
                     ])
@@ -396,7 +433,7 @@
 
     /// Returns the highest L2 block originating from the provided `batch_hash` or the highest block
     /// for the batch's index.
-    async fn get_highest_block_for_batch(
+    async fn get_highest_block_for_batch_hash(
         &self,
         batch_hash: B256,
     ) -> Result<Option<BlockInfo>, DatabaseError> {
@@ -420,6 +457,20 @@
         }
     }
 
+    /// Returns the highest L2 block with a batch index less than or equal to the provided
+    /// `batch_index`.
+    async fn get_highest_block_for_batch_index(
+        &self,
+        batch_index: u64,
+    ) -> Result<Option<BlockInfo>, DatabaseError> {
+        Ok(models::l2_block::Entity::find()
+            .filter(models::l2_block::Column::BatchIndex.lte(batch_index))
+            .order_by_desc(models::l2_block::Column::BlockNumber)
+            .one(self.get_connection())
+            .await?
+            .map(|model| model.block_info()))
+    }
+
     /// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
     async fn unwind(
         &self,
@@ -427,7 +478,7 @@
         l1_block_number: u64,
     ) -> Result<UnwindResult, DatabaseError> {
         // delete batch inputs and l1 messages
-        let batches_removed = self.delete_batches_gt(l1_block_number).await?;
+        let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?;
         let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?;
 
         // filter and sort the executed L1 messages
@@ -441,10 +492,10 @@
         if let Some(msg) = removed_executed_l1_messages.first() {
             let l2_reorg_block_number = msg
                 .l2_block_number
-                .expect("we guarantee that this is Some(u64) due to the filter above")
-                - 1;
+                .expect("we guarantee that this is Some(u64) due to the filter above")
+                .saturating_sub(1);
             let l2_block_info = self.get_l2_block_info_by_number(l2_reorg_block_number).await?;
-            self.delete_l2_blocks_gt(l2_reorg_block_number).await?;
+            self.delete_l2_blocks_gt_block_number(l2_reorg_block_number).await?;
             (Some(msg.transaction.queue_index), l2_block_info)
         } else {
             (None, None)
diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs
index 046b258..8ec6424 100644
--- a/crates/derivation-pipeline/src/lib.rs
+++ b/crates/derivation-pipeline/src/lib.rs
@@ -123,6 +123,13 @@ where
         }
         None
     }
+
+    /// Flushes all the data in the pipeline.
+    pub fn flush(&mut self) {
+        self.attributes_queue.clear();
+        self.batch_queue.clear();
+        self.pipeline_futures = FuturesOrdered::new();
+    }
 }
 
 impl<P> Stream for DerivationPipeline<P>
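Reviewer note: the revert path added to `handle_batch_commit` in `crates/indexer/src/lib.rs` below ties the three new database helpers together (`delete_batches_gt_batch_index`, `delete_l2_blocks_gt_batch_index`, `get_highest_block_for_batch_index`). Here is a minimal in-memory sketch of that flow; `Block` and `commit_batch` are illustrative stand-ins, not items from this PR:

```rust
// Illustrative model of the batch-revert handling, not the SeaORM-backed code.
#[derive(Debug, Clone, PartialEq)]
struct Block {
    number: u64,
    batch_index: Option<u64>, // None mirrors a NULL batch index column
}

// Mirrors: delete_batches_gt_batch_index, delete_l2_blocks_gt_batch_index,
// then get_highest_block_for_batch_index to recompute the safe head.
fn commit_batch(batches: &mut Vec<u64>, blocks: &mut Vec<Block>, new_index: u64) -> Option<Block> {
    let prev = new_index - 1;
    let before = batches.len();
    batches.retain(|&i| i <= prev);
    let reverted = batches.len() != before;

    let safe_head = if reverted {
        // Blocks without a batch index are kept, matching the `is_not_null` guard.
        blocks.retain(|b| b.batch_index.map_or(true, |i| i <= prev));
        // Highest surviving block that still belongs to a batch, as in the `lte` query.
        blocks.iter().filter(|b| b.batch_index.is_some()).max_by_key(|b| b.number).cloned()
    } else {
        None
    };

    batches.push(new_index);
    safe_head
}

fn main() {
    let mut batches = vec![100, 101, 102];
    let mut blocks = vec![
        Block { number: 500, batch_index: Some(100) },
        Block { number: 501, batch_index: Some(101) },
        Block { number: 502, batch_index: Some(102) },
        Block { number: 600, batch_index: None },
    ];
    // Re-committing index 101 reverts batches 101 and 102; the safe head falls
    // back to the highest block of batch 100, and the unindexed block survives.
    let safe_head = commit_batch(&mut batches, &mut blocks, 101);
    assert_eq!(safe_head, Some(Block { number: 500, batch_index: Some(100) }));
    assert_eq!(blocks.len(), 2);
}
```

The real implementation runs these steps inside a single database transaction (`database.tx()` through `txn.commit()`), so a crash between the deletions and the insert cannot leave a half-reverted index.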
diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs
index 857ed31..101ef32 100644
--- a/crates/engine/src/driver.rs
+++ b/crates/engine/src/driver.rs
@@ -108,6 +108,19 @@ where
         self.block_building_duration = block_building_duration;
     }
 
+    /// Clear the L1 attributes queue.
+    pub fn clear_l1_payload_attributes(&mut self) {
+        // clear the L1 attributes queue.
+        self.l1_payload_attributes.clear();
+
+        // drop the engine future if it is an L1 consolidation.
+        if let Some(MeteredFuture { fut: EngineFuture::L1Consolidation(_), .. }) =
+            self.engine_future
+        {
+            self.engine_future.take();
+        }
+    }
+
     /// Handles a block import request by adding it to the queue and waking up the driver.
     pub fn handle_block_import(&mut self, block_with_peer: NewBlockWithPeer) {
         tracing::trace!(target: "scroll::engine", ?block_with_peer, "new block import request received");
diff --git a/crates/indexer/src/event.rs b/crates/indexer/src/event.rs
index 008996a..5b9b705 100644
--- a/crates/indexer/src/event.rs
+++ b/crates/indexer/src/event.rs
@@ -4,8 +4,14 @@ use rollup_node_primitives::{BatchInfo, BlockInfo, L2BlockInfoWithL1Messages};
 /// An event emitted by the indexer.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum IndexerEvent {
-    /// A `BatchCommit` event has been indexed returning the batch info.
-    BatchCommitIndexed(BatchInfo),
+    /// A `BatchCommit` event has been indexed, returning the batch info and, in the case of a
+    /// batch revert, the safe L2 block to reset to.
+    BatchCommitIndexed {
+        /// The batch info.
+        batch_info: BatchInfo,
+        /// The safe L2 block info.
+        safe_head: Option<BlockInfo>,
+    },
     /// A `BatchFinalization` event has been indexed returning the batch hash and new finalized L2
     /// block.
     BatchFinalizationIndexed(B256, Option<BlockInfo>),
diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs
index 0827fe8..afadeb1 100644
--- a/crates/indexer/src/lib.rs
+++ b/crates/indexer/src/lib.rs
@@ -249,8 +249,29 @@ impl Indexer<
         database: Arc<Database>,
         batch: BatchCommitData,
     ) -> Result<IndexerEvent, IndexerError> {
-        let event = IndexerEvent::BatchCommitIndexed(BatchInfo::new(batch.index, batch.hash));
-        database.insert_batch(batch).await?;
+        let txn = database.tx().await?;
+        let prev_batch_index = batch.index - 1;
+
+        // remove any batches with an index greater than the previous batch.
+        let affected = txn.delete_batches_gt_batch_index(prev_batch_index).await?;
+
+        // handle the case of a batch revert.
+        let new_safe_head = if affected > 0 {
+            txn.delete_l2_blocks_gt_batch_index(prev_batch_index).await?;
+            txn.get_highest_block_for_batch_index(prev_batch_index).await?
+        } else {
+            None
+        };
+
+        let event = IndexerEvent::BatchCommitIndexed {
+            batch_info: BatchInfo::new(batch.index, batch.hash),
+            safe_head: new_safe_head,
+        };
+
+        // insert the batch and commit the transaction.
+        txn.insert_batch(batch).await?;
+        txn.commit().await?;
+
         Ok(event)
     }
 
@@ -285,7 +306,7 @@
         batch_hash: B256,
         l2_block_number: Arc<AtomicU64>,
     ) -> Result<Option<BlockInfo>, IndexerError> {
-        let finalized_block = database.get_highest_block_for_batch(batch_hash).await?;
+        let finalized_block = database.get_highest_block_for_batch_hash(batch_hash).await?;
 
         // only return the block if the indexer hasn't seen it.
         // in which case also update the `l2_finalized_block_number` value.
@@ -301,63 +322,6 @@
     }
 }
 
-/// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
-pub async fn unwind(
-    database: Arc<Database>,
-    chain_spec: Arc<ChainSpec>,
-    l1_block_number: u64,
-) -> Result<IndexerEvent, IndexerError> {
-    // create a database transaction so this operation is atomic
-    let txn = database.tx().await?;
-
-    // delete batch inputs and l1 messages
-    let batches_removed = txn.delete_batches_gt(l1_block_number).await?;
-    let deleted_messages = txn.delete_l1_messages_gt(l1_block_number).await?;
-
-    // filter and sort the executed L1 messages
-    let mut removed_executed_l1_messages: Vec<_> =
-        deleted_messages.into_iter().filter(|x| x.l2_block_number.is_some()).collect();
-    removed_executed_l1_messages
-        .sort_by(|a, b| a.transaction.queue_index.cmp(&b.transaction.queue_index));
-
-    // check if we need to reorg the L2 head and delete some L2 blocks
-    let (queue_index, l2_head_block_info) = if let Some(msg) = removed_executed_l1_messages.first()
-    {
-        let l2_reorg_block_number = msg
-            .l2_block_number
-            .expect("we guarantee that this is Some(u64) due to the filter on line 130")
-            - 1;
-        let l2_block_info = txn
-            .get_l2_block_info_by_number(l2_reorg_block_number)
-            .await?
-            .ok_or(IndexerError::L2BlockNotFound(l2_reorg_block_number))?;
-        txn.delete_l2_blocks_gt(l2_reorg_block_number).await?;
-        (Some(msg.transaction.queue_index), Some(l2_block_info))
-    } else {
-        (None, None)
-    };
-
-    // check if we need to reorg the L2 safe block
-    let l2_safe_block_info = if batches_removed > 0 {
-        if let Some(x) = txn.get_latest_safe_l2_info().await? {
-            Some(x.0)
-        } else {
-            Some(BlockInfo::new(0, chain_spec.genesis_hash()))
-        }
-    } else {
-        None
-    };
-
-    // commit the transaction
-    txn.commit().await?;
-    Ok(IndexerEvent::UnwindIndexed {
-        l1_block_number,
-        queue_index,
-        l2_head_block_info,
-        l2_safe_block_info,
-    })
-}
-
 impl<ChainSpec> Stream for Indexer<ChainSpec> {
     type Item = Result<IndexerEvent, IndexerError>;
 
@@ -409,13 +373,143 @@ mod test {
         let batch_commit = BatchCommitData::arbitrary(&mut u).unwrap();
 
         indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit.clone()));
-        let _ = indexer.next().await;
+        let event = indexer.next().await.unwrap().unwrap();
 
-        let batch_commit_result = db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
+        // Verify the event structure
+        match event {
+            IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => {
+                assert_eq!(batch_info.index, batch_commit.index);
+                assert_eq!(batch_info.hash, batch_commit.hash);
+                assert_eq!(safe_head, None); // No safe head since no batch revert
+            }
+            _ => panic!("Expected BatchCommitIndexed event"),
+        }
+
+        let batch_commit_result = db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
         assert_eq!(batch_commit, batch_commit_result);
     }
 
+    #[tokio::test]
+    async fn test_handle_batch_commit_with_revert() {
+        // Instantiate indexer and db
+        let (mut indexer, db) = setup_test_indexer().await;
+
+        // Generate unstructured bytes.
+ let mut bytes = [0u8; 1024]; + rand::rng().fill(bytes.as_mut_slice()); + let mut u = Unstructured::new(&bytes); + + // Create sequential batches + let batch_1 = BatchCommitData { + index: 100, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + let batch_2 = BatchCommitData { + index: 101, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + let batch_3 = BatchCommitData { + index: 102, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + + // Index first batch + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_1.clone())); + let event = indexer.next().await.unwrap().unwrap(); + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 100); + assert_eq!(safe_head, None); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Index second batch + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_2.clone())); + let event = indexer.next().await.unwrap().unwrap(); + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 101); + assert_eq!(safe_head, None); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Index third batch + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_3.clone())); + let event = indexer.next().await.unwrap().unwrap(); + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 102); + assert_eq!(safe_head, None); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Add some L2 blocks for the batches + let batch_1_info = BatchInfo::new(batch_1.index, batch_1.hash); + let batch_2_info = BatchInfo::new(batch_2.index, batch_2.hash); + + let block_1 = L2BlockInfoWithL1Messages { + block_info: BlockInfo { number: 500, hash: Arbitrary::arbitrary(&mut u).unwrap() }, + l1_messages: vec![], + }; + let block_2 = L2BlockInfoWithL1Messages { + block_info: BlockInfo { number: 501, hash: Arbitrary::arbitrary(&mut u).unwrap() }, + l1_messages: vec![], + }; + let block_3 = L2BlockInfoWithL1Messages { + block_info: BlockInfo { number: 502, hash: Arbitrary::arbitrary(&mut u).unwrap() }, + l1_messages: vec![], + }; + + indexer.handle_block(block_1.clone(), Some(batch_1_info)); + indexer.next().await.unwrap().unwrap(); + + indexer.handle_block(block_2.clone(), Some(batch_2_info)); + indexer.next().await.unwrap().unwrap(); + + indexer.handle_block(block_3.clone(), Some(batch_2_info)); + indexer.next().await.unwrap().unwrap(); + + // Now simulate a batch revert by submitting a new batch with index 101 + // This should delete batch 102 and any blocks associated with it + let new_batch_2 = BatchCommitData { + index: 101, + calldata: Arc::new(vec![1, 2, 3].into()), // Different data + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + + indexer.handle_l1_notification(L1Notification::BatchCommit(new_batch_2.clone())); + let event = indexer.next().await.unwrap().unwrap(); + + // Verify the event indicates a batch revert + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 101); + assert_eq!(batch_info.hash, new_batch_2.hash); + // Safe head should be the highest block from batch index <= 100 + assert_eq!(safe_head, Some(block_1.block_info)); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Verify batch 102 was deleted + let batch_102 = db.get_batch_by_index(102).await.unwrap(); + 
assert!(batch_102.is_none()); + + // Verify batch 101 was replaced with new data + let updated_batch_101 = db.get_batch_by_index(101).await.unwrap().unwrap(); + assert_eq!(updated_batch_101, new_batch_2); + + // Verify batch 100 still exists + let batch_100 = db.get_batch_by_index(100).await.unwrap(); + assert!(batch_100.is_some()); + } + #[tokio::test] async fn test_handle_l1_message() { // Instantiate indexer and db @@ -498,14 +592,17 @@ mod test { // Generate a 3 random batch inputs and set their block numbers let mut batch_commit_block_1 = BatchCommitData::arbitrary(&mut u).unwrap(); batch_commit_block_1.block_number = 1; + batch_commit_block_1.index = 1; let batch_commit_block_1 = batch_commit_block_1; let mut batch_commit_block_20 = BatchCommitData::arbitrary(&mut u).unwrap(); batch_commit_block_20.block_number = 20; + batch_commit_block_20.index = 20; let batch_commit_block_20 = batch_commit_block_20; let mut batch_commit_block_30 = BatchCommitData::arbitrary(&mut u).unwrap(); batch_commit_block_30.block_number = 30; + batch_commit_block_30.index = 30; let batch_commit_block_30 = batch_commit_block_30; // Index batch inputs @@ -585,19 +682,22 @@ mod test { // Generate a 3 random batch inputs and set their block numbers let batch_commit_block_1 = - BatchCommitData { block_number: 5, ..Arbitrary::arbitrary(&mut u).unwrap() }; - let batch_commit_block_20 = - BatchCommitData { block_number: 10, ..Arbitrary::arbitrary(&mut u).unwrap() }; + BatchCommitData { block_number: 5, index: 5, ..Arbitrary::arbitrary(&mut u).unwrap() }; + let batch_commit_block_10 = BatchCommitData { + block_number: 10, + index: 10, + ..Arbitrary::arbitrary(&mut u).unwrap() + }; // Index batch inputs indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_1.clone())); - indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_20.clone())); + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_10.clone())); for _ in 0..2 { let _event = indexer.next().await.unwrap().unwrap(); } let batch_1 = BatchInfo::new(batch_commit_block_1.index, batch_commit_block_1.hash); - let batch_20 = BatchInfo::new(batch_commit_block_20.index, batch_commit_block_20.hash); + let batch_10 = BatchInfo::new(batch_commit_block_10.index, batch_commit_block_10.hash); const UNITS_FOR_TESTING: u64 = 20; const L1_MESSAGES_NOT_EXECUTED_COUNT: u64 = 7; @@ -637,7 +737,7 @@ mod test { let batch_info = if block_number < 5 { Some(batch_1) } else if block_number < 10 { - Some(batch_20) + Some(batch_10) } else { None }; @@ -660,7 +760,7 @@ mod test { } ); - // Reorg at block 17 which is one of the messages that has not been executed yet. No reorg + // Reorg at block 7 which is one of the messages that has not been executed yet. No reorg // but we should ensure the L1 messages have been deleted. indexer.handle_l1_notification(L1Notification::Reorg(7)); let event = indexer.next().await.unwrap().unwrap(); diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index e9051ce..9d3d51a 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -237,7 +237,16 @@ where fn handle_indexer_event(&mut self, event: IndexerEvent) { trace!(target: "scroll::node::manager", ?event, "Received indexer event"); match event { - IndexerEvent::BatchCommitIndexed(batch_info) => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + // if we detected a batch revert event, we reset the pipeline and the engine driver. 
+                if let Some(new_safe_head) = safe_head {
+                    if let Some(pipeline) = self.derivation_pipeline.as_mut() {
+                        pipeline.flush()
+                    }
+                    self.engine.clear_l1_payload_attributes();
+                    self.engine.set_head_block_info(new_safe_head);
+                    self.engine.set_safe_block_info(new_safe_head);
+                }
                 // push the batch info into the derivation pipeline.
                 if let Some(pipeline) = &mut self.derivation_pipeline {
                     pipeline.handle_batch_commit(batch_info);
diff --git a/crates/primitives/src/metrics.rs b/crates/primitives/src/metrics.rs
index 487442b..205eeff 100644
--- a/crates/primitives/src/metrics.rs
+++ b/crates/primitives/src/metrics.rs
@@ -9,7 +9,8 @@
 use std::time::Instant;
 
 /// A metered future that records the start of the polling.
 pub struct MeteredFuture<F> {
-    fut: F,
+    /// The future being metered.
+    pub fut: F,
     started_at: Instant,
 }
 
diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs
index b3fbaf5..b84531d 100644
--- a/crates/watcher/src/lib.rs
+++ b/crates/watcher/src/lib.rs
@@ -22,6 +22,7 @@ use rollup_node_providers::SystemContractProvider;
 use scroll_alloy_consensus::TxL1Message;
 use scroll_l1::abi::logs::{try_decode_log, CommitBatch, FinalizeBatch, QueueTransaction};
 use std::{
+    cmp::Ordering,
     fmt::{Debug, Display, Formatter},
     sync::Arc,
     time::Duration,
@@ -420,9 +421,14 @@ where
         // prepare notifications
         let mut notifications = Vec::with_capacity(commit_logs_with_tx.len());
 
-        // sort the commits and group by tx hash.
-        commit_logs_with_tx
-            .sort_by(|(_, data_a, _), (_, data_b, _)| data_a.batch_index.cmp(&data_b.batch_index));
+        // sort the commits by block number then batch index, then group by tx hash.
+        commit_logs_with_tx.sort_by(|(log_a, data_a, _), (log_b, data_b, _)| {
+            log_a
+                .block_number
+                .and_then(|a| log_b.block_number.map(|b| a.cmp(&b)))
+                .unwrap_or(Ordering::Equal)
+                .then_with(|| data_a.batch_index.cmp(&data_b.batch_index))
+        });
 
         let groups = commit_logs_with_tx.into_iter().chunk_by(|(_, _, hash)| *hash);
         let groups: Vec<_> =
             groups.into_iter().map(|(hash, group)| (hash, group.collect::<Vec<_>>())).collect();
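Reviewer note on the watcher change above: a single log query can presumably return commits from several L1 blocks, so the sort now orders by block number first and only falls back to batch index; logs without a block number (pending) compare as equal on the first key. A self-contained sketch of the comparator, with `Commit` as a simplified stand-in for the (log, batch data) pairs:

```rust
use std::cmp::Ordering;

// Simplified stand-in for the (log, batch data) tuples sorted in the watcher.
struct Commit {
    block_number: Option<u64>, // None for a pending log
    batch_index: u64,
}

fn sort_commits(commits: &mut [Commit]) {
    commits.sort_by(|a, b| {
        a.block_number
            .and_then(|x| b.block_number.map(|y| x.cmp(&y)))
            .unwrap_or(Ordering::Equal) // missing block numbers compare equal
            .then_with(|| a.batch_index.cmp(&b.batch_index))
    });
}

fn main() {
    let mut commits = vec![
        Commit { block_number: Some(11), batch_index: 3 },
        Commit { block_number: Some(10), batch_index: 2 },
        Commit { block_number: Some(10), batch_index: 1 },
        Commit { block_number: None, batch_index: 0 },
    ];
    sort_commits(&mut commits);
    let order: Vec<u64> = commits.iter().map(|c| c.batch_index).collect();
    // Equal block numbers (and pending logs) are ordered by batch index.
    assert_eq!(order, vec![0, 1, 2, 3]);
}
```

One caveat worth flagging: `None` versus `Some` compares as `Equal` here, so the comparator is not a strict total order over arbitrary inputs; if mixed pending/mined logs can occur in one result set, keying on `Option<u64>` directly (whose `Ord` sorts `None` first) would be the more robust choice.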