From 9899a562fb9293b31c9e16bb9c278273a3e9a888 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 3 Jul 2025 10:06:01 +0200 Subject: [PATCH 1/6] feat: update batch header struct --- crates/codec/src/decoding/v0/batch_header.rs | 52 ++++++++++++++----- crates/codec/src/decoding/v1/batch_header.rs | 19 +++---- .../calldata_v0_with_skipped_l1_messages.bin | 1 + .../calldata_v0_with_skipped_l1_messages.bin | 1 + 4 files changed, 48 insertions(+), 25 deletions(-) create mode 100644 crates/codec/testdata/calldata_v0_with_skipped_l1_messages.bin create mode 100644 crates/derivation-pipeline/testdata/calldata_v0_with_skipped_l1_messages.bin diff --git a/crates/codec/src/decoding/v0/batch_header.rs b/crates/codec/src/decoding/v0/batch_header.rs index 93ef1030..8bf65c61 100644 --- a/crates/codec/src/decoding/v0/batch_header.rs +++ b/crates/codec/src/decoding/v0/batch_header.rs @@ -5,7 +5,7 @@ use crate::{ use alloy_primitives::{ bytes::{Buf, BufMut}, - keccak256, B256, U256, + keccak256, B256, }; /// The batch header for V0. @@ -24,7 +24,7 @@ pub struct BatchHeaderV0 { /// The parent batch hash. pub parent_batch_hash: B256, /// A bitmap to indicate which L1 messages are skipped in the batch. - pub skipped_l1_message_bitmap: Vec, + pub skipped_l1_message_bitmap: Vec, } impl BatchHeaderV0 { @@ -38,7 +38,7 @@ impl BatchHeaderV0 { total_l1_message_popped: u64, data_hash: B256, parent_batch_hash: B256, - skipped_l1_message_bitmap: Vec, + skipped_l1_message_bitmap: Vec, ) -> Self { Self { version, @@ -69,7 +69,9 @@ impl BatchHeaderV0 { let skipped_l1_message_bitmap: Vec<_> = buf .chunks(SKIPPED_L1_MESSAGE_BITMAP_ITEM_BYTES_SIZE) - .map(|chunk| U256::from_be_slice(chunk)) + .flatten() + .rev() + .copied() .collect(); // check leftover bytes are correct. @@ -78,7 +80,7 @@ impl BatchHeaderV0 { { return Err(DecodingError::Eof) } - buf.advance(skipped_l1_message_bitmap.len() * SKIPPED_L1_MESSAGE_BITMAP_ITEM_BYTES_SIZE); + buf.advance(skipped_l1_message_bitmap.len()); Ok(Self { version, @@ -104,12 +106,7 @@ impl BatchHeaderV0 { bytes.put_slice(&self.data_hash.0); bytes.put_slice(&self.parent_batch_hash.0); - let skipped_l1_message_flat_bitmap = self - .skipped_l1_message_bitmap - .iter() - .flat_map(|u| u.to_be_bytes::<32>()) - .collect::>(); - bytes.put_slice(&skipped_l1_message_flat_bitmap); + bytes.put_slice(&self.skipped_l1_message_bitmap); keccak256(bytes) } @@ -119,7 +116,7 @@ impl BatchHeaderV0 { mod tests { use crate::decoding::{test_utils::read_to_bytes, v0::BatchHeaderV0}; - use alloy_primitives::{b256, U256}; + use alloy_primitives::b256; use alloy_sol_types::SolCall; use scroll_l1::abi::calls::commitBatchCall; @@ -139,7 +136,34 @@ mod tests { 33, b256!("2aa3eeb5adebb96a49736583c744b89b0b3be45056e8e178106a42ab2cd1a063"), b256!("c0173d7e3561501cf57913763c7c34716216092a222a99fe8b85dcb466730f56"), - vec![U256::ZERO], + vec![0; 32], + ); + assert_eq!(header, expected); + + Ok(()) + } + + #[test] + fn test_should_decode_header_with_skipped_l1_messages() -> eyre::Result<()> { + // + let raw_commit_calldata = + read_to_bytes("./testdata/calldata_v0_with_skipped_l1_messages.bin")?; + let commit_calldata = commitBatchCall::abi_decode(&raw_commit_calldata)?; + + let mut raw_batch_header = &*commit_calldata.parent_batch_header.to_vec(); + let header = BatchHeaderV0::try_from_buf(&mut raw_batch_header)?; + + let expected = BatchHeaderV0::new( + 0, + 100, + 3, + 22, + b256!("4867e8b3c751abf5f0f8cd8e3e91f78ff15011b48b981ad742cb42dfd746844c"), + 
b256!("b4d0a673c704d567eebcd758802ce87cf103b16acbae7c52b2807928fd8dc76e"), + vec![ + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, + ], ); assert_eq!(header, expected); @@ -156,7 +180,7 @@ mod tests { 33, b256!("2aa3eeb5adebb96a49736583c744b89b0b3be45056e8e178106a42ab2cd1a063"), b256!("c0173d7e3561501cf57913763c7c34716216092a222a99fe8b85dcb466730f56"), - vec![U256::ZERO], + vec![0; 32], ); let expected = b256!("A7F7C528E1827D3E64E406C76DE6C750D5FC3DE3DE4386E6C69958A89461D064"); diff --git a/crates/codec/src/decoding/v1/batch_header.rs b/crates/codec/src/decoding/v1/batch_header.rs index 43d56242..23d14e40 100644 --- a/crates/codec/src/decoding/v1/batch_header.rs +++ b/crates/codec/src/decoding/v1/batch_header.rs @@ -5,7 +5,7 @@ use crate::{ use alloy_primitives::{ bytes::{Buf, BufMut}, - keccak256, B256, U256, + keccak256, B256, }; /// The batch header for V1. @@ -26,7 +26,7 @@ pub struct BatchHeaderV1 { /// The parent batch hash. pub parent_batch_hash: B256, /// A bitmap to indicate which L1 messages are skipped in the batch. - pub skipped_l1_message_bitmap: Vec, + pub skipped_l1_message_bitmap: Vec, } impl BatchHeaderV1 { @@ -42,7 +42,7 @@ impl BatchHeaderV1 { data_hash: B256, blob_versioned_hash: B256, parent_batch_hash: B256, - skipped_l1_message_bitmap: Vec, + skipped_l1_message_bitmap: Vec, ) -> Self { Self { version, @@ -75,7 +75,9 @@ impl BatchHeaderV1 { let skipped_l1_message_bitmap: Vec<_> = buf .chunks(SKIPPED_L1_MESSAGE_BITMAP_ITEM_BYTES_SIZE) - .map(|chunk| U256::from_be_slice(chunk)) + .flatten() + .rev() + .copied() .collect(); // check leftover bytes are correct. @@ -84,7 +86,7 @@ impl BatchHeaderV1 { { return Err(DecodingError::Eof) } - buf.advance(skipped_l1_message_bitmap.len() * SKIPPED_L1_MESSAGE_BITMAP_ITEM_BYTES_SIZE); + buf.advance(skipped_l1_message_bitmap.len()); Ok(Self { version, @@ -112,12 +114,7 @@ impl BatchHeaderV1 { bytes.put_slice(&self.blob_versioned_hash.0); bytes.put_slice(&self.parent_batch_hash.0); - let skipped_l1_message_flat_bitmap = self - .skipped_l1_message_bitmap - .iter() - .flat_map(|u| u.to_be_bytes::<32>()) - .collect::>(); - bytes.put_slice(&skipped_l1_message_flat_bitmap); + bytes.put_slice(&self.skipped_l1_message_bitmap); keccak256(bytes) } diff --git a/crates/codec/testdata/calldata_v0_with_skipped_l1_messages.bin b/crates/codec/testdata/calldata_v0_with_skipped_l1_messages.bin new file mode 100644 index 00000000..08f7dabf --- /dev/null +++ b/crates/codec/testdata/calldata_v0_with_skipped_l1_messages.bin @@ -0,0 +1 @@ 
+0x1325aca000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000012000000000000000000000000000000000000000000000000000000000000006400000000000000000000000000000000000000000000000000000000000000079000000000000000064000000000000000300000000000000164867e8b3c751abf5f0f8cd8e3e91f78ff15011b48b981ad742cb42dfd746844cb4d0a673c704d567eebcd758802ce87cf103b16acbae7c52b2807928fd8dc76e000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000001a000000000000000000000000000000000000000000000000000000000000002a000000000000000000000000000000000000000000000000000000000000003a000000000000000000000000000000000000000000000000000000000000004a000000000000000000000000000000000000000000000000000000000000000ce0100000000000003110000000064d18d75000000000000000000000000000000000000000000000000000000000000000000000000007a1200000100000000008df88b82021c830f424082ce1d94530000000000000000000000000000000000000280a4bede39b5000000000000000000000000000000000000000000000000000000000b0528c183104ec1a0efedcf8b129a97dcf5142d6b3d0da93033c0cc8815057534bbd009ea93ffa64ca00b9381c46f1869b7eb722dc505fb3c1f4d6883aacb4a046003dedebf6e0ffba200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ce0100000000000003120000000064d18d87000000000000000000000000000000000000000000000000000000000000000000000000007a1200000100000000008df88b82021d830f424082ce1d94530000000000000000000000000000000000000280a4bede39b5000000000000000000000000000000000000000000000000000000000bbd974583104ec2a00b4b682ac0013ace0bf6ce2ff006908e83dfbd08e511ab1c631e0335b76dbd94a03d49506aed45459656e9176188dbb7524278a273c156619785c3867d5c6ae07d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ce0100000000000003130000000064d18efb000000000000000000000000000000000000000000000000000000000000000000000000007a1200000100000000008df88b82021e830f424082ce1d94530000000000000000000000000000000000000280a4bede39b5000000000000000000000000000000000000000000000000000000000aff8de183104ec1a082410e24156d5837934e3e9628dac3f995de69eda6789e2bbe1a082d2b063b80a01bdbcb384b9ff032cbb1f806864b6cc359e1f4b5e7eed58b966241e7819e1ac900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ce0100000000000003140000000064d18f04000000000000000000000000000000000000000000000000000000000000000000000000007a1200000100000000008df88b82021f830f424082ce1d94530000000000000000000000000000000000000280a4bede39b500000000000000000000000000000000000000000000000000000000099bd5e283104ec1a0317f5c505edf2e836cc92e33f62e8629ca355788bdc35e618fb46dbe8c06ff3ba00bd9d475ebb7a4be4423d7d6576280757f5e9e1fe8622fce33e0f04fce445f52000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003d0100000000000003150000000064d19066000000000000000000000000000000000000000000000000000000000000000000000000007a12000001000100000000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000 \ No newline at end of file diff --git a/crates/derivation-pipeline/testdata/calldata_v0_with_skipped_l1_messages.bin b/crates/derivation-pipeline/testdata/calldata_v0_with_skipped_l1_messages.bin new file mode 
100644 index 00000000..88a8a5bf --- /dev/null +++ b/crates/derivation-pipeline/testdata/calldata_v0_with_skipped_l1_messages.bin @@ -0,0 +1 @@ +1325aca000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000004c0000000000000000000000000000000000000000000000000000000000000005900000000000000006300000000000000000000000000000013c9f32b6ea1609fb5e066423b68516b4691d037ea3268f3be7cae727f9090f69a2e5a004ecd84ee753115b3bc107ced4345ef73181be9a90633a919c853088e0b00000000000000000000000000000000000000000000000000000000000000000000000000000500000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000000000018000000000000000000000000000000000000000000000000000000000000001e00000000000000000000000000000000000000000000000000000000000000240000000000000000000000000000000000000000000000000000000000000034000000000000000000000000000000000000000000000000000000000000000af01000000000000030c0000000064d18ac7000000000000000000000000000000000000000000000000000000000000000000000000007a1200000100000000006ef86c05830f42408252089498110937b5d6c5fcb0ba99480e585d2364e9809c87b1a2bc2ec500008083104ec2a082eeca3c55154e5065161163f215deceae8aedbe830d120f6fa5faeab455c74da008ee713445b985f80623744b3562fac3e6bfebc390801016c62a087460aaa1800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003d01000000000000030d0000000064d18b53000000000000000000000000000000000000000000000000000000000000000000000000007a120000000000000000000000000000000000000000000000000000000000000000000000000000003d01000000000000030e0000000064d18b56000000000000000000000000000000000000000000000000000000000000000000000000007a12000002000200000000000000000000000000000000000000000000000000000000000000000000ce01000000000000030f0000000064d18bf8000000000000000000000000000000000000000000000000000000000000000000000000007a1200000100000000008df88b82021b830f424082ce1d94530000000000000000000000000000000000000280a4bede39b50000000000000000000000000000000000000000000000000000000009c2c96e83104ec2a046b5f6899061bd43b4a1acf276a4d5dadcf1bf72ff32503118508b5ad15a2a8ea03e1a59ab811e2c9599ba8c39cfdaabf3e818c165f3566049ea9cedfc11caf0e6000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003d0100000000000003100000000064d18d66000000000000000000000000000000000000000000000000000000000000000000000000007a12000001000100000000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000001 \ No newline at end of file From 90ed803639015a1c251737f183fffac19a2a6fec Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 3 Jul 2025 10:11:56 +0200 Subject: [PATCH 2/6] feat: add skipped l1 messages bitmap in derivation pipeline --- crates/codec/src/decoding/payload.rs | 2 + crates/codec/src/decoding/v0/mod.rs | 7 +- crates/codec/src/decoding/v1/mod.rs | 16 ++++- crates/codec/src/decoding/v2/mod.rs | 8 ++- crates/codec/src/decoding/v4/mod.rs | 8 ++- crates/codec/src/decoding/v7/mod.rs | 1 + crates/derivation-pipeline/src/lib.rs | 96 +++++++++++++++++++++++++-- crates/l1/src/abi/calls.rs | 4 +- 8 files changed, 128 insertions(+), 14 deletions(-) diff --git a/crates/codec/src/decoding/payload.rs b/crates/codec/src/decoding/payload.rs index 803382ba..8ccabfe0 100644 --- a/crates/codec/src/decoding/payload.rs +++ 
b/crates/codec/src/decoding/payload.rs @@ -11,6 +11,8 @@ pub struct PayloadData { pub blocks: Vec, /// Contains information about the current state of the L1 message queue. pub l1_message_queue_info: L1MessageQueueInfo, + /// Contains the skipped L1 message bitmap if present. + pub skipped_l1_message_bitmap: Option>, } /// Information about the state of the L1 message queue. diff --git a/crates/codec/src/decoding/v0/mod.rs b/crates/codec/src/decoding/v0/mod.rs index 15038774..fdb499d2 100644 --- a/crates/codec/src/decoding/v0/mod.rs +++ b/crates/codec/src/decoding/v0/mod.rs @@ -61,8 +61,11 @@ pub fn decode_v0(calldata: &[u8]) -> Result { let parent_header = BatchHeaderV0::try_from_buf(&mut (&*raw_parent_header))?; let l1_message_start_index = parent_header.total_l1_message_popped; - let payload = - PayloadData { blocks: l2_blocks, l1_message_queue_info: l1_message_start_index.into() }; + let payload = PayloadData { + blocks: l2_blocks, + l1_message_queue_info: l1_message_start_index.into(), + skipped_l1_message_bitmap: call.skipped_l1_message_bitmap(), + }; Ok(Batch::new(call.version(), Some(chunks_block_count), payload)) } diff --git a/crates/codec/src/decoding/v1/mod.rs b/crates/codec/src/decoding/v1/mod.rs index 1632aa0b..72e8112b 100644 --- a/crates/codec/src/decoding/v1/mod.rs +++ b/crates/codec/src/decoding/v1/mod.rs @@ -52,12 +52,19 @@ pub fn decode_v1(calldata: &[u8], blob: &[u8]) -> Result { // move pass chunk information. buf.advance(TRANSACTION_DATA_BLOB_INDEX_OFFSET); - decode_v1_chunk(call.version(), l1_message_start_index, chunks, buf) + decode_v1_chunk( + call.version(), + call.skipped_l1_message_bitmap(), + l1_message_start_index, + chunks, + buf, + ) } /// Decode the provided chunks and blob data into [`L2Block`]. pub(crate) fn decode_v1_chunk( version: u8, + skipped_l1_message_bitmap: Option>, l1_message_start_index: u64, chunks: Vec<&[u8]>, blob: &[u8], @@ -98,8 +105,11 @@ pub(crate) fn decode_v1_chunk( } } - let payload = - PayloadData { blocks: l2_blocks, l1_message_queue_info: l1_message_start_index.into() }; + let payload = PayloadData { + blocks: l2_blocks, + l1_message_queue_info: l1_message_start_index.into(), + skipped_l1_message_bitmap, + }; Ok(Batch::new(version, Some(chunks_block_count), payload)) } diff --git a/crates/codec/src/decoding/v2/mod.rs b/crates/codec/src/decoding/v2/mod.rs index 6df74dae..45aba796 100644 --- a/crates/codec/src/decoding/v2/mod.rs +++ b/crates/codec/src/decoding/v2/mod.rs @@ -50,7 +50,13 @@ pub fn decode_v2(calldata: &[u8], blob: &[u8]) -> Result { // clone buf and move pass chunk information. buf.advance(TRANSACTION_DATA_BLOB_INDEX_OFFSET); - decode_v1_chunk(call.version(), l1_message_start_index, chunks, buf) + decode_v1_chunk( + call.version(), + call.skipped_l1_message_bitmap(), + l1_message_start_index, + chunks, + buf, + ) } #[cfg(test)] diff --git a/crates/codec/src/decoding/v4/mod.rs b/crates/codec/src/decoding/v4/mod.rs index a7514b3f..f7eb6b01 100644 --- a/crates/codec/src/decoding/v4/mod.rs +++ b/crates/codec/src/decoding/v4/mod.rs @@ -46,7 +46,13 @@ pub fn decode_v4(calldata: &[u8], blob: &[u8]) -> Result { // clone buf and move pass chunk information. 
buf.advance(super::v2::TRANSACTION_DATA_BLOB_INDEX_OFFSET); - decode_v1_chunk(call.version(), l1_message_start_index, chunks, buf) + decode_v1_chunk( + call.version(), + call.skipped_l1_message_bitmap(), + l1_message_start_index, + chunks, + buf, + ) } #[cfg(test)] diff --git a/crates/codec/src/decoding/v7/mod.rs b/crates/codec/src/decoding/v7/mod.rs index ac693e7d..bde59be4 100644 --- a/crates/codec/src/decoding/v7/mod.rs +++ b/crates/codec/src/decoding/v7/mod.rs @@ -97,6 +97,7 @@ pub(crate) fn decode_v7_payload(blob: &[u8]) -> Result { let payload = PayloadData { blocks: l2_blocks, l1_message_queue_info: (prev_message_queue_hash, post_message_queue_hash).into(), + skipped_l1_message_bitmap: None, }; Ok(Batch::new(7, None, payload)) diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index f56e629e..acd335e9 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -42,7 +42,7 @@ type DerivationPipelineFuture = Pin< >; /// Limit the amount of pipeline futures allowed to be polled concurrently. -const MAX_CONCURRENT_DERIVATION_PIPELINE_FUTS: usize = 20; +const MAX_CONCURRENT_DERIVATION_PIPELINE_FUTS: usize = 100; /// A structure holding the current unresolved futures for the derivation pipeline. #[derive(Debug)] @@ -203,12 +203,20 @@ pub async fn derive eyre::Result<()> { + // https://sepolia.etherscan.io/tx/0xe9d7a634a2afd8adee5deab180c30d261e05fea499ccbfd5c987436fe587850e + let raw_calldata = read_to_bytes("./testdata/calldata_v0_with_skipped_l1_messages.bin")?; + let batch_data = BatchCommitData { + hash: b256!("1e86131f4204278feb116e3043916c6bd598b1b092b550e236edb2e4a398730a"), + index: 100, + block_number: 4045729, + block_timestamp: 1691454067, + calldata: Arc::new(raw_calldata), + blob_versioned_hash: None, + finalized_block_number: None, + }; + + let l1_messages = vec![ + L1MessageEnvelope { + l1_block_number: 5, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 19, + gas_limit: 1000000, + to: address!("bA50F5340fb9f3bD074Bd638C9be13Ecb36e603D"), + value: U256::ZERO, + sender: address!("61d8d3E7F7c656493d1d76aAA1a836CEdfCBc27b"), + input: bytes!("8ef1332e0000000000000000000000008a54a2347da2562917304141ab67324615e9866d00000000000000000000000091e8addfe1358aca5314c644312d38237fc1101c000000000000000000000000000000000000000000000000016345785d8a0000000000000000000000000000000000000000000000000000000000000000001400000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e874800000000000000000000000098110937b5d6c5fcb0ba99480e585d2364e9809c00000000000000000000000098110937b5d6c5fcb0ba99480e585d2364e9809c000000000000000000000000000000000000000000000000016345785d8a00000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), + }, + }, + L1MessageEnvelope { + l1_block_number: 5, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 20, + gas_limit: 400000, + to: address!("bA50F5340fb9f3bD074Bd638C9be13Ecb36e603D"), + value: U256::ZERO, + sender: address!("61d8d3E7F7c656493d1d76aAA1a836CEdfCBc27b"), + input: 
bytes!("8ef1332e0000000000000000000000008a54a2347da2562917304141ab67324615e9866d00000000000000000000000091e8addfe1358aca5314c644312d38237fc1101c000000000000000000000000000000000000000000000000016345785d8a0000000000000000000000000000000000000000000000000000000000000000001400000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e874800000000000000000000000098110937b5d6c5fcb0ba99480e585d2364e9809c00000000000000000000000098110937b5d6c5fcb0ba99480e585d2364e9809c000000000000000000000000000000000000000000000000016345785d8a00000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), + }, + }, + L1MessageEnvelope { + l1_block_number: 10, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 21, + gas_limit: 400000, + to: address!("bA50F5340fb9f3bD074Bd638C9be13Ecb36e603D"), + value: U256::ZERO, + sender: address!("61d8d3E7F7c656493d1d76aAA1a836CEdfCBc27b"), + input: bytes!("8ef1332e0000000000000000000000008a54a2347da2562917304141ab67324615e9866d00000000000000000000000091e8addfe1358aca5314c644312d38237fc1101c0000000000000000000000000000000000000000000000004563918244f40000000000000000000000000000000000000000000000000000000000000000001500000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e87480000000000000000000000004721cf824b6750b58d781fd1336d92a082704c7a0000000000000000000000004721cf824b6750b58d781fd1336d92a082704c7a0000000000000000000000000000000000000000000000004563918244f400000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), + }, + }, + ]; + let l1_provider = + MockL1MessageProvider { messages: Arc::new(l1_messages.clone()), index: 0.into() }; + let l2_provider = MockL2Provider; + + let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?; + let derived_l1_messages: Vec<_> = attributes + .into_iter() + .filter_map(|a| a.transactions) + .flatten() + .filter_map(|rlp| { + let buf = &mut rlp.as_ref(); + TxL1Message::decode_2718(buf).ok() + }) + .collect(); + + let expected_l1_messages: Vec<_> = + l1_messages[1..].iter().map(|msg| msg.transaction.clone()).collect(); + assert_eq!(expected_l1_messages, derived_l1_messages); + Ok(()) + } } diff --git a/crates/l1/src/abi/calls.rs b/crates/l1/src/abi/calls.rs index 377c621b..285574f7 100644 --- a/crates/l1/src/abi/calls.rs +++ b/crates/l1/src/abi/calls.rs @@ -94,6 +94,8 @@ impl CommitBatchCall { Self::CommitBatchWithBlobProof(b) => &b.skipped_l1_message_bitmap, Self::CommitBatches(_) => return None, }; - Some(bitmap.to_vec()) + let mut bitmap = bitmap.to_vec(); + bitmap.reverse(); + Some(bitmap) } } From c6397cf7464d737b1f46570b08733ab2e08fcbc1 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 3 Jul 2025 10:28:51 +0200 Subject: [PATCH 3/6] feat: comments --- crates/derivation-pipeline/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index acd335e9..046b2589 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -543,6 +543,7 @@ mod tests { finalized_block_number: None, }; + // prepare the l1 messages. 
let l1_messages = vec![ L1MessageEnvelope { l1_block_number: 5, @@ -588,6 +589,7 @@ mod tests { MockL1MessageProvider { messages: Arc::new(l1_messages.clone()), index: 0.into() }; let l2_provider = MockL2Provider; + // derive attributes and extract l1 messages. let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?; let derived_l1_messages: Vec<_> = attributes .into_iter() @@ -599,6 +601,7 @@ mod tests { }) .collect(); + // the first L1 message should be skipped. let expected_l1_messages: Vec<_> = l1_messages[1..].iter().map(|msg| msg.transaction.clone()).collect(); assert_eq!(expected_l1_messages, derived_l1_messages); From de07499a5af26bf4016a9556dfd90c77eaab4312 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 3 Jul 2025 13:52:34 +0200 Subject: [PATCH 4/6] feat: handle batch revert --- crates/database/db/src/operations.rs | 67 ++++++++++++++++++--- crates/derivation-pipeline/src/lib.rs | 7 +++ crates/engine/src/driver.rs | 13 +++++ crates/indexer/src/event.rs | 10 +++- crates/indexer/src/lib.rs | 84 ++++++++------------------- crates/manager/src/manager/mod.rs | 11 +++- crates/primitives/src/metrics.rs | 3 +- 7 files changed, 123 insertions(+), 72 deletions(-) diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 854b1942..65fb9a55 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -1,5 +1,6 @@ use super::{models, DatabaseError}; use crate::DatabaseConnectionProvider; + use alloy_primitives::B256; use futures::{Stream, StreamExt}; use rollup_node_primitives::{ @@ -26,6 +27,8 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { models::batch_commit::Column::Hash, models::batch_commit::Column::BlockNumber, models::batch_commit::Column::BlockTimestamp, + models::batch_commit::Column::Calldata, + models::batch_commit::Column::BlobHash, models::batch_commit::Column::FinalizedBlockNumber, ]) .to_owned(), @@ -132,7 +135,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { } /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number. - async fn delete_batches_gt(&self, block_number: u64) -> Result { + async fn delete_batches_gt_block_number( + &self, + block_number: u64, + ) -> Result { tracing::trace!(target: "scroll::db", block_number, "Deleting batch inputs greater than block number."); Ok(models::batch_commit::Entity::delete_many() .filter(models::batch_commit::Column::BlockNumber.gt(block_number as i64)) @@ -141,6 +147,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.rows_affected)?) } + /// Delete all [`BatchCommitData`]s with a batch index greater than the provided index. + async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result { + tracing::trace!(target: "scroll::db", batch_index, "Deleting batch inputs greater than batch index."); + Ok(models::batch_commit::Entity::delete_many() + .filter(models::batch_commit::Column::Index.gt(batch_index as i64)) + .exec(self.get_connection()) + .await + .map(|x| x.rows_affected)?) + } + /// Get an iterator over all [`BatchCommitData`]s in the database. async fn get_batches<'a>( &'a self, @@ -327,7 +343,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .get_batch_by_index(batch_info.index - 1) .await? 
.expect("Batch info must be present due to database query arguments"); - let l2_block = self.get_highest_block_for_batch(previous_batch.hash).await?; + let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?; (l2_block, Some(batch.block_number)) } else { (None, None) @@ -337,7 +353,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { } /// Delete all L2 blocks with a block number greater than the provided block number. - async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result { + async fn delete_l2_blocks_gt_block_number( + &self, + block_number: u64, + ) -> Result { tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number."); Ok(models::l2_block::Entity::delete_many() .filter(models::l2_block::Column::BlockNumber.gt(block_number as i64)) @@ -346,6 +365,23 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.rows_affected)?) } + /// Delete all L2 blocks with a batch index greater than the batch index. + async fn delete_l2_blocks_gt_batch_index( + &self, + batch_index: u64, + ) -> Result { + tracing::trace!(target: "scroll::db", batch_index, "Deleting L2 blocks greater than provided batch index."); + Ok(models::l2_block::Entity::delete_many() + .filter( + Condition::all() + .add(models::l2_block::Column::BatchIndex.is_not_null()) + .add(models::l2_block::Column::BatchIndex.gt(batch_index as i64)), + ) + .exec(self.get_connection()) + .await + .map(|x| x.rows_affected)?) + } + /// Insert a new block in the database. async fn insert_block( &self, @@ -365,6 +401,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .on_conflict( OnConflict::column(models::l2_block::Column::BlockNumber) .update_columns([ + models::l2_block::Column::BlockHash, models::l2_block::Column::BatchHash, models::l2_block::Column::BatchIndex, ]) @@ -396,7 +433,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { /// Returns the highest L2 block originating from the provided `batch_hash` or the highest block /// for the batch's index. - async fn get_highest_block_for_batch( + async fn get_highest_block_for_batch_hash( &self, batch_hash: B256, ) -> Result, DatabaseError> { @@ -420,6 +457,20 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { } } + /// Returns the highest L2 block originating from the provided `batch_index` or the highest + /// block for the batch's index. + async fn get_highest_block_for_batch_index( + &self, + batch_index: u64, + ) -> Result, DatabaseError> { + Ok(models::l2_block::Entity::find() + .filter(models::l2_block::Column::BatchIndex.lte(batch_index)) + .order_by_desc(models::l2_block::Column::BlockNumber) + .one(self.get_connection()) + .await? + .map(|model| model.block_info())) + } + /// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number. 
async fn unwind( &self, @@ -427,7 +478,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { l1_block_number: u64, ) -> Result { // delete batch inputs and l1 messages - let batches_removed = self.delete_batches_gt(l1_block_number).await?; + let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?; let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?; // filter and sort the executed L1 messages @@ -441,10 +492,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { if let Some(msg) = removed_executed_l1_messages.first() { let l2_reorg_block_number = msg .l2_block_number - .expect("we guarantee that this is Some(u64) due to the filter above") - - 1; + .expect("we guarantee that this is Some(u64) due to the filter above") + .saturating_sub(1); let l2_block_info = self.get_l2_block_info_by_number(l2_reorg_block_number).await?; - self.delete_l2_blocks_gt(l2_reorg_block_number).await?; + self.delete_l2_blocks_gt_block_number(l2_reorg_block_number).await?; (Some(msg.transaction.queue_index), l2_block_info) } else { (None, None) diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 046b2589..8ec64242 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -123,6 +123,13 @@ where } None } + + /// Flushes all the data in the pipeline. + pub fn flush(&mut self) { + self.attributes_queue.clear(); + self.batch_queue.clear(); + self.pipeline_futures = FuturesOrdered::new(); + } } impl

<P> Stream for DerivationPipeline<P>
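An aside on the bitmap layout the series relies on (patches 1 and 2): the calldata encodes skipped_l1_message_bitmap as big-endian U256 words (previously decoded with U256::from_be_slice per 32-byte chunk), and the new code stores those bytes globally reversed, both in try_from_buf and in CommitBatchCall::skipped_l1_message_bitmap. Assuming word 0 covers relative queue indices 0..=255, a skipped message can then be looked up as sketched below; is_l1_message_skipped is a hypothetical helper, not part of the series:

/// Hypothetical helper: test whether the message at relative index `i`
/// (queue index minus the first queue index of the batch) is skipped,
/// given the byte-reversed bitmap stored by `try_from_buf`.
fn is_l1_message_skipped(reversed_bitmap: &[u8], i: usize) -> bool {
    // Word `i / 256` originally occupied bytes `32 * (i / 256)..`, with bit `i`
    // in byte `31 - (i % 256) / 8`; reversing the whole buffer maps that word
    // to the offset computed below. Bit positions within a byte are unchanged,
    // since only the byte order is reversed.
    let Some(word_start) = reversed_bitmap.len().checked_sub(32 * (i / 256 + 1)) else {
        return false; // index beyond the bitmap: treat as not skipped
    };
    reversed_bitmap
        .get(word_start + (i % 256) / 8)
        .is_some_and(|byte| byte & (1 << (i % 8)) != 0)
}

With the decode test above (a single word equal to 1, stored reversed as [1, 0, ..., 0]), is_l1_message_skipped(&header.skipped_l1_message_bitmap, 0) returns true, matching the skipped first message at queue index 19.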

diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 857ed31e..101ef32c 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -108,6 +108,19 @@ where self.block_building_duration = block_building_duration; } + /// Clear the l1 attributes queue. + pub fn clear_l1_payload_attributes(&mut self) { + // clear the L1 attributes queue. + self.l1_payload_attributes.clear(); + + // drop the engine future if it is a L1 consolidation. + if let Some(MeteredFuture { fut: EngineFuture::L1Consolidation(_), .. }) = + self.engine_future + { + self.engine_future.take(); + } + } + /// Handles a block import request by adding it to the queue and waking up the driver. pub fn handle_block_import(&mut self, block_with_peer: NewBlockWithPeer) { tracing::trace!(target: "scroll::engine", ?block_with_peer, "new block import request received"); diff --git a/crates/indexer/src/event.rs b/crates/indexer/src/event.rs index 008996a7..5b9b7051 100644 --- a/crates/indexer/src/event.rs +++ b/crates/indexer/src/event.rs @@ -4,8 +4,14 @@ use rollup_node_primitives::{BatchInfo, BlockInfo, L2BlockInfoWithL1Messages}; /// An event emitted by the indexer. #[derive(Debug, Clone, PartialEq, Eq)] pub enum IndexerEvent { - /// A `BatchCommit` event has been indexed returning the batch info. - BatchCommitIndexed(BatchInfo), + /// A `BatchCommit` event has been indexed returning the batch info and the L2 block info to + /// revert to due to a batch revert. + BatchCommitIndexed { + /// The batch info. + batch_info: BatchInfo, + /// The safe L2 block info. + safe_head: Option, + }, /// A `BatchFinalization` event has been indexed returning the batch hash and new finalized L2 /// block. BatchFinalizationIndexed(B256, Option), diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index 0827fe89..95f46368 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -249,8 +249,29 @@ impl Indexer< database: Arc, batch: BatchCommitData, ) -> Result { - let event = IndexerEvent::BatchCommitIndexed(BatchInfo::new(batch.index, batch.hash)); - database.insert_batch(batch).await?; + let txn = database.tx().await?; + let prev_batch_index = batch.index - 1; + + // remove any batches with an index greater than the previous batch. + let affected = txn.delete_batches_gt_batch_index(prev_batch_index).await?; + + // handle the case of a batch revert. + let new_safe_head = if affected > 0 { + txn.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; + txn.get_highest_block_for_batch_index(prev_batch_index).await? + } else { + None + }; + + let event = IndexerEvent::BatchCommitIndexed { + batch_info: BatchInfo::new(batch.index, batch.hash), + safe_head: new_safe_head, + }; + + // insert the batch and commit the transaction. + txn.insert_batch(batch).await?; + txn.commit().await?; + Ok(event) } @@ -285,7 +306,7 @@ impl Indexer< batch_hash: B256, l2_block_number: Arc, ) -> Result, IndexerError> { - let finalized_block = database.get_highest_block_for_batch(batch_hash).await?; + let finalized_block = database.get_highest_block_for_batch_hash(batch_hash).await?; // only return the block if the indexer hasn't seen it. // in which case also update the `l2_finalized_block_number` value. @@ -301,63 +322,6 @@ impl Indexer< } } -/// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number. 
-pub async fn unwind( - database: Arc, - chain_spec: Arc, - l1_block_number: u64, -) -> Result { - // create a database transaction so this operation is atomic - let txn = database.tx().await?; - - // delete batch inputs and l1 messages - let batches_removed = txn.delete_batches_gt(l1_block_number).await?; - let deleted_messages = txn.delete_l1_messages_gt(l1_block_number).await?; - - // filter and sort the executed L1 messages - let mut removed_executed_l1_messages: Vec<_> = - deleted_messages.into_iter().filter(|x| x.l2_block_number.is_some()).collect(); - removed_executed_l1_messages - .sort_by(|a, b| a.transaction.queue_index.cmp(&b.transaction.queue_index)); - - // check if we need to reorg the L2 head and delete some L2 blocks - let (queue_index, l2_head_block_info) = if let Some(msg) = removed_executed_l1_messages.first() - { - let l2_reorg_block_number = msg - .l2_block_number - .expect("we guarantee that this is Some(u64) due to the filter on line 130") - - 1; - let l2_block_info = txn - .get_l2_block_info_by_number(l2_reorg_block_number) - .await? - .ok_or(IndexerError::L2BlockNotFound(l2_reorg_block_number))?; - txn.delete_l2_blocks_gt(l2_reorg_block_number).await?; - (Some(msg.transaction.queue_index), Some(l2_block_info)) - } else { - (None, None) - }; - - // check if we need to reorg the L2 safe block - let l2_safe_block_info = if batches_removed > 0 { - if let Some(x) = txn.get_latest_safe_l2_info().await? { - Some(x.0) - } else { - Some(BlockInfo::new(0, chain_spec.genesis_hash())) - } - } else { - None - }; - - // commit the transaction - txn.commit().await?; - Ok(IndexerEvent::UnwindIndexed { - l1_block_number, - queue_index, - l2_head_block_info, - l2_safe_block_info, - }) -} - impl Stream for Indexer { type Item = Result; diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index e9051cef..9d3d51ac 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -237,7 +237,16 @@ where fn handle_indexer_event(&mut self, event: IndexerEvent) { trace!(target: "scroll::node::manager", ?event, "Received indexer event"); match event { - IndexerEvent::BatchCommitIndexed(batch_info) => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + // if we detected a batch revert event, we reset the pipeline and the engine driver. + if let Some(new_safe_head) = safe_head { + if let Some(pipeline) = self.derivation_pipeline.as_mut() { + pipeline.flush() + } + self.engine.clear_l1_payload_attributes(); + self.engine.set_head_block_info(new_safe_head); + self.engine.set_safe_block_info(new_safe_head); + } // push the batch info into the derivation pipeline. if let Some(pipeline) = &mut self.derivation_pipeline { pipeline.handle_batch_commit(batch_info); diff --git a/crates/primitives/src/metrics.rs b/crates/primitives/src/metrics.rs index 487442b8..205eeff1 100644 --- a/crates/primitives/src/metrics.rs +++ b/crates/primitives/src/metrics.rs @@ -9,7 +9,8 @@ use std::time::Instant; /// A metered future that records the start of the polling. pub struct MeteredFuture { - fut: F, + /// The future being metered. 
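+    /// Exposed so that consumers can inspect in-flight work, e.g. the engine
+    /// driver matching on a pending L1 consolidation future when a batch
+    /// revert clears the payload attributes queue.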
+ pub fut: F, started_at: Instant, } From fd116a9df8032eb483b15a230bfbab5a53af32d7 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 3 Jul 2025 13:54:04 +0200 Subject: [PATCH 5/6] test: add testing --- crates/database/db/src/db.rs | 82 +++++++++++++++++-- crates/indexer/src/lib.rs | 154 +++++++++++++++++++++++++++++++++-- 2 files changed, 222 insertions(+), 14 deletions(-) diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 580fa482..db771488 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -172,9 +172,14 @@ mod test { block_number += 1; } + // Fetch the highest block for the batch hash and verify number. + let highest_block_info = + db.get_highest_block_for_batch_hash(batch_info.hash).await.unwrap().unwrap(); + assert_eq!(highest_block_info.number, block_number - 1); + // Fetch the highest block for the batch and verify number. let highest_block_info = - db.get_highest_block_for_batch(batch_info.hash).await.unwrap().unwrap(); + db.get_highest_block_for_batch_index(batch_info.index).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); } @@ -211,9 +216,14 @@ mod test { block_number += 1; } - // Fetch the highest block for the batch and verify number. + // Fetch the highest block for the batch hash and verify number. let highest_block_info = - db.get_highest_block_for_batch(second_batch_info.hash).await.unwrap().unwrap(); + db.get_highest_block_for_batch_hash(second_batch_info.hash).await.unwrap().unwrap(); + assert_eq!(highest_block_info.number, block_number - 1); + + // Fetch the highest block for the batch index and verify number. + let highest_block_info = + db.get_highest_block_for_batch_index(second_batch_info.index).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); } @@ -480,7 +490,7 @@ mod test { } #[tokio::test] - async fn test_delete_l2_blocks_gt() { + async fn test_delete_l2_blocks_gt_block_number() { // Set up the test database. let db = setup_test_db().await; @@ -499,7 +509,7 @@ mod test { } // Delete blocks with number > 405 - let deleted_count = db.delete_l2_blocks_gt(405).await.unwrap(); + let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap(); assert_eq!(deleted_count, 4); // Blocks 406, 407, 408, 409 // Verify remaining blocks still exist @@ -515,6 +525,68 @@ mod test { } } + #[tokio::test] + async fn test_delete_l2_blocks_gt_batch_index() { + // Set up the test database. + let db = setup_test_db().await; + + // Generate unstructured bytes. 
+ let mut bytes = [0u8; 1024]; + rand::rng().fill(bytes.as_mut_slice()); + let mut u = Unstructured::new(&bytes); + + // Insert multiple batches + for i in 100..110 { + let batch_data = BatchCommitData { + index: i, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + db.insert_batch(batch_data).await.unwrap(); + } + + // Insert L2 blocks with different batch indices + for i in 100..110 { + let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap(); + let batch_info: BatchInfo = batch_data.into(); + + let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() }; + let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] }; + + db.insert_block(l2_block, Some(batch_info)).await.unwrap(); + } + + // Insert some blocks without batch index (should not be deleted) + for i in 0..3 { + let block_info = BlockInfo { number: 600 + i, hash: B256::arbitrary(&mut u).unwrap() }; + let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] }; + + db.insert_block(l2_block, None).await.unwrap(); + } + + // Delete L2 blocks with batch index > 105 + let deleted_count = db.delete_l2_blocks_gt_batch_index(105).await.unwrap(); + assert_eq!(deleted_count, 4); // Blocks with batch indices 106, 107, 108, 109 + + // Verify remaining blocks with batch index <= 105 still exist + for i in 100..=105 { + let block = db.get_l2_block_info_by_number(500 + i).await.unwrap(); + assert!(block.is_some()); + } + + // Verify deleted blocks with batch index > 105 no longer exist + for i in 106..110 { + let block = db.get_l2_block_info_by_number(500 + i).await.unwrap(); + assert!(block.is_none()); + } + + // Verify blocks without batch index are still there (not affected by batch index filter) + for i in 0..3 { + let block = db.get_l2_block_info_by_number(600 + i).await.unwrap(); + assert!(block.is_some()); + } + } + #[tokio::test] async fn test_insert_block_with_l1_messages() { // Set up the test database. diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index 95f46368..afadeb1d 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -373,13 +373,143 @@ mod test { let batch_commit = BatchCommitData::arbitrary(&mut u).unwrap(); indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit.clone())); - let _ = indexer.next().await; + let event = indexer.next().await.unwrap().unwrap(); - let batch_commit_result = db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap(); + // Verify the event structure + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, batch_commit.index); + assert_eq!(batch_info.hash, batch_commit.hash); + assert_eq!(safe_head, None); // No safe head since no batch revert + } + _ => panic!("Expected BatchCommitIndexed event"), + } + let batch_commit_result = db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap(); assert_eq!(batch_commit, batch_commit_result); } + #[tokio::test] + async fn test_handle_batch_commit_with_revert() { + // Instantiate indexer and db + let (mut indexer, db) = setup_test_indexer().await; + + // Generate unstructured bytes. 
+ let mut bytes = [0u8; 1024]; + rand::rng().fill(bytes.as_mut_slice()); + let mut u = Unstructured::new(&bytes); + + // Create sequential batches + let batch_1 = BatchCommitData { + index: 100, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + let batch_2 = BatchCommitData { + index: 101, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + let batch_3 = BatchCommitData { + index: 102, + calldata: Arc::new(vec![].into()), + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + + // Index first batch + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_1.clone())); + let event = indexer.next().await.unwrap().unwrap(); + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 100); + assert_eq!(safe_head, None); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Index second batch + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_2.clone())); + let event = indexer.next().await.unwrap().unwrap(); + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 101); + assert_eq!(safe_head, None); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Index third batch + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_3.clone())); + let event = indexer.next().await.unwrap().unwrap(); + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 102); + assert_eq!(safe_head, None); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Add some L2 blocks for the batches + let batch_1_info = BatchInfo::new(batch_1.index, batch_1.hash); + let batch_2_info = BatchInfo::new(batch_2.index, batch_2.hash); + + let block_1 = L2BlockInfoWithL1Messages { + block_info: BlockInfo { number: 500, hash: Arbitrary::arbitrary(&mut u).unwrap() }, + l1_messages: vec![], + }; + let block_2 = L2BlockInfoWithL1Messages { + block_info: BlockInfo { number: 501, hash: Arbitrary::arbitrary(&mut u).unwrap() }, + l1_messages: vec![], + }; + let block_3 = L2BlockInfoWithL1Messages { + block_info: BlockInfo { number: 502, hash: Arbitrary::arbitrary(&mut u).unwrap() }, + l1_messages: vec![], + }; + + indexer.handle_block(block_1.clone(), Some(batch_1_info)); + indexer.next().await.unwrap().unwrap(); + + indexer.handle_block(block_2.clone(), Some(batch_2_info)); + indexer.next().await.unwrap().unwrap(); + + indexer.handle_block(block_3.clone(), Some(batch_2_info)); + indexer.next().await.unwrap().unwrap(); + + // Now simulate a batch revert by submitting a new batch with index 101 + // This should delete batch 102 and any blocks associated with it + let new_batch_2 = BatchCommitData { + index: 101, + calldata: Arc::new(vec![1, 2, 3].into()), // Different data + ..Arbitrary::arbitrary(&mut u).unwrap() + }; + + indexer.handle_l1_notification(L1Notification::BatchCommit(new_batch_2.clone())); + let event = indexer.next().await.unwrap().unwrap(); + + // Verify the event indicates a batch revert + match event { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + assert_eq!(batch_info.index, 101); + assert_eq!(batch_info.hash, new_batch_2.hash); + // Safe head should be the highest block from batch index <= 100 + assert_eq!(safe_head, Some(block_1.block_info)); + } + _ => panic!("Expected BatchCommitIndexed event"), + } + + // Verify batch 102 was deleted + let batch_102 = db.get_batch_by_index(102).await.unwrap(); + 
assert!(batch_102.is_none()); + + // Verify batch 101 was replaced with new data + let updated_batch_101 = db.get_batch_by_index(101).await.unwrap().unwrap(); + assert_eq!(updated_batch_101, new_batch_2); + + // Verify batch 100 still exists + let batch_100 = db.get_batch_by_index(100).await.unwrap(); + assert!(batch_100.is_some()); + } + #[tokio::test] async fn test_handle_l1_message() { // Instantiate indexer and db @@ -462,14 +592,17 @@ mod test { // Generate a 3 random batch inputs and set their block numbers let mut batch_commit_block_1 = BatchCommitData::arbitrary(&mut u).unwrap(); batch_commit_block_1.block_number = 1; + batch_commit_block_1.index = 1; let batch_commit_block_1 = batch_commit_block_1; let mut batch_commit_block_20 = BatchCommitData::arbitrary(&mut u).unwrap(); batch_commit_block_20.block_number = 20; + batch_commit_block_20.index = 20; let batch_commit_block_20 = batch_commit_block_20; let mut batch_commit_block_30 = BatchCommitData::arbitrary(&mut u).unwrap(); batch_commit_block_30.block_number = 30; + batch_commit_block_30.index = 30; let batch_commit_block_30 = batch_commit_block_30; // Index batch inputs @@ -549,19 +682,22 @@ mod test { // Generate a 3 random batch inputs and set their block numbers let batch_commit_block_1 = - BatchCommitData { block_number: 5, ..Arbitrary::arbitrary(&mut u).unwrap() }; - let batch_commit_block_20 = - BatchCommitData { block_number: 10, ..Arbitrary::arbitrary(&mut u).unwrap() }; + BatchCommitData { block_number: 5, index: 5, ..Arbitrary::arbitrary(&mut u).unwrap() }; + let batch_commit_block_10 = BatchCommitData { + block_number: 10, + index: 10, + ..Arbitrary::arbitrary(&mut u).unwrap() + }; // Index batch inputs indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_1.clone())); - indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_20.clone())); + indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_10.clone())); for _ in 0..2 { let _event = indexer.next().await.unwrap().unwrap(); } let batch_1 = BatchInfo::new(batch_commit_block_1.index, batch_commit_block_1.hash); - let batch_20 = BatchInfo::new(batch_commit_block_20.index, batch_commit_block_20.hash); + let batch_10 = BatchInfo::new(batch_commit_block_10.index, batch_commit_block_10.hash); const UNITS_FOR_TESTING: u64 = 20; const L1_MESSAGES_NOT_EXECUTED_COUNT: u64 = 7; @@ -601,7 +737,7 @@ mod test { let batch_info = if block_number < 5 { Some(batch_1) } else if block_number < 10 { - Some(batch_20) + Some(batch_10) } else { None }; @@ -624,7 +760,7 @@ mod test { } ); - // Reorg at block 17 which is one of the messages that has not been executed yet. No reorg + // Reorg at block 7 which is one of the messages that has not been executed yet. No reorg // but we should ensure the L1 messages have been deleted. 
indexer.handle_l1_notification(L1Notification::Reorg(7)); let event = indexer.next().await.unwrap().unwrap(); From 4522811f0482a41a74c113e6252ae160e28ed787 Mon Sep 17 00:00:00 2001 From: Gregory Edison Date: Thu, 3 Jul 2025 13:54:26 +0200 Subject: [PATCH 6/6] feat: order commit logs in watcher by block number first --- crates/watcher/src/lib.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index b3fbaf50..b84531d0 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -22,6 +22,7 @@ use rollup_node_providers::SystemContractProvider; use scroll_alloy_consensus::TxL1Message; use scroll_l1::abi::logs::{try_decode_log, CommitBatch, FinalizeBatch, QueueTransaction}; use std::{ + cmp::Ordering, fmt::{Debug, Display, Formatter}, sync::Arc, time::Duration, @@ -420,9 +421,14 @@ where // prepare notifications let mut notifications = Vec::with_capacity(commit_logs_with_tx.len()); - // sort the commits and group by tx hash. - commit_logs_with_tx - .sort_by(|(_, data_a, _), (_, data_b, _)| data_a.batch_index.cmp(&data_b.batch_index)); + // sort the commits by block number then batch index, then group by tx hash. + commit_logs_with_tx.sort_by(|(log_a, data_a, _), (log_b, data_b, _)| { + log_a + .block_number + .and_then(|a| log_b.block_number.map(|b| a.cmp(&b))) + .unwrap_or(Ordering::Equal) + .then_with(|| data_a.batch_index.cmp(&data_b.batch_index)) + }); let groups = commit_logs_with_tx.into_iter().chunk_by(|(_, _, hash)| *hash); let groups: Vec<_> = groups.into_iter().map(|(hash, group)| (hash, group.collect::>())).collect();
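A closing note on the new ordering: the L1 block number is the primary key and the batch index only breaks ties, with logs missing a block number comparing equal on the first key. This matters once batch reverts are handled, since the same batch index can be re-committed in a later L1 block, so index order alone is no longer monotonic. The comparator can be read as the standalone sketch below (function name hypothetical):

use std::cmp::Ordering;

/// Hypothetical standalone form of the watcher's sort key: order commit
/// logs by L1 block number first, then by batch index; logs without a
/// block number compare equal on the first key.
fn commit_log_order(a: (Option<u64>, u64), b: (Option<u64>, u64)) -> Ordering {
    a.0.and_then(|x| b.0.map(|y| x.cmp(&y)))
        .unwrap_or(Ordering::Equal)
        .then_with(|| a.1.cmp(&b.1))
}

For example, commit_log_order((Some(10), 7), (Some(9), 8)) is Ordering::Greater, so the commit from block 9 sorts first even though its batch index is higher.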