diff --git a/.codespellrc b/.codespellrc index ed79b8bc..1e4d8f23 100644 --- a/.codespellrc +++ b/.codespellrc @@ -1,3 +1,3 @@ [codespell] -skip = .git,target,Cargo.toml,Cargo.lock +skip = .git,target,Cargo.toml,Cargo.lock,docker-compose ignore-words-list = crate diff --git a/Cargo.lock b/Cargo.lock index ff0c82ea..033e85a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10071,6 +10071,7 @@ dependencies = [ "reth-transaction-pool", "reth-trie-db", "rollup-node", + "rollup-node-indexer", "rollup-node-manager", "rollup-node-primitives", "rollup-node-providers", @@ -10098,23 +10099,39 @@ dependencies = [ name = "rollup-node-indexer" version = "0.0.1" dependencies = [ + "alloy-consensus 1.0.9", + "alloy-eips 1.0.9", "alloy-primitives", + "alloy-provider", + "alloy-rpc-client", + "alloy-transport", "arbitrary", "futures", "metrics", "metrics-derive", + "parking_lot 0.12.3", "rand 0.9.1", + "reqwest", "reth-chainspec", + "reth-eth-wire-types", + "reth-network-p2p", + "reth-network-peers", + "reth-primitives-traits", "reth-scroll-chainspec", "reth-scroll-forks", + "reth-scroll-primitives", "rollup-node-primitives", "rollup-node-watcher", "scroll-alloy-consensus", "scroll-alloy-hardforks", + "scroll-alloy-network", "scroll-db", + "scroll-network", + "serde_json", "strum 0.27.1", "thiserror 2.0.12", "tokio", + "tracing", ] [[package]] @@ -10168,6 +10185,7 @@ dependencies = [ "alloy-rpc-types-engine 1.0.9", "arbitrary", "derive_more", + "reth-network-peers", "reth-primitives-traits", "reth-scroll-primitives", "scroll-alloy-consensus", @@ -10854,6 +10872,7 @@ dependencies = [ "reqwest-middleware", "reqwest-retry", "reth-chainspec", + "reth-scroll-chainspec", "sea-orm", "sea-orm-migration", "sha2 0.10.9", @@ -10867,6 +10886,7 @@ dependencies = [ "alloy-primitives", "futures", "parking_lot 0.12.3", + "reth-chainspec", "reth-eth-wire-types", "reth-network", "reth-network-api", @@ -10877,6 +10897,8 @@ dependencies = [ "reth-scroll-node", "reth-scroll-primitives", "reth-storage-api", + "reth-tokio-util", + "scroll-alloy-hardforks", "scroll-wire", "thiserror 2.0.12", "tokio", diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 580fa482..4e7179da 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -315,8 +315,13 @@ mod test { db.insert_l1_message(l1_message_2.clone()).await.unwrap(); // collect the L1Messages - let l1_messages = - db.get_l1_messages().await.unwrap().map(|res| res.unwrap()).collect::>().await; + let l1_messages = db + .get_l1_messages(None) + .await + .unwrap() + .map(|res| res.unwrap()) + .collect::>() + .await; // Apply the assertions. assert!(l1_messages.contains(&l1_message_1)); @@ -412,9 +417,10 @@ mod test { rand::rng().fill(bytes.as_mut_slice()); let mut u = Unstructured::new(&bytes); - // Initially should return None - let latest_safe = db.get_latest_safe_l2_info().await.unwrap(); - assert!(latest_safe.is_none()); + // Initially should return the genesis block and hash. 
+ let (latest_safe_block, batch) = db.get_latest_safe_l2_info().await.unwrap().unwrap(); + assert_eq!(latest_safe_block.number, 0); + assert_eq!(batch.index, 0); // Generate and insert a batch let batch_data = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() }; diff --git a/crates/database/db/src/lib.rs b/crates/database/db/src/lib.rs index 3022048b..0184b06a 100644 --- a/crates/database/db/src/lib.rs +++ b/crates/database/db/src/lib.rs @@ -13,7 +13,7 @@ mod models; pub use models::*; mod operations; -pub use operations::{DatabaseOperations, UnwindResult}; +pub use operations::{DatabaseOperations, L1MessageStart, UnwindResult}; mod transaction; pub use transaction::DatabaseTransaction; diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 854b1942..c5d616dc 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -227,12 +227,30 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.map(Into::into))?) } - /// Gets an iterator over all [`L1MessageEnvelope`]s in the database. + /// Get an iterator over all [`L1MessageEnvelope`]s in the database starting from the provided + /// `start` point. async fn get_l1_messages<'a>( &'a self, + start: Option, ) -> Result> + 'a, DatabaseError> { + let queue_index = match start { + Some(L1MessageStart::Index(i)) => i, + Some(L1MessageStart::Hash(ref h)) => { + // Lookup message by hash + let record = models::l1_message::Entity::find() + .filter(models::l1_message::Column::Hash.eq(h.to_vec())) + .one(self.get_connection()) + .await? + .ok_or_else(|| DatabaseError::L1MessageNotFound(0))?; + + record.queue_index as u64 + } + None => 0, + }; + Ok(models::l1_message::Entity::find() + .filter(models::l1_message::Column::QueueIndex.gte(queue_index)) .stream(self.get_connection()) .await? .map(|res| Ok(res.map(Into::into)?))) @@ -250,6 +268,24 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.map(Into::into))?) } + /// Get the [`BlockInfo`] and optional [`BatchInfo`] for the provided block hash. + async fn get_l2_block_and_batch_info_by_hash( + &self, + block_hash: B256, + ) -> Result)>, DatabaseError> { + tracing::trace!(target: "scroll::db", ?block_hash, "Fetching L2 block and batch info by hash from database."); + Ok(models::l2_block::Entity::find() + .filter(models::l2_block::Column::BlockHash.eq(block_hash.to_vec())) + .one(self.get_connection()) + .await + .map(|x| { + x.map(|x| { + let (block_info, batch_info): (BlockInfo, Option) = x.into(); + (block_info, batch_info) + }) + })?) + } + /// Get a [`BlockInfo`] from the database by its block number. async fn get_l2_block_info_by_number( &self, @@ -298,6 +334,18 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.map(|x| x.block_info()))?) } + /// Get an iterator over all L2 blocks in the database starting from the most recent one. + async fn get_l2_blocks<'a>( + &'a self, + ) -> Result> + 'a, DatabaseError> { + tracing::trace!(target: "scroll::db", "Fetching L2 blocks from database."); + Ok(models::l2_block::Entity::find() + .order_by_desc(models::l2_block::Column::BlockNumber) + .stream(self.get_connection()) + .await? + .map(|res| Ok(res.map(|res| res.block_info())?))) + } + /// Prepare the database on startup and return metadata used for other components in the /// rollup-node. /// @@ -346,6 +394,18 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.rows_affected)?) } + /// Insert multiple blocks into the database. 
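Before `insert_blocks` below, it is worth seeing the new cursor-style API from the caller's side: `None` streams from the start of the queue, `Index(i)` seeks to a queue index, and `Hash(h)` first resolves the hash to its queue index as implemented above. A minimal sketch, assuming a value `db` whose type implements `DatabaseOperations` (the logging is illustrative):

```rust
use futures::StreamExt;
use scroll_db::{DatabaseError, DatabaseOperations, L1MessageStart};

/// Illustrative consumer: stream every L1 message at or after queue index 100.
async fn dump_from_index(db: &impl DatabaseOperations) -> Result<(), DatabaseError> {
    let mut messages = db.get_l1_messages(Some(L1MessageStart::Index(100))).await?;
    while let Some(envelope) = messages.next().await {
        // Envelopes arrive in ascending queue order because of the
        // `QueueIndex.gte(..)` filter in the query above.
        println!("queue index: {}", envelope?.transaction.queue_index);
    }
    Ok(())
}
```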
+ async fn insert_blocks( + &self, + blocks: Vec, + batch_info: Option, + ) -> Result<(), DatabaseError> { + for block in blocks { + self.insert_block(block, batch_info).await?; + } + Ok(()) + } + /// Insert a new block in the database. async fn insert_block( &self, @@ -466,6 +526,18 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { } } +/// This type defines the start of an L1 message stream. +/// +/// It can either be an index, which is the queue index of the first message to return, or a hash, +/// which is the hash of the first message to return. +#[derive(Debug)] +pub enum L1MessageStart { + /// Start from the provided queue index. + Index(u64), + /// Start from the provided queue hash. + Hash(B256), +} + /// The result of [`DatabaseOperations::unwind`]. #[derive(Debug)] pub struct UnwindResult { diff --git a/crates/database/migration/Cargo.toml b/crates/database/migration/Cargo.toml index 9abab627..872a070e 100644 --- a/crates/database/migration/Cargo.toml +++ b/crates/database/migration/Cargo.toml @@ -16,6 +16,7 @@ reth-chainspec.workspace = true sea-orm = { workspace = true, features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] } sha2 = "0.10.9" tracing.workspace = true +reth-scroll-chainspec.workspace = true [dependencies.sea-orm-migration] version = "1.1.0" diff --git a/crates/database/migration/src/lib.rs b/crates/database/migration/src/lib.rs index e4733d5b..d64688d0 100644 --- a/crates/database/migration/src/lib.rs +++ b/crates/database/migration/src/lib.rs @@ -19,7 +19,7 @@ impl MigratorTrait for Migrator { Box::new(m20250304_125946_add_l1_msg_table::Migration), Box::new(m20250408_132123_add_header_metadata::Migration), Box::new(m20250408_150338_load_header_metadata::Migration::(Default::default())), - Box::new(m20250411_072004_add_l2_block::Migration), + Box::new(m20250411_072004_add_l2_block::Migration::(Default::default())), Box::new(m20250616_223947_add_metadata::Migration), ] } diff --git a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs index 1ee03466..e85a9d07 100644 --- a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs +++ b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs @@ -1,3 +1,4 @@ +use sea_orm::Statement; use sea_orm_migration::{prelude::*, schema::*}; // TODO: migrate these to a constants module @@ -23,7 +24,29 @@ impl MigrationTrait for Migration { .col(big_unsigned_null(BatchCommit::FinalizedBlockNumber)) .to_owned(), ) - .await + .await?; + + manager + .get_connection() + .execute(Statement::from_sql_and_values( + manager.get_database_backend(), + r#" + INSERT INTO batch_commit ("index", hash, block_number, block_timestamp, calldata, blob_hash, finalized_block_number) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ "#, + vec![ + 0u64.into(), + vec![0u8; HASH_LENGTH as usize].into(), + 0u64.into(), + 0u64.into(), + vec![].into(), + None::>.into(), + 0u64.into(), + ], + )) + .await?; + + Ok(()) } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { diff --git a/crates/database/migration/src/m20250411_072004_add_l2_block.rs b/crates/database/migration/src/m20250411_072004_add_l2_block.rs index c8e1582d..1f383f7a 100644 --- a/crates/database/migration/src/m20250411_072004_add_l2_block.rs +++ b/crates/database/migration/src/m20250411_072004_add_l2_block.rs @@ -1,12 +1,18 @@ -use super::m20220101_000001_create_batch_commit_table::BatchCommit; +use super::{m20220101_000001_create_batch_commit_table::BatchCommit, MigrationInfo}; +use sea_orm::Statement; use sea_orm_migration::{prelude::*, schema::*}; -#[derive(DeriveMigrationName)] -pub struct Migration; +pub struct Migration(pub std::marker::PhantomData); + +impl MigrationName for Migration { + fn name(&self) -> &str { + sea_orm_migration::util::get_file_stem(file!()) + } +} #[async_trait::async_trait] -impl MigrationTrait for Migration { +impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .create_table( @@ -35,7 +41,24 @@ impl MigrationTrait for Migration { ) .to_owned(), ) - .await + .await?; + + // Insert the genesis block. + let genesis_hash = MI::genesis_hash(); + + manager + .get_connection() + .execute(Statement::from_sql_and_values( + manager.get_database_backend(), + r#" + INSERT INTO l2_block (block_number, block_hash, batch_index, batch_hash) + VALUES (?, ?, ?, ?) + "#, + vec![0u64.into(), genesis_hash.to_vec().into(), 0u64.into(), vec![0u8; 32].into()], + )) + .await?; + + Ok(()) } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { diff --git a/crates/database/migration/src/migration_info.rs b/crates/database/migration/src/migration_info.rs index 38615f2c..0377f36c 100644 --- a/crates/database/migration/src/migration_info.rs +++ b/crates/database/migration/src/migration_info.rs @@ -1,4 +1,5 @@ use alloy_primitives::{b256, B256}; +use reth_scroll_chainspec::{SCROLL_MAINNET_GENESIS_HASH, SCROLL_SEPOLIA_GENESIS_HASH}; pub enum DataSource { Url(String), @@ -8,6 +9,7 @@ pub enum DataSource { pub trait MigrationInfo { fn data_source() -> Option; fn data_hash() -> Option; + fn genesis_hash() -> B256; } impl MigrationInfo for () { @@ -18,6 +20,11 @@ impl MigrationInfo for () { fn data_hash() -> Option { None } + + fn genesis_hash() -> B256 { + // Todo: Update + b256!("0xb5bd7381c6b550af0de40d6c490602574d76427c8cce17b54cb7917c323136f2") + } } /// The type implementing migration info for Mainnet. @@ -33,6 +40,10 @@ impl MigrationInfo for ScrollMainnetMigrationInfo { fn data_hash() -> Option { Some(b256!("fa2746026ec9590e37e495cb20046e20a38fd0e7099abd2012640dddf6c88b25")) } + + fn genesis_hash() -> B256 { + SCROLL_MAINNET_GENESIS_HASH + } } pub struct ScrollMainnetTestMigrationInfo; @@ -45,6 +56,10 @@ impl MigrationInfo for ScrollMainnetTestMigrationInfo { fn data_hash() -> Option { None } + + fn genesis_hash() -> B256 { + SCROLL_MAINNET_GENESIS_HASH + } } /// The type implementing migration info for Sepolia. 
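Before the Sepolia impl below: the trait now couples three pieces of per-network data, and a new network only has to answer the same three questions. A sketch of how a hypothetical devnet could be wired into `migration_info.rs` under the same trait (the type name and hash are placeholders, not part of this change):

```rust
/// Hypothetical migration info for a local devnet.
pub struct DevnetMigrationInfo;

impl MigrationInfo for DevnetMigrationInfo {
    /// No pre-computed header metadata to download.
    fn data_source() -> Option<DataSource> {
        None
    }

    /// Nothing to verify, since there is no data source.
    fn data_hash() -> Option<B256> {
        None
    }

    /// Must match the devnet's actual genesis hash: the `l2_block` migration
    /// seeds block 0 with this value, and `get_latest_safe_l2_info` reports it
    /// as the initial safe block.
    fn genesis_hash() -> B256 {
        // Placeholder value for illustration only.
        b256!("0000000000000000000000000000000000000000000000000000000000000000")
    }
}
```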
@@ -60,4 +75,8 @@ impl MigrationInfo for ScrollSepoliaMigrationInfo { fn data_hash() -> Option { Some(b256!("a02354c12ca0f918bf4768255af9ed13c137db7e56252348f304b17bb4088924")) } + + fn genesis_hash() -> B256 { + SCROLL_SEPOLIA_GENESIS_HASH + } } diff --git a/crates/engine/src/driver.rs b/crates/engine/src/driver.rs index 857ed31e..f2292a3d 100644 --- a/crates/engine/src/driver.rs +++ b/crates/engine/src/driver.rs @@ -7,12 +7,13 @@ use crate::{ use alloy_provider::Provider; use futures::{ready, task::AtomicWaker, FutureExt, Stream}; -use rollup_node_primitives::{BlockInfo, MeteredFuture, ScrollPayloadAttributesWithBatchInfo}; +use rollup_node_primitives::{ + BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, +}; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_alloy_provider::ScrollEngineApi; use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes; -use scroll_network::NewBlockWithPeer; use std::{ collections::VecDeque, pin::Pin, @@ -34,14 +35,14 @@ pub struct EngineDriver { fcs: ForkchoiceState, /// Whether the EN is syncing. syncing: bool, - /// The gap between EN and tip of chain which triggers optimistic sync. - block_gap_sync_trigger: u64, /// Block building duration. block_building_duration: Duration, /// The pending payload attributes derived from batches on L1. l1_payload_attributes: VecDeque, /// The pending block imports received over the network. - block_imports: VecDeque, + chain_imports: VecDeque, + /// The latest optimistic sync target. + optimistic_sync_target: Option, /// The payload attributes associated with the next block to be built. sequencer_payload_attributes: Option, /// The future related to engine API. @@ -67,7 +68,6 @@ where provider: Option
<P>
, fcs: ForkchoiceState, sync_at_start_up: bool, - block_gap_sync_trigger: u64, block_building_duration: Duration, ) -> Self { Self { @@ -77,9 +77,9 @@ where fcs, block_building_duration, syncing: sync_at_start_up, - block_gap_sync_trigger, l1_payload_attributes: VecDeque::new(), - block_imports: VecDeque::new(), + chain_imports: VecDeque::new(), + optimistic_sync_target: None, sequencer_payload_attributes: None, payload_building_future: None, engine_future: None, @@ -109,17 +109,24 @@ where } /// Handles a block import request by adding it to the queue and waking up the driver. - pub fn handle_block_import(&mut self, block_with_peer: NewBlockWithPeer) { - tracing::trace!(target: "scroll::engine", ?block_with_peer, "new block import request received"); - - // Check diff between EN fcs and P2P network tips. - let en_block_number = self.fcs.head_block_info().number; - let p2p_block_number = block_with_peer.block.header.number; - if p2p_block_number.saturating_sub(en_block_number) > self.block_gap_sync_trigger { - self.syncing = true - } + pub fn handle_chain_import(&mut self, chain_import: ChainImport) { + tracing::trace!(target: "scroll::engine", head = %chain_import.chain.last().unwrap().hash_slow(), "new block import request received"); + + self.chain_imports.push_back(chain_import); + self.waker.wake(); + } + + /// Optimistically syncs the chain to the provided block info. + pub fn handle_optimistic_sync(&mut self, block_info: BlockInfo) { + tracing::info!(target: "scroll::engine", ?block_info, "optimistic sync request received"); + + // Purge all pending block imports. + self.chain_imports.clear(); - self.block_imports.push_back(block_with_peer); + // Update the fork choice state with the new block info. + self.optimistic_sync_target = Some(block_info); + + // Wake up the driver to process the optimistic sync. self.waker.wake(); } @@ -228,6 +235,18 @@ where } } } + EngineDriverFutureResult::OptimisticSync(result) => { + tracing::info!(target: "scroll::engine", ?result, "handling optimistic sync result"); + + match result { + Err(err) => { + tracing::error!(target: "scroll::engine", ?err, "failed to perform optimistic sync") + } + Ok(fcu) => { + tracing::trace!(target: "scroll::engine", ?fcu, "optimistic sync issued successfully"); + } + } + } } None @@ -323,13 +342,23 @@ where return Poll::Pending; } - // Handle the block import requests. - if let Some(block_with_peer) = this.block_imports.pop_front() { + // If we have an optimistic sync target, issue the optimistic sync. + if let Some(block_info) = this.optimistic_sync_target.take() { + this.fcs.update_head_block_info(block_info); + let fcs = this.fcs.get_alloy_optimistic_fcs(); + this.engine_future = + Some(MeteredFuture::new(EngineFuture::optimistic_sync(this.client.clone(), fcs))); + this.waker.wake(); + return Poll::Pending; + } + + // Handle the chain import requests. 
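The comment above marks the lowest-priority branch: the poll loop first drains any in-flight engine future, then a pending optimistic sync target, and only then pops queued chain imports. A schematic sketch of that ordering with the driver reduced to plain fields (illustration only, not the real types):

```rust
use std::collections::VecDeque;

/// Stand-in for the driver's two queues, for illustration.
struct SketchDriver {
    optimistic_sync_target: Option<u64>,
    chain_imports: VecDeque<u64>,
}

impl SketchDriver {
    /// Mirrors `handle_optimistic_sync`: pending imports are purged because
    /// they belong to a chain segment the node is about to leap past.
    fn handle_optimistic_sync(&mut self, target: u64) {
        self.chain_imports.clear();
        self.optimistic_sync_target = Some(target);
    }

    /// Mirrors the poll order: an optimistic target always wins over imports.
    fn next_job(&mut self) -> Option<String> {
        if let Some(target) = self.optimistic_sync_target.take() {
            return Some(format!("optimistic forkchoiceUpdated to block {target}"));
        }
        self.chain_imports.pop_front().map(|tip| format!("import chain ending at block {tip}"))
    }
}
```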
+ if let Some(chain_import) = this.chain_imports.pop_front() { let fcs = this.alloy_forkchoice_state(); let client = this.client.clone(); this.engine_future = - Some(MeteredFuture::new(EngineFuture::block_import(client, block_with_peer, fcs))); + Some(MeteredFuture::new(EngineFuture::chain_import(client, chain_import, fcs))); this.waker.wake(); return Poll::Pending; @@ -393,15 +422,8 @@ mod tests { ForkchoiceState::from_block_info(BlockInfo { number: 0, hash: Default::default() }); let duration = Duration::from_secs(2); - let mut driver = EngineDriver::new( - client, - chain_spec, - None::, - fcs, - false, - 0, - duration, - ); + let mut driver = + EngineDriver::new(client, chain_spec, None::, fcs, false, duration); // Initially, it should be false assert!(!driver.is_payload_building_in_progress()); @@ -427,7 +449,6 @@ mod tests { None::, fcs, false, - 0, duration, ); diff --git a/crates/engine/src/event.rs b/crates/engine/src/event.rs index 1d9f1ab4..6cbfda0f 100644 --- a/crates/engine/src/event.rs +++ b/crates/engine/src/event.rs @@ -9,6 +9,20 @@ pub enum EngineDriverEvent { NewPayload(ScrollBlock), /// The result of attempting a block import. BlockImportOutcome(BlockImportOutcome), + /// The result of attempting a chain import. + ChainImportOutcome(ChainImportOutcome), /// A block derived from L1 has been consolidated. L1BlockConsolidated(ConsolidationOutcome), } + +/// The outcome of a chain import. +/// +/// This includes the result of the final block import outcome and the chain of blocks that were +/// imported. +#[derive(Debug)] +pub struct ChainImportOutcome { + /// The outcome of the block import. + pub outcome: BlockImportOutcome, + /// The chain of blocks that were imported. + pub chain: Vec, +} diff --git a/crates/engine/src/future/mod.rs b/crates/engine/src/future/mod.rs index 0ed91004..630d4815 100644 --- a/crates/engine/src/future/mod.rs +++ b/crates/engine/src/future/mod.rs @@ -3,20 +3,21 @@ use crate::{api::*, ForkchoiceState}; use alloy_provider::Provider; use alloy_rpc_types_engine::{ - ExecutionData, ExecutionPayloadV1, ForkchoiceState as AlloyForkchoiceState, PayloadStatusEnum, + ExecutionData, ExecutionPayloadV1, ForkchoiceState as AlloyForkchoiceState, ForkchoiceUpdated, + PayloadStatusEnum, }; use eyre::Result; use reth_scroll_engine_primitives::try_into_block; use reth_scroll_primitives::ScrollBlock; use rollup_node_primitives::{ - BatchInfo, BlockInfo, L2BlockInfoWithL1Messages, MeteredFuture, + BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, }; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_alloy_provider::ScrollEngineApi; use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes; -use scroll_network::{BlockImportOutcome, NewBlockWithPeer}; +use scroll_network::BlockImportOutcome; use std::{ future::Future, pin::Pin, @@ -30,7 +31,7 @@ mod result; pub(crate) use result::EngineDriverFutureResult; /// A future that represents a block import job. -type BlockImportFuture = Pin< +type ChainImportFuture = Pin< Box< dyn Future< Output = Result< @@ -41,6 +42,10 @@ type BlockImportFuture = Pin< >, >; +/// A future that represents an L1 consolidation job. +type L1ConsolidationFuture = + Pin> + Send>>; + /// An enum that represents the different outcomes of an L1 consolidation job. #[derive(Debug, Clone)] pub enum ConsolidationOutcome { @@ -77,10 +82,6 @@ impl ConsolidationOutcome { } } -/// A future that represents an L1 consolidation job. 
-type L1ConsolidationFuture = - Pin> + Send>>; - /// A future that represents a new payload processing. type NewPayloadFuture = Pin> + Send>>; @@ -89,25 +90,36 @@ type NewPayloadFuture = pub(crate) type BuildNewPayloadFuture = MeteredFuture> + Send>>>; +/// A future that represents a new payload building job. +pub(crate) type OptimisticSyncFuture = + Pin> + Send>>; + /// An enum that represents the different types of futures that can be executed on the engine API. /// It can be a block import job, an L1 consolidation job, or a new payload processing. pub(crate) enum EngineFuture { - BlockImport(BlockImportFuture), + ChainImport(ChainImportFuture), L1Consolidation(L1ConsolidationFuture), NewPayload(NewPayloadFuture), + OptimisticSync(OptimisticSyncFuture), } impl EngineFuture { - /// Creates a new [`EngineFuture::BlockImport`] future from the provided parameters. - pub(crate) fn block_import( + pub(crate) fn chain_import( client: Arc, - block_with_peer: NewBlockWithPeer, + chain_import: ChainImport, fcs: AlloyForkchoiceState, ) -> Self where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, { - Self::BlockImport(Box::pin(handle_execution_payload(client, block_with_peer, fcs))) + Self::ChainImport(Box::pin(handle_chain_import(client, chain_import, fcs))) + } + + pub(crate) fn optimistic_sync(client: Arc, fcs: AlloyForkchoiceState) -> Self + where + EC: ScrollEngineApi + Unpin + Send + Sync + 'static, + { + Self::OptimisticSync(Box::pin(forkchoice_updated(client, fcs, None))) } /// Creates a new [`EngineFuture::L1Consolidation`] future from the provided parameters. @@ -150,9 +162,10 @@ impl Future for EngineFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { - Self::BlockImport(fut) => fut.as_mut().poll(cx).map(Into::into), + Self::ChainImport(fut) => fut.as_mut().poll(cx).map(Into::into), Self::L1Consolidation(fut) => fut.as_mut().poll(cx).map(Into::into), Self::NewPayload(fut) => fut.as_mut().poll(cx).map(Into::into), + Self::OptimisticSync(fut) => fut.as_mut().poll(cx).map(Into::into), } } } @@ -162,42 +175,48 @@ impl Future for EngineFuture { /// - Sets the current fork choice for the EL via `engine_forkchoiceUpdatedV1`. #[instrument(skip_all, level = "trace", fields( - peer_id = %block_with_peer.peer_id, - block_hash = %block_with_peer.block.hash_slow(), + peer_id = %chain_import.peer_id, + block_hash = %chain_import.chain.last().unwrap().hash_slow(), fcs = ?fcs ) )] -async fn handle_execution_payload( +async fn handle_chain_import( client: Arc, - block_with_peer: NewBlockWithPeer, + chain_import: ChainImport, mut fcs: AlloyForkchoiceState, ) -> Result<(Option, Option, PayloadStatusEnum), EngineDriverError> where EC: ScrollEngineApi + Unpin + Send + Sync + 'static, { - tracing::trace!(target: "scroll::engine::future", ?fcs, ?block_with_peer, "handling execution payload"); + tracing::trace!(target: "scroll::engine::future", ?fcs, ?chain_import.peer_id, chain = ?chain_import.chain.last().unwrap().hash_slow(), "handling execution payload"); - // Unpack the block with peer. - let NewBlockWithPeer { peer_id, block, signature } = block_with_peer; + let ChainImport { chain, peer_id, signature } = chain_import; - // Extract the block info from the payload. - let block_info: BlockInfo = (&block).into(); + // Extract the block info from the last payload. + let head = chain.last().unwrap().clone(); - // Create the execution payload. 
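The removed single-block path continues below; its replacement (further down in this hunk) loops over the whole imported chain, submits each payload, and only issues one forkchoice update at the head. The invariant is all-or-nothing, sketched here in isolation (statuses reduced to booleans for illustration):

```rust
/// Illustration only: each entry stands for one `newPayload` status. The first
/// invalid payload aborts the import with no forkchoice update (mirroring the
/// `BlockImportOutcome::invalid_block` early return); otherwise the index of
/// the chain tip, the target of the single final FCU, is returned.
fn import_chain(payload_valid: &[bool]) -> Option<usize> {
    for valid in payload_valid {
        if !valid {
            return None;
        }
    }
    payload_valid.len().checked_sub(1)
}

fn main() {
    assert_eq!(import_chain(&[true, true, true]), Some(2));
    assert_eq!(import_chain(&[true, false, true]), None);
}
```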
- let payload = ExecutionPayloadV1::from_block_slow(&block); + let mut payload_status = None; + for block in chain { + // Create the execution payload. + let payload = ExecutionPayloadV1::from_block_slow(&block); - // Issue the new payload to the EN. - let payload_status = new_payload(client.clone(), payload).await?; + // Issue the new payload to the EN. + let status = new_payload(client.clone(), payload).await?; - // Check if the payload is invalid and return early. - if let PayloadStatusEnum::Invalid { validation_error } = payload_status.clone() { - tracing::error!(target: "scroll::engine", ?validation_error, "execution payload is invalid"); + // Check if the payload is invalid and return early. + if let PayloadStatusEnum::Invalid { ref validation_error } = status { + tracing::error!(target: "scroll::engine", ?validation_error, "execution payload is invalid"); + + // If the payload is invalid, return early. + return Ok((None, Some(BlockImportOutcome::invalid_block(peer_id)), status)); + } - // If the payload is invalid, return early. - return Ok((None, Some(BlockImportOutcome::invalid_block(peer_id)), payload_status)); + payload_status = Some(status); } + let payload_status = payload_status.unwrap(); // Update the fork choice state with the new block hash. + let block_info: BlockInfo = (&head).into(); fcs.head_block_hash = block_info.hash; // Invoke the FCU with the new state. @@ -209,7 +228,7 @@ where Some(block_info), Some(BlockImportOutcome::valid_block( peer_id, - block, + head, Into::>::into(signature).into(), )), PayloadStatusEnum::Valid, diff --git a/crates/engine/src/future/result.rs b/crates/engine/src/future/result.rs index ffd113ba..e1b966ad 100644 --- a/crates/engine/src/future/result.rs +++ b/crates/engine/src/future/result.rs @@ -10,6 +10,7 @@ pub(crate) enum EngineDriverFutureResult { ), L1Consolidation(Result), PayloadBuildingJob(Result), + OptimisticSync(Result), } impl @@ -41,3 +42,9 @@ impl From> for EngineDriverFutureResult { Self::PayloadBuildingJob(value) } } + +impl From> for EngineDriverFutureResult { + fn from(value: Result) -> Self { + Self::OptimisticSync(value) + } +} diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml index b761deb6..93c62e07 100644 --- a/crates/indexer/Cargo.toml +++ b/crates/indexer/Cargo.toml @@ -11,7 +11,10 @@ workspace = true [dependencies] # alloy +alloy-consensus = { workspace = true } +alloy-eips = { workspace = true } alloy-primitives.workspace = true +alloy-provider.workspace = true # rollup-node scroll-db.workspace = true @@ -19,11 +22,17 @@ rollup-node-primitives.workspace = true rollup-node-watcher.workspace = true # scroll +reth-scroll-primitives.workspace = true scroll-alloy-consensus.workspace = true scroll-alloy-hardforks.workspace = true +scroll-alloy-network.workspace = true +scroll-network.workspace = true # reth reth-chainspec.workspace = true +reth-network-p2p = { git = "https://github.com/scroll-tech/reth.git", default-features = false } +reth-network-peers.workspace = true +reth-primitives-traits.workspace = true # misc futures.workspace = true @@ -31,10 +40,14 @@ metrics.workspace = true metrics-derive.workspace = true strum = "0.27.1" thiserror.workspace = true +tracing.workspace = true tokio.workspace = true [dev-dependencies] +alloy-consensus = { workspace = true, features = ["arbitrary"] } alloy-primitives = { workspace = true, features = ["arbitrary"] } +alloy-rpc-client.workspace = true +alloy-transport.workspace = true # rollup-node scroll-db = { workspace = true, features = ["test-utils"] } @@ 
-44,8 +57,15 @@ rollup-node-primitives = { workspace = true, features = ["arbitrary"] } reth-scroll-chainspec.workspace = true reth-scroll-forks.workspace = true +# reth +reth-eth-wire-types.workspace = true +reth-network-peers.workspace = true + # misc arbitrary.workspace = true futures.workspace = true +parking_lot.workspace = true rand.workspace = true +reqwest.workspace = true +serde_json = { version = "1.0" } tokio.workspace = true diff --git a/crates/indexer/src/action.rs b/crates/indexer/src/action.rs index 6c889925..21be9f93 100644 --- a/crates/indexer/src/action.rs +++ b/crates/indexer/src/action.rs @@ -1,4 +1,4 @@ -use super::{IndexerError, IndexerEvent}; +use super::{ChainOrchestratorEvent, IndexerError}; use std::{ fmt, future::Future, @@ -8,7 +8,7 @@ use std::{ /// A future that resolves to a `Result`. pub(super) type PendingIndexerFuture = - Pin> + Send>>; + Pin> + Send>>; /// A type that represents a future that is being executed by the indexer. pub(super) enum IndexerFuture { @@ -18,6 +18,7 @@ pub(super) enum IndexerFuture { HandleBatchFinalization(PendingIndexerFuture), HandleL1Message(PendingIndexerFuture), HandleDerivedBlock(PendingIndexerFuture), + HandleL2Block(PendingIndexerFuture), } impl IndexerFuture { @@ -25,14 +26,15 @@ impl IndexerFuture { pub(super) fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { match self { Self::HandleReorg(fut) | Self::HandleFinalized(fut) | Self::HandleBatchCommit(fut) | Self::HandleBatchFinalization(fut) | Self::HandleL1Message(fut) | - Self::HandleDerivedBlock(fut) => fut.as_mut().poll(cx), + Self::HandleDerivedBlock(fut) | + Self::HandleL2Block(fut) => fut.as_mut().poll(cx), } } } @@ -48,6 +50,7 @@ impl fmt::Debug for IndexerFuture { Self::HandleBatchFinalization(_) => write!(f, "HandleBatchFinalization"), Self::HandleL1Message(_) => write!(f, "HandleL1Message"), Self::HandleDerivedBlock(_) => write!(f, "HandleDerivedBlock"), + Self::HandleL2Block(_) => write!(f, "HandleL2Block"), } } } diff --git a/crates/indexer/src/error.rs b/crates/indexer/src/error.rs index 5552440f..b1597516 100644 --- a/crates/indexer/src/error.rs +++ b/crates/indexer/src/error.rs @@ -1,3 +1,4 @@ +use alloy_primitives::B256; use scroll_db::DatabaseError; /// A type that represents an error that occurred during indexing. @@ -9,4 +10,24 @@ pub enum IndexerError { /// An error occurred while trying to fetch the L2 block from the database. #[error("L2 block not found - block number: {0}")] L2BlockNotFound(u64), + /// A fork was received from the peer that is associated with a reorg of the safe chain. + #[error("L2 safe block reorg detected")] + L2SafeBlockReorgDetected, + /// A block contains invalid L1 messages. + #[error("Block contains invalid L1 message. Expected: {expected:?}, Actual: {actual:?}")] + L1MessageMismatch { + /// The expected L1 messages hash. + expected: B256, + /// The actual L1 messages hash. + actual: B256, + }, + /// An inconsistency was detected when trying to consolidate the chain. + #[error("Chain inconsistency detected")] + ChainInconsistency, + /// The peer did not provide the requested block header. + #[error("A peer did not provide the requested block header")] + MissingBlockHeader { + /// The hash of the block header that was requested. 
+ hash: B256, + }, } diff --git a/crates/indexer/src/event.rs b/crates/indexer/src/event.rs index 008996a7..3189024f 100644 --- a/crates/indexer/src/event.rs +++ b/crates/indexer/src/event.rs @@ -1,21 +1,46 @@ -use alloy_primitives::B256; -use rollup_node_primitives::{BatchInfo, BlockInfo, L2BlockInfoWithL1Messages}; +use alloy_consensus::Header; +use alloy_primitives::{Signature, B256}; +use reth_network_peers::PeerId; +use reth_scroll_primitives::ScrollBlock; +use rollup_node_primitives::{BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages}; -/// An event emitted by the indexer. +/// An event emitted by the `ChainOrchestrator`. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum IndexerEvent { - /// A `BatchCommit` event has been indexed returning the batch info. - BatchCommitIndexed(BatchInfo), - /// A `BatchFinalization` event has been indexed returning the batch hash and new finalized L2 - /// block. - BatchFinalizationIndexed(B256, Option), - /// A `Finalized` event has been indexed returning the block number and new finalized L2 - /// block. - FinalizedIndexed(u64, Option), - /// A `L1Message` event has been indexed returning the message queue index. - L1MessageIndexed(u64), - /// A `Unwind` event has been indexed returning the reorg block number. - UnwindIndexed { +pub enum ChainOrchestratorEvent { + /// A new block has been received from the network but we have insufficient data to process it + /// due to being in optimistic mode. + InsufficientDataForReceivedBlock(B256), + /// The block that we have received is already known. + BlockAlreadyKnown(B256, PeerId), + /// A fork of the chain that is older than the current chain has been received. + OldForkReceived { + /// The headers of the old fork. + headers: Vec
<Header>,
+        /// The peer that provided the old fork.
+        peer_id: PeerId,
+        /// The signature of the old fork.
+        signature: Signature,
+    },
+    /// The chain should be optimistically synced to the provided block.
+    OptimisticSync(ScrollBlock),
+    /// The chain has been extended, returning the new blocks.
+    ChainExtended(ChainImport),
+    /// The chain has reorged, returning the new chain and the peer that provided it.
+    ChainReorged(ChainImport),
+    /// A batch has been committed, returning the batch info.
+    BatchCommitted(BatchInfo),
+    /// A batch has been finalized, returning the batch hash and an optional newly
+    /// finalized L2 block.
+    BatchFinalized(B256, Option<BlockInfo>),
+    /// An L1 block has been finalized, returning the L1 block number and an optional
+    /// finalized L2 block.
+    L1BlockFinalized(u64, Option<BlockInfo>),
+    /// An `L1Message` event has been committed, returning the message queue index.
+    L1MessageCommitted(u64),
+    /// The chain has been unwound, returning the L1 block number of the new L1 head,
+    /// the L1 message queue index of the new L1 head, and optionally the L2 head and safe block
+    /// info if the unwind resulted in a new L2 head or safe block.
+    ChainUnwound {
         /// The L1 block number of the new L1 head.
         l1_block_number: u64,
         /// The L1 message queue index of the new L1 head.
@@ -25,6 +50,7 @@ pub enum IndexerEvent {
         /// The L2 safe block info.
         l2_safe_block_info: Option<BlockInfo>,
     },
-    /// A block has been indexed returning batch and block info.
-    BlockIndexed(L2BlockInfoWithL1Messages, Option<BatchInfo>),
+    /// An L2 block has been committed, returning the [`L2BlockInfoWithL1Messages`] and an
+    /// optional [`BatchInfo`] if the block is associated with a committed batch.
+    L2BlockCommitted(L2BlockInfoWithL1Messages, Option<BatchInfo>),
 }
diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs
index 0827fe89..b6a48d08 100644
--- a/crates/indexer/src/lib.rs
+++ b/crates/indexer/src/lib.rs
@@ -1,21 +1,29 @@
 //! A library responsible for indexing data relevant to the L1.
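Before the `lib.rs` internals, a sketch of what a downstream consumer of the renamed events might look like. The handlers are placeholders; per the engine changes earlier in this diff, the intended wiring is that `OptimisticSync` feeds `handle_optimistic_sync` and `ChainExtended`/`ChainReorged` feed `handle_chain_import`:

```rust
use rollup_node_indexer::ChainOrchestratorEvent;

/// Placeholder dispatcher, for illustration only.
fn on_event(event: ChainOrchestratorEvent) {
    match event {
        // A far-ahead peer chain: hand the target to the engine driver.
        ChainOrchestratorEvent::OptimisticSync(block) => {
            let _target = block.header.number;
        }
        // The in-memory chain grew or reorged: forward the import wholesale.
        ChainOrchestratorEvent::ChainExtended(import) |
        ChainOrchestratorEvent::ChainReorged(import) => {
            let _head = import.chain.last();
        }
        // L1-derived events keep their previous semantics under new names.
        ChainOrchestratorEvent::BatchCommitted(info) => {
            let _batch = info;
        }
        _ => {}
    }
}
```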
+use alloy_consensus::Header; +use alloy_eips::{BlockHashOrNumber, Encodable2718}; use alloy_primitives::{b256, keccak256, B256}; -use futures::Stream; +use alloy_provider::Provider; +use futures::{task::AtomicWaker, Stream, StreamExt, TryStreamExt}; use reth_chainspec::EthChainSpec; +use reth_network_p2p::{BlockClient, BodiesClient}; +use reth_scroll_primitives::ScrollBlock; use rollup_node_primitives::{ - BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, + BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ChainImport, L1MessageEnvelope, + L2BlockInfoWithL1Messages, }; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; use scroll_alloy_hardforks::{ScrollHardfork, ScrollHardforks}; -use scroll_db::{Database, DatabaseError, DatabaseOperations, UnwindResult}; +use scroll_alloy_network::Scroll; +use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, UnwindResult}; +use scroll_network::NewBlockWithPeer; use std::{ collections::{HashMap, VecDeque}, pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, Mutex, }, task::{Context, Poll}, time::Instant, @@ -26,7 +34,7 @@ mod action; use action::{IndexerFuture, PendingIndexerFuture}; mod event; -pub use event::IndexerEvent; +pub use event::ChainOrchestratorEvent; mod error; pub use error::IndexerError; @@ -37,9 +45,29 @@ pub use metrics::{IndexerItem, IndexerMetrics}; const L1_MESSAGE_QUEUE_HASH_MASK: B256 = b256!("ffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000"); +/// The number of block headers we keep in memory. +const OPTIMISTIC_CHAIN_BUFFER_SIZE: usize = 2000; + +/// The maximum number of blocks we keep in memory for the consolidated chain (This is the full +/// unsafe chain). Memory requirements = ~800 bytes per header * `600_000` headers = 480 MB. +// TODO: This is just a temporary hack we will transition to a VecDeque when not in synced mode. +const CONSOLIDATED_CHAIN_BUFFER_SIZE: usize = 300_000; + +/// The threshold for optimistic syncing. If the received block is more than this many blocks +/// ahead of the current chain, we optimistically sync the chain. +const OPTIMISTIC_SYNC_THRESHOLD: u64 = 100; + +type Chain = BoundedVec
<Header>;
+
 /// The indexer is responsible for indexing data relevant to the L1.
 #[derive(Debug)]
-pub struct Indexer<ChainSpec> {
+pub struct ChainOrchestrator<ChainSpec, BC, P> {
+    /// The `BlockClient` that is used to fetch blocks from peers over p2p.
+    network_client: Arc<BC>,
+    /// The L2 client that is used to interact with the L2 chain.
+    l2_client: Arc
<P>
, + /// An in-memory representation of the optimistic chain we are following. + chain: Arc>, /// A reference to the database used to persist the indexed data. database: Arc, /// A queue of pending futures. @@ -52,12 +80,32 @@ pub struct Indexer { chain_spec: Arc, /// The metrics for the indexer. metrics: HashMap, + /// A boolean to represent if the [`ChainOrchestrator`] is in optimistic mode. + optimistic_mode: Arc>, + /// A boolean to represent if the L1 has been synced. + l1_synced: bool, + /// The waker to notify when the engine driver should be polled. + waker: AtomicWaker, } -impl Indexer { +impl< + ChainSpec: ScrollHardforks + EthChainSpec + Send + Sync + 'static, + BC: BlockClient + Send + Sync + 'static, + P: Provider + 'static, + > ChainOrchestrator +{ /// Creates a new indexer with the given [`Database`]. - pub fn new(database: Arc, chain_spec: Arc) -> Self { + pub async fn new( + database: Arc, + chain_spec: Arc, + block_client: BC, + l2_client: P, + ) -> Self { + let chain = init_chain_from_db(&database, &l2_client).await; Self { + network_client: Arc::new(block_client), + l2_client: Arc::new(l2_client), + chain: Arc::new(Mutex::new(chain)), database, pending_futures: Default::default(), l1_finalized_block_number: Arc::new(AtomicU64::new(0)), @@ -69,6 +117,9 @@ impl Indexer< (i, IndexerMetrics::new_with_labels(&[("item", label)])) }) .collect(), + optimistic_mode: Arc::new(Mutex::new(false)), + l1_synced: false, + waker: AtomicWaker::new(), } } @@ -88,18 +139,316 @@ impl Indexer< fut_wrapper } - /// Handles an L2 block. - pub fn handle_block( + /// Sets the L1 synced status to the provided value. + pub fn set_l1_synced_status(&mut self, l1_synced: bool) { + self.l1_synced = l1_synced; + } + + /// Handles a new block received from a peer. + pub fn handle_block_from_peer(&mut self, block_with_peer: NewBlockWithPeer) { + let chain = self.chain.clone(); + let l2_client = self.l2_client.clone(); + let optimistic_mode = self.optimistic_mode.clone(); + let network_client = self.network_client.clone(); + let database = self.database.clone(); + let fut = self.handle_metered( + IndexerItem::NewBlock, + Box::pin(async move { + Self::handle_new_block( + chain, + l2_client, + optimistic_mode, + network_client, + database, + block_with_peer, + ) + .await + }), + ); + self.pending_futures.push_back(IndexerFuture::HandleL2Block(fut)); + self.waker.wake(); + } + + /// Handles a new block received from the network. + pub async fn handle_new_block( + chain: Arc>, + _l2_client: Arc
<P>
, + optimistic_mode: Arc>, + network_client: Arc, + database: Arc, + block_with_peer: NewBlockWithPeer, + ) -> Result { + let NewBlockWithPeer { block: received_block, peer_id, signature } = block_with_peer; + let mut current_chain_headers = chain.lock().unwrap().clone().into_inner(); + let max_block_number = current_chain_headers.back().expect("chain can not be empty").number; + let min_block_number = + current_chain_headers.front().expect("chain can not be empty").number; + let optimistic_mode_local: bool = { + let guard = optimistic_mode.lock().unwrap(); + *guard + }; + + // TODO: remove database lookups. + + // If the received block has a block number that is greater than the tip + // of the chain by the optimistic sync threshold, we optimistically sync the chain and + // update the in-memory buffer. + if (received_block.header.number - max_block_number) >= OPTIMISTIC_SYNC_THRESHOLD { + // fetch the latest `OPTIMISTIC_CHAIN_BUFFER_SIZE` blocks from the network for the + // optimistic chain. + let mut optimistic_headers = vec![received_block.header.clone()]; + while optimistic_headers.len() < OPTIMISTIC_CHAIN_BUFFER_SIZE && + optimistic_headers.last().unwrap().number != 0 + { + tracing::trace!(target: "scroll::watcher", number = ?(optimistic_headers.last().unwrap().number - 1), "fetching block"); + let header = network_client + .get_header(BlockHashOrNumber::Hash( + optimistic_headers.last().unwrap().parent_hash, + )) + .await + .unwrap() + .into_data() + .unwrap(); + optimistic_headers.push(header); + } + optimistic_headers.reverse(); + let mut new_chain = Chain::new(OPTIMISTIC_CHAIN_BUFFER_SIZE); + new_chain.extend(optimistic_headers); + *chain.lock().unwrap() = new_chain; + *optimistic_mode.lock().unwrap() = true; + return Ok(ChainOrchestratorEvent::OptimisticSync(received_block)); + } + + // Check if we have already have this block in memory. + if received_block.number <= max_block_number && + received_block.number >= min_block_number && + current_chain_headers.iter().any(|h| h == &received_block.header) + { + tracing::debug!(target: "scroll::watcher", block_hash = ?received_block.header.hash_slow(), "block already in chain"); + return Ok(ChainOrchestratorEvent::BlockAlreadyKnown( + received_block.header.hash_slow(), + peer_id, + )); + } + + // If we are in optimistic mode, we return an event indicating that we have insufficient + // data to process the block as we are optimistically syncing the chain. + if optimistic_mode_local && (received_block.header.number <= min_block_number) { + return Ok(ChainOrchestratorEvent::InsufficientDataForReceivedBlock( + received_block.header.hash_slow(), + )); + }; + + let mut new_chain_headers = vec![received_block.header.clone()]; + let mut new_header_tail = received_block.header.clone(); + + // We should never have a re-org that is deeper than the current safe head. + let (latest_safe_block, _) = + database.get_latest_safe_l2_info().await?.expect("safe block must exist"); + + // We search for the re-org index in the in-memory chain or the database. + let reorg_index = { + loop { + // If the new header tail has a block number that is less than the current header + // tail then we should fetch more blocks for the current header chain to aid + // reconciliation. 
+ if new_header_tail.number <= + current_chain_headers.back().expect("chain can not be empty").number + { + for _ in 0..50 { + if new_header_tail.number.saturating_sub(1) < latest_safe_block.number { + tracing::info!(target: "scroll::chain", hash = %latest_safe_block.hash, number = %latest_safe_block.number, "reached safe block number - terminating fetching."); + break; + } + tracing::trace!(target: "scroll::watcher", number = ?(new_header_tail.number - 1), "fetching block"); + if let Some(header) = network_client + .get_header(BlockHashOrNumber::Hash(new_header_tail.parent_hash)) + .await + .unwrap() + .into_data() + { + new_chain_headers.push(header.clone()); + new_header_tail = header; + } else { + return Err(IndexerError::MissingBlockHeader { + hash: new_header_tail.parent_hash, + }); + } + } + } + + // If the current header block number is greater than the in-memory chain then we + // should search the in-memory chain (we keep the latest + // `OPTIMISTIC_CHAIN_BUFFER_SIZE` headers in memory). + if let Some(pos) = current_chain_headers + .iter() + .rposition(|h| h.hash_slow() == new_header_tail.parent_hash) + { + // If the received fork is older than the current chain, we return an event + // indicating that we have received an old fork. + if (pos < current_chain_headers.len() - 1) && + current_chain_headers.get(pos + 1).unwrap().timestamp >= + new_header_tail.timestamp + { + return Ok(ChainOrchestratorEvent::OldForkReceived { + headers: new_chain_headers, + peer_id, + signature, + }); + } + break Some(pos); + } + + // If we are in optimistic mode, we terminate the search as we don't have the + // necessary data from L1 consolidation yet. This is fine because very deep + // re-orgs are rare and in any case will be resolved once optimistic sync is + // completed. If the current header block number is less than the + // latest safe block number then this would suggest a reorg of a + // safe block which is not invalid - terminate the search. + if optimistic_mode_local && + (new_header_tail.number <= + current_chain_headers + .front() + .expect("chain must not be empty") + .number) + { + if received_block.timestamp > + current_chain_headers.back().expect("chain can not be empty").timestamp + { + tracing::debug!(target: "scroll::watcher", block_hash = ?received_block.header.hash_slow(), "received block is ahead of the current chain"); + while new_chain_headers.len() < OPTIMISTIC_CHAIN_BUFFER_SIZE && + new_chain_headers.last().unwrap().number != 0 + { + tracing::trace!(target: "scroll::watcher", number = ?(new_chain_headers.last().unwrap().number - 1), "fetching block"); + let header = network_client + .get_header(BlockHashOrNumber::Hash( + new_chain_headers.last().unwrap().parent_hash, + )) + .await + .unwrap() + .into_data() + .unwrap(); + new_chain_headers.push(header); + } + break None; + } + return Ok(ChainOrchestratorEvent::InsufficientDataForReceivedBlock( + received_block.header.hash_slow(), + )); + } + + // If the current header block number is less than the latest safe block number then + // we should error. + if new_header_tail.number <= latest_safe_block.number { + return Err(IndexerError::L2SafeBlockReorgDetected); + } + + tracing::trace!(target: "scroll::watcher", number = ?(new_header_tail.number - 1), "fetching block"); + if let Some(header) = network_client + .get_header(BlockHashOrNumber::Hash(new_header_tail.parent_hash)) + .await + .unwrap() + .into_data() + { + // TODO: what do we do when peers don't have the blocks? 
We can't recreate the + // chain so we should terminate here. We should be able to reconcile this gap in + // a future block. + new_chain_headers.push(header.clone()); + new_header_tail = header; + } else { + return Err(IndexerError::MissingBlockHeader { + hash: new_header_tail.parent_hash, + }); + } + } + }; + + // Reverse the new chain headers to have them in the correct order. + new_chain_headers.reverse(); + + // Fetch the blocks associated with the new chain headers. + let new_blocks = if new_chain_headers.len() == 1 { + vec![received_block] + } else { + fetch_blocks(new_chain_headers.clone(), network_client.clone()).await + }; + + // If we are not in optimistic mode, we validate the L1 messages in the new blocks. + if !optimistic_mode_local { + validate_l1_messages(&new_blocks, database.clone()).await?; + } + + match reorg_index { + // If this is a simple chain extension, we can just extend the in-memory chain and emit + // a ChainExtended event. + Some(index) if index == current_chain_headers.len() - 1 => { + // Update the chain with the new blocks. + current_chain_headers.extend(new_blocks.iter().map(|b| b.header.clone())); + let mut new_chain = Chain::new(OPTIMISTIC_CHAIN_BUFFER_SIZE); + new_chain.extend(current_chain_headers); + *chain.lock().unwrap() = new_chain; + + Ok(ChainOrchestratorEvent::ChainExtended(ChainImport::new( + new_blocks, peer_id, signature, + ))) + } + // If we are re-organizing the in-memory chain, we need to split the chain at the reorg + // point and extend it with the new blocks. + Some(position) => { + // reorg the in-memory chain to the new chain and issue a reorg event. + let mut new_chain = Chain::new(OPTIMISTIC_CHAIN_BUFFER_SIZE); + new_chain.extend(current_chain_headers.iter().take(position).cloned()); + new_chain.extend(new_chain_headers); + *chain.lock().unwrap() = new_chain; + + Ok(ChainOrchestratorEvent::ChainReorged(ChainImport::new( + new_blocks, peer_id, signature, + ))) + } + None => { + let mut new_chain = Chain::new(OPTIMISTIC_CHAIN_BUFFER_SIZE); + new_chain.extend(new_chain_headers); + *chain.lock().unwrap() = new_chain; + *optimistic_mode.lock().unwrap() = true; + + Ok(ChainOrchestratorEvent::OptimisticSync( + new_blocks.last().cloned().expect("new_blocks should not be empty"), + )) + } + } + } + + /// Inserts an L2 block in the database. + pub fn consolidate_l2_blocks( &mut self, - block_info: L2BlockInfoWithL1Messages, + block_info: Vec, batch_info: Option, ) { let database = self.database.clone(); + let l1_synced = self.l1_synced; + let optimistic_mode = self.optimistic_mode.clone(); + let chain = self.chain.clone(); + let l2_client = self.l2_client.clone(); let fut = self.handle_metered( - IndexerItem::L2Block, + IndexerItem::InsertL2Block, Box::pin(async move { - database.insert_block(block_info.clone(), batch_info).await?; - Result::<_, IndexerError>::Ok(IndexerEvent::BlockIndexed(block_info, batch_info)) + // If we are in optimistic mode and the L1 is synced, we consolidate the chain and + // disable optimistic mode. + if l1_synced && *optimistic_mode.lock().unwrap() { + consolidate_chain(database.clone(), block_info.clone(), chain, l2_client) + .await?; + *optimistic_mode.lock().unwrap() = false; + } + + // If we are consolidating a batch, we insert the batch info into the database. 
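The comment above covers only the second half of the gating. Spelled out, the decision made by `consolidate_l2_blocks` has four cases, condensed here for illustration (the strings summarize the async work the real closure performs):

```rust
/// Illustration of the gating in `consolidate_l2_blocks`.
fn consolidation_plan(l1_synced: bool, optimistic_mode: bool, has_batch: bool) -> &'static str {
    match (l1_synced && optimistic_mode, has_batch) {
        (true, true) => "consolidate to safe head, leave optimistic mode, persist batch blocks",
        (true, false) => "consolidate to safe head, leave optimistic mode",
        (false, true) => "persist blocks under their batch",
        (false, false) => "only emit L2BlockCommitted",
    }
}
```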
+ let head = block_info.last().expect("block info must not be empty").clone(); + if batch_info.is_some() { + database.insert_blocks(block_info, batch_info).await?; + } + + Result::<_, IndexerError>::Ok(ChainOrchestratorEvent::L2BlockCommitted( + head, batch_info, + )) }), ); @@ -170,12 +519,12 @@ impl Indexer< database: Arc, chain_spec: Arc, l1_block_number: u64, - ) -> Result { + ) -> Result { let txn = database.tx().await?; let UnwindResult { l1_block_number, queue_index, l2_head_block_info, l2_safe_block_info } = txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; txn.commit().await?; - Ok(IndexerEvent::UnwindIndexed { + Ok(ChainOrchestratorEvent::ChainUnwound { l1_block_number, queue_index, l2_head_block_info, @@ -190,7 +539,7 @@ impl Indexer< block_number: u64, l1_block_number: Arc, l2_block_number: Arc, - ) -> Result { + ) -> Result { // Set the latest finalized L1 block in the database. database.set_latest_finalized_l1_block_number(block_number).await?; @@ -207,7 +556,7 @@ impl Indexer< // update the indexer l1 block number. l1_block_number.store(block_number, Ordering::Relaxed); - Ok(IndexerEvent::FinalizedIndexed(block_number, finalized_block)) + Ok(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_block)) } /// Handles an L1 message by inserting it into the database. @@ -217,8 +566,8 @@ impl Indexer< l1_message: TxL1Message, l1_block_number: u64, block_timestamp: u64, - ) -> Result { - let event = IndexerEvent::L1MessageIndexed(l1_message.queue_index); + ) -> Result { + let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); let queue_hash = if chain_spec .scroll_fork_activation(ScrollHardfork::EuclidV2) @@ -248,8 +597,8 @@ impl Indexer< async fn handle_batch_commit( database: Arc, batch: BatchCommitData, - ) -> Result { - let event = IndexerEvent::BatchCommitIndexed(BatchInfo::new(batch.index, batch.hash)); + ) -> Result { + let event = ChainOrchestratorEvent::BatchCommitted(BatchInfo::new(batch.index, batch.hash)); database.insert_batch(batch).await?; Ok(event) } @@ -261,7 +610,7 @@ impl Indexer< block_number: u64, l1_block_number: Arc, l2_block_number: Arc, - ) -> Result { + ) -> Result { // finalized the batch. database.finalize_batch(batch_hash, block_number).await?; @@ -274,7 +623,7 @@ impl Indexer< Self::fetch_highest_finalized_block(database, batch_hash, l2_block_number).await?; } - let event = IndexerEvent::BatchFinalizationIndexed(batch_hash, finalized_block); + let event = ChainOrchestratorEvent::BatchFinalized(batch_hash, finalized_block); Ok(event) } @@ -301,12 +650,38 @@ impl Indexer< } } +async fn init_chain_from_db + 'static>( + database: &Arc, + l2_client: &P, +) -> BoundedVec
<Header>
{ + let blocks = { + let mut blocks = Vec::with_capacity(OPTIMISTIC_CHAIN_BUFFER_SIZE); + let mut blocks_stream = + database.get_l2_blocks().await.unwrap().take(OPTIMISTIC_CHAIN_BUFFER_SIZE); + while let Some(block_info) = blocks_stream.try_next().await.unwrap() { + let header = l2_client + .get_block_by_hash(block_info.hash) + .await + .unwrap() + .unwrap() + .header + .into_consensus(); + blocks.push(header); + } + blocks.reverse(); + blocks + }; + let mut chain: Chain = Chain::new(OPTIMISTIC_CHAIN_BUFFER_SIZE); + chain.extend(blocks); + chain +} + /// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number. pub async fn unwind( database: Arc, chain_spec: Arc, l1_block_number: u64, -) -> Result { +) -> Result { // create a database transaction so this operation is atomic let txn = database.tx().await?; @@ -350,7 +725,7 @@ pub async fn unwind Stream for Indexer { - type Item = Result; +impl< + ChainSpec: ScrollHardforks + 'static, + BC: BlockClient + Send + Sync + 'static, + P: Provider + Send + Sync + 'static, + > Stream for ChainOrchestrator +{ + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Register the waker such that we can wake when required. + self.waker.register(cx.waker()); + // Remove and poll the next future in the queue if let Some(mut action) = self.pending_futures.pop_front() { return match action.poll(cx) { @@ -377,23 +760,348 @@ impl Stream for Indexer { } } +/// Consolidates the chain by reconciling the in-memory chain with the L2 client and database. +/// This is used to ensure that the in-memory chain is consistent with the L2 chain. +async fn consolidate_chain>( + database: Arc, + _block_info: Vec, + _chain: Arc>, + l2_client: P, +) -> Result<(), IndexerError> { + // take the current chain. + let chain = std::mem::take(&mut *_chain.lock().unwrap()); + + // Find highest common ancestor by comparing hashes + let hca_index = chain.iter().rposition(|h| { + let h_hash = h.hash_slow(); + _block_info.iter().any(|b| b.block_info.hash == h_hash) + }); + + // This means there is a deep reorg that has just occurred however in practice this should + // never happen due to purging of expired chain. This should be reconciled upon the next + // block import. + // TODO: consider this case more carefully. + if hca_index.is_none() { + // If we do not have a common ancestor, we return an error. + *_chain.lock().unwrap() = chain; + return Err(IndexerError::ChainInconsistency); + } + + // Now reconcile back to the safe head. + let safe_head = database.get_latest_l2_block().await?.expect("safe head must exist"); + let starting_block = + l2_client.get_block_by_hash(chain.first().unwrap().hash_slow()).await.unwrap().unwrap(); + let mut consolidated_chain_blocks = + vec![starting_block.into_consensus().map_transactions(|tx| tx.inner.into_inner())]; + while consolidated_chain_blocks.last().unwrap().header.parent_hash != safe_head.hash { + // Fetch the missing blocks from the L2 client. + let block = + l2_client.get_block_by_hash(chain.last().unwrap().parent_hash).await.unwrap().unwrap(); + consolidated_chain_blocks + .push(block.into_consensus().map_transactions(|tx| tx.inner.into_inner())); + + if chain.last().unwrap().number < safe_head.number { + // If we did not consolidate back to the safe head, we return an error. + *_chain.lock().unwrap() = chain; + // TODO: should we revert to the last known safe head. 
+            return Err(IndexerError::ChainInconsistency);
+        }
+    }
+
+    consolidated_chain_blocks.reverse();
+    validate_l1_messages(&consolidated_chain_blocks, database.clone()).await?;
+
+    let mut consolidated_chain = BoundedVec::new(CONSOLIDATED_CHAIN_BUFFER_SIZE);
+    consolidated_chain.extend(consolidated_chain_blocks.iter().map(|b| b.header.clone()));
+    consolidated_chain.extend(chain.into_inner());
+
+    // let unsafe_chain =
+
+    // TODO: implement the logic to consolidate the chain.
+    // If we are in optimistic mode, we consolidate the chain and disable optimistic
+    // mode.
+    // let mut chain = chain.lock().unwrap();
+    // if !chain.is_empty() {
+    //     database.insert_chain(chain.drain(..).collect()).await?;
+    // }
+    Ok(())
+}
+
+async fn fetch_blocks<BC: BlockClient<Block = ScrollBlock> + Send + Sync + 'static>(
+    headers: Vec<Header>
, + client: Arc, +) -> Vec { + let mut blocks = Vec::new(); + // TODO: migrate to `get_block_bodies_with_range_hint`. + let bodies = client + .get_block_bodies(headers.iter().map(|h| h.hash_slow()).collect()) + .await + .expect("Failed to fetch block bodies") + .into_data(); + + for (header, body) in headers.into_iter().zip(bodies) { + blocks.push(ScrollBlock::new(header, body)); + } + + blocks +} + +/// Validates the L1 messages in the provided blocks against the expected L1 messages synced from +/// L1. +async fn validate_l1_messages( + blocks: &[ScrollBlock], + database: Arc, +) -> Result<(), IndexerError> { + let l1_message_hashes = blocks + .iter() + .flat_map(|block| { + // Get the L1 messages from the block body. + block + .body + .transactions() + .filter(|&tx| tx.is_l1_message()) + // The hash for L1 messages is the trie hash of the transaction. + .map(|tx| tx.trie_hash()) + .collect::>() + }) + .collect::>(); + let mut l1_message_stream = database + .get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) + .await?; + + for message_hash in l1_message_hashes { + // Get the expected L1 message from the database. + let expected_hash = l1_message_stream.next().await.unwrap().unwrap().transaction.tx_hash(); + + // If the received and expected L1 messages do not match return an error. + if message_hash != expected_hash { + return Err(IndexerError::L1MessageMismatch { + expected: expected_hash, + actual: message_hash, + }); + } + } + Ok(()) +} + #[cfg(test)] mod test { use std::vec; use super::*; - use alloy_primitives::{address, bytes, U256}; - + use alloy_consensus::Header; + use alloy_eips::{BlockHashOrNumber, BlockNumHash}; + use alloy_primitives::{address, bytes, B256, U256}; + use alloy_provider::{ProviderBuilder, RootProvider}; + use alloy_transport::mock::Asserter; use arbitrary::{Arbitrary, Unstructured}; use futures::StreamExt; + use parking_lot::Mutex; use rand::Rng; + use reth_eth_wire_types::HeadersDirection; + use reth_network_p2p::{ + download::DownloadClient, + error::PeerRequestResult, + headers::client::{HeadersClient, HeadersRequest}, + priority::Priority, + BodiesClient, + }; + use reth_network_peers::{PeerId, WithPeerId}; + use reth_primitives_traits::Block; use reth_scroll_chainspec::{ScrollChainSpec, SCROLL_MAINNET}; use rollup_node_primitives::BatchCommitData; + use scroll_alloy_network::Scroll; use scroll_db::test_utils::setup_test_db; + use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; + + type ScrollBody = ::Body; + + /// A headers+bodies client that stores the headers and bodies in memory, with an artificial + /// soft bodies response limit that is set to 20 by default. + /// + /// This full block client can be [Clone]d and shared between multiple tasks. 
+    #[derive(Clone, Debug)]
+    struct TestScrollFullBlockClient {
+        headers: Arc<Mutex<HashMap<B256, Header>>>,
+        bodies: Arc<Mutex<HashMap<B256, <ScrollBlock as Block>::Body>>>,
+        // soft response limit, max number of bodies to respond with
+        soft_limit: usize,
+    }
+
+    impl Default for TestScrollFullBlockClient {
+        fn default() -> Self {
+            let mainnet_genesis: reth_scroll_primitives::ScrollBlock =
+                serde_json::from_str(include_str!("../testdata/genesis_block.json")).unwrap();
+            let (header, body) = mainnet_genesis.split();
+            let hash = header.hash_slow();
+            let headers = HashMap::from([(hash, header)]);
+            let bodies = HashMap::from([(hash, body)]);
+            Self {
+                headers: Arc::new(Mutex::new(headers)),
+                bodies: Arc::new(Mutex::new(bodies)),
+                soft_limit: 20,
+            }
+        }
+    }
+
+    // impl TestScrollFullBlockClient {
+    //     /// Insert a header and body into the client maps.
+    //     fn insert(&self, header: SealedHeader, body: ScrollBody) {
+    //         let hash = header.hash();
+    //         self.headers.lock().insert(hash, header.unseal());
+    //         self.bodies.lock().insert(hash, body);
+    //     }
+
+    //     /// Set the soft response limit.
+    //     const fn set_soft_limit(&mut self, limit: usize) {
+    //         self.soft_limit = limit;
+    //     }
+
+    //     /// Get the block with the highest block number.
+    //     fn highest_block(&self) -> Option<SealedBlock<ScrollBlock>> {
+    //         self.headers.lock().iter().max_by_key(|(_, header)| header.number).and_then(
+    //             |(hash, header)| {
+    //                 self.bodies.lock().get(hash).map(|body| {
+    //                     SealedBlock::from_parts_unchecked(header.clone(), body.clone(), *hash)
+    //                 })
+    //             },
+    //         )
+    //     }
+    // }
+
+    impl DownloadClient for TestScrollFullBlockClient {
+        /// Reports a bad message from a specific peer.
+        fn report_bad_message(&self, _peer_id: PeerId) {}
+
+        /// Retrieves the number of connected peers.
+        ///
+        /// Returns the number of connected peers in the test scenario (1).
+        fn num_connected_peers(&self) -> usize {
+            1
+        }
+    }
-
-    async fn setup_test_indexer() -> (Indexer<ScrollChainSpec>, Arc<Database>) {
+    /// Implements the `HeadersClient` trait for the `TestScrollFullBlockClient` struct.
+    impl HeadersClient for TestScrollFullBlockClient {
+        type Header = Header;
+        /// Specifies the associated output type.
+        type Output = futures::future::Ready<PeerRequestResult<Vec<Header>>>;
+
+        /// Retrieves headers with a given priority level.
+        ///
+        /// # Arguments
+        ///
+        /// * `request` - A `HeadersRequest` indicating the headers to retrieve.
+        /// * `_priority` - A `Priority` level for the request.
+        ///
+        /// # Returns
+        ///
+        /// A `Ready` future containing a `PeerRequestResult` with a vector of retrieved headers.
+        fn get_headers_with_priority(
+            &self,
+            request: HeadersRequest,
+            _priority: Priority,
+        ) -> Self::Output {
+            let headers = self.headers.lock();
+
+            // Initializes the block hash or number.
+            let mut block: BlockHashOrNumber = match request.start {
+                BlockHashOrNumber::Hash(hash) => headers.get(&hash).cloned(),
+                BlockHashOrNumber::Number(num) => {
+                    headers.values().find(|h| h.number == num).cloned()
+                }
+            }
+            .map(|h| h.number.into())
+            .unwrap();
+
+            // Retrieves headers based on the provided limit and request direction.
+            let resp = (0..request.limit)
+                .filter_map(|_| {
+                    headers.iter().find_map(|(hash, header)| {
+                        // Checks if the header matches the specified block or number.
+                        BlockNumHash::new(header.number, *hash).matches_block_or_num(&block).then(
+                            || {
+                                match request.direction {
+                                    HeadersDirection::Falling => block = header.parent_hash.into(),
+                                    HeadersDirection::Rising => block = (header.number + 1).into(),
+                                }
+                                header.clone()
+                            },
+                        )
+                    })
+                })
+                .collect::<Vec<_>>();
+
+            // Returns a future containing the retrieved headers with a random peer ID.
+            futures::future::ready(Ok(WithPeerId::new(PeerId::random(), resp)))
+        }
+    }
+
+    /// Implements the `BodiesClient` trait for the `TestScrollFullBlockClient` struct.
+    impl BodiesClient for TestScrollFullBlockClient {
+        type Body = ScrollBody;
+        /// Defines the output type of the function.
+        type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
+
+        /// Retrieves block bodies corresponding to provided hashes with a given priority.
+        ///
+        /// # Arguments
+        ///
+        /// * `hashes` - A vector of block hashes to retrieve bodies for.
+        /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
+        ///
+        /// # Returns
+        ///
+        /// A future containing the result of the block body retrieval operation.
+        fn get_block_bodies_with_priority_and_range_hint(
+            &self,
+            hashes: Vec<B256>,
+            _priority: Priority,
+            _range_hint: Option<RangeInclusive<u64>>,
+        ) -> Self::Output {
+            // Acquire a lock on the bodies.
+            let bodies = self.bodies.lock();
+
+            // Create a future that immediately returns the result of the block body retrieval
+            // operation.
+            futures::future::ready(Ok(WithPeerId::new(
+                PeerId::random(),
+                hashes
+                    .iter()
+                    .filter_map(|hash| bodies.get(hash).cloned())
+                    .take(self.soft_limit)
+                    .collect(),
+            )))
+        }
+    }
+
+    impl BlockClient for TestScrollFullBlockClient {
+        type Block = ScrollBlock;
+    }
+
+    async fn setup_test_indexer() -> (
+        ChainOrchestrator<ScrollChainSpec, TestScrollFullBlockClient, RootProvider<Scroll>>,
+        Arc<Database>,
+    ) {
+        // Get a provider to the node.
+        // TODO: update to use a real node URL.
+        let asserter = Asserter::new();
+        let mainnet_genesis: <Scroll as alloy_provider::Network>::BlockResponse =
+            serde_json::from_str(include_str!("../testdata/genesis_block_rpc.json"))
+                .expect("Failed to parse mainnet genesis block");
+        asserter.push_success(&mainnet_genesis);
+        let provider = ProviderBuilder::<_, _, Scroll>::default().connect_mocked_client(asserter);
 
         let db = Arc::new(setup_test_db().await);
-        (Indexer::new(db.clone(), SCROLL_MAINNET.clone()), db)
+        (
+            ChainOrchestrator::new(
+                db.clone(),
+                SCROLL_MAINNET.clone(),
+                TestScrollFullBlockClient::default(),
+                provider,
+            )
+            .await,
+            db,
+        )
     }
 
     #[tokio::test]
@@ -561,13 +1269,18 @@ mod test {
         let batch_commits =
            db.get_batches().await.unwrap().map(|res| res.unwrap()).collect::<Vec<_>>().await;
-        assert_eq!(2, batch_commits.len());
+        assert_eq!(3, batch_commits.len());
         assert!(batch_commits.contains(&batch_commit_block_1));
         assert!(batch_commits.contains(&batch_commit_block_20));
 
         // check that the L1 message at block 30 is deleted
-        let l1_messages =
-            db.get_l1_messages().await.unwrap().map(|res| res.unwrap()).collect::<Vec<_>>().await;
+        let l1_messages = db
+            .get_l1_messages(None)
+            .await
+            .unwrap()
+            .map(|res| res.unwrap())
+            .collect::<Vec<_>>()
+            .await;
         assert_eq!(2, l1_messages.len());
         assert!(l1_messages.contains(&l1_message_block_1));
         assert!(l1_messages.contains(&l1_message_block_20));
@@ -641,7 +1354,7 @@ mod test {
         } else {
             None
         };
-        indexer.handle_block(l2_block.clone(), batch_info);
+        indexer.consolidate_l2_blocks(vec![l2_block.clone()], batch_info);
         indexer.next().await.unwrap().unwrap();
         blocks.push(l2_block);
     }
@@ -652,7 +1365,7 @@ mod test {
     let event = indexer.next().await.unwrap().unwrap();
     assert_eq!(
         event,
-        IndexerEvent::UnwindIndexed {
+        ChainOrchestratorEvent::ChainUnwound {
            l1_block_number: 17,
            queue_index: None,
            l2_head_block_info: None,
@@ -667,7 +1380,7 @@ mod test {
 
    assert_eq!(
        event,
-        IndexerEvent::UnwindIndexed {
+        ChainOrchestratorEvent::ChainUnwound {
            l1_block_number: 7,
            queue_index: Some(8),
            l2_head_block_info: Some(blocks[7].block_info),
@@ -681,7 +1394,7 @@ mod test {
 
    assert_eq!(
        event,
-
IndexerEvent::UnwindIndexed { + ChainOrchestratorEvent::ChainUnwound { l1_block_number: 3, queue_index: Some(4), l2_head_block_info: Some(blocks[3].block_info), diff --git a/crates/indexer/src/metrics.rs b/crates/indexer/src/metrics.rs index dc25ee0c..0defdfeb 100644 --- a/crates/indexer/src/metrics.rs +++ b/crates/indexer/src/metrics.rs @@ -5,8 +5,10 @@ use strum::EnumIter; /// An enum representing the items the indexer can handle. #[derive(Debug, PartialEq, Eq, Hash, EnumIter)] pub enum IndexerItem { + /// Handle a block received from the network. + NewBlock, /// L2 block. - L2Block, + InsertL2Block, /// L1 reorg. L1Reorg, /// L1 finalization. @@ -23,7 +25,8 @@ impl IndexerItem { /// Returns the str representation of the [`IndexerItem`]. pub const fn as_str(&self) -> &'static str { match self { - Self::L2Block => "l2_block", + Self::NewBlock => "new_block", + Self::InsertL2Block => "l2_block", Self::L1Reorg => "l1_reorg", Self::L1Finalization => "l1_finalization", Self::L1Message => "l1_message", @@ -33,7 +36,7 @@ impl IndexerItem { } } -/// The metrics for the [`super::Indexer`]. +/// The metrics for the [`super::ChainOrchestrator`]. #[derive(Metrics, Clone)] #[metrics(scope = "indexer")] pub struct IndexerMetrics { diff --git a/crates/indexer/testdata/genesis_block.json b/crates/indexer/testdata/genesis_block.json new file mode 100644 index 00000000..3387b87c --- /dev/null +++ b/crates/indexer/testdata/genesis_block.json @@ -0,0 +1,27 @@ +{ + "header": { + "hash": "0xbbc05efd412b7cd47a2ed0e5ddfcf87af251e414ea4c801d78b6784513180a80", + "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "miner": "0x0000000000000000000000000000000000000000", + "stateRoot": "0x08d535cc60f40af5dd3b31e0998d7567c2d568b224bed2ba26070aeb078d1339", + "transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "difficulty": "0x1", + "number": "0x0", + "gasLimit": "0x989680", + "gasUsed": "0x0", + "timestamp": "0x6524e860", + "extraData": "", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "totalDifficulty": "0x1", + "size": "0x272" + }, + "body": { + "uncles": [], + "transactions": [], + "ommers": [] + } +} \ No newline at end of file diff --git a/crates/indexer/testdata/genesis_block_rpc.json b/crates/indexer/testdata/genesis_block_rpc.json new file mode 100644 index 00000000..b828bd22 --- /dev/null +++ b/crates/indexer/testdata/genesis_block_rpc.json @@ -0,0 +1,22 @@ +{ + "hash": "0xbbc05efd412b7cd47a2ed0e5ddfcf87af251e414ea4c801d78b6784513180a80", + "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "miner": "0x0000000000000000000000000000000000000000", + 
"stateRoot": "0x08d535cc60f40af5dd3b31e0998d7567c2d568b224bed2ba26070aeb078d1339", + "transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "difficulty": "0x1", + "number": "0x0", + "gasLimit": "0x989680", + "gasUsed": "0x0", + "timestamp": "0x6524e860", + "extraData": "0x4c61206573746f6e7465636f206573746173206d616c6665726d6974612e0000d2acf5d16a983db0d909d9d761b8337fabd6cbd10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "totalDifficulty": "0x1", + "size": "0x272", + "uncles": [], + "transactions": [] +} \ No newline at end of file diff --git a/crates/manager/src/manager/event.rs b/crates/manager/src/manager/event.rs index 0fb82c76..ea708e2d 100644 --- a/crates/manager/src/manager/event.rs +++ b/crates/manager/src/manager/event.rs @@ -1,4 +1,6 @@ use reth_scroll_primitives::ScrollBlock; +use rollup_node_indexer::ChainOrchestratorEvent; +use rollup_node_primitives::ChainImport; use rollup_node_signer::SignerEvent; use scroll_engine::ConsolidationOutcome; use scroll_network::NewBlockWithPeer; @@ -18,4 +20,10 @@ pub enum RollupManagerEvent { L1MessageIndexed(u64), /// A new event from the signer. SignerEvent(SignerEvent), + /// An event from the chain orchestrator. + ChainOrchestratorEvent(ChainOrchestratorEvent), + /// An optimistic sync has been triggered by the chain orchestrator. + OptimisticSyncTriggered(ScrollBlock), + /// A chain extension has been triggered by the chain orchestrator. + ChainExtensionTriggered(ChainImport), } diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index e9051cef..fe17092e 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -1,18 +1,18 @@ //! The [`RollupNodeManager`] is the main component of the rollup node that manages the -//! [`ScrollNetworkManager`], [`EngineDriver`], [`Indexer`] and [`Consensus`] components. It is -//! responsible for handling events from these components and coordinating their actions. +//! [`ScrollNetworkManager`], [`EngineDriver`], [`ChainOrchestrator`] and [`Consensus`] components. +//! It is responsible for handling events from these components and coordinating their actions. 
 use super::Consensus;
-use alloy_primitives::Signature;
 use alloy_provider::Provider;
 use futures::StreamExt;
 use reth_chainspec::EthChainSpec;
-use reth_network_api::{block::NewBlockWithPeer as RethNewBlockWithPeer, FullNetwork};
+use reth_network::BlockDownloaderProvider;
+use reth_network_api::FullNetwork;
 use reth_scroll_node::ScrollNetworkPrimitives;
-use reth_scroll_primitives::ScrollBlock;
 use reth_tasks::shutdown::GracefulShutdown;
 use reth_tokio_util::{EventSender, EventStream};
-use rollup_node_indexer::{Indexer, IndexerEvent};
+use rollup_node_indexer::{ChainOrchestrator, ChainOrchestratorEvent};
+use rollup_node_primitives::BlockInfo;
 use rollup_node_sequencer::Sequencer;
 use rollup_node_signer::{SignerEvent, SignerHandle};
 use rollup_node_watcher::L1Notification;
@@ -53,9 +53,6 @@ pub use handle::RollupManagerHandle;
 /// The size of the event channel.
 const EVENT_CHANNEL_SIZE: usize = 100;
 
-/// The size of the ECDSA signature in bytes.
-const ECDSA_SIGNATURE_LEN: usize = 65;
-
 /// The main manager for the rollup node.
 ///
 /// This is an endless [`Future`] that drives the state of the entire network forward and includes
@@ -82,19 +79,17 @@ pub struct RollupNodeManager<
     /// The chain spec used by the rollup node.
     chain_spec: Arc<CS>,
     /// The network manager that manages the scroll p2p network.
-    network: ScrollNetworkManager<N>,
+    network: ScrollNetworkManager<N, CS>,
     /// The engine driver used to communicate with the engine.
     engine: EngineDriver<EC, CS, P>,
     /// The derivation pipeline, used to derive payload attributes from batches.
     derivation_pipeline: Option<DerivationPipeline<L1P>>,
     /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
     l1_notification_rx: Option<ReceiverStream<Arc<L1Notification>>>,
-    /// An indexer used to index data for the rollup node.
-    indexer: Indexer<CS>,
+    /// The chain orchestrator.
+    chain: ChainOrchestrator<CS, <N as BlockDownloaderProvider>::Client, P>,
     /// The consensus algorithm used by the rollup node.
     consensus: Box<dyn Consensus>,
-    /// The receiver for new blocks received from the network (used to bridge from eth-wire).
-    eth_wire_block_rx: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
     /// An event sender for sending events to subscribers of the rollup node manager.
     event_sender: Option<EventSender<RollupManagerEvent>>,
     /// The sequencer which is responsible for sequencing transactions and producing new blocks.
@@ -128,7 +123,7 @@ impl<
             .field("engine", &self.engine)
             .field("derivation_pipeline", &self.derivation_pipeline)
             .field("l1_notification_rx", &self.l1_notification_rx)
-            .field("indexer", &self.indexer)
+            .field("indexer", &self.chain)
             .field("consensus", &self.consensus)
             .field("eth_wire_block_rx", &"eth_wire_block_rx")
             .field("event_sender", &self.event_sender)
@@ -150,21 +145,20 @@ where
     /// Create a new [`RollupNodeManager`] instance.
     #[allow(clippy::too_many_arguments)]
     #[allow(clippy::new_ret_no_self)]
-    pub fn new(
-        network: ScrollNetworkManager<N>,
+    pub async fn new(
+        network: ScrollNetworkManager<N, CS>,
         engine: EngineDriver<EC, CS, P>,
         l1_provider: Option<L1P>,
         database: Arc<Database>,
         l1_notification_rx: Option<Receiver<Arc<L1Notification>>>,
         consensus: Box<dyn Consensus>,
         chain_spec: Arc<CS>,
-        eth_wire_block_rx: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
        sequencer: Option<Sequencer<L1MP>>,
        signer: Option<SignerHandle>,
        block_time: Option<u64>,
+        chain_orchestrator: ChainOrchestrator<CS, <N as BlockDownloaderProvider>::Client, P>,
     ) -> (Self, RollupManagerHandle) {
         let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
-        let indexer = Indexer::new(database.clone(), chain_spec.clone());
         let derivation_pipeline =
             l1_provider.map(|provider| DerivationPipeline::new(provider, database));
         let rnm = Self {
@@ -174,9 +168,8 @@ where
             engine,
             derivation_pipeline,
             l1_notification_rx: l1_notification_rx.map(Into::into),
-            indexer,
+            chain: chain_orchestrator,
             consensus,
-            eth_wire_block_rx,
             event_sender: None,
             sequencer,
             signer,
@@ -220,7 +213,7 @@ where
                     result: Err(err.into()),
                 });
             } else {
-                self.engine.handle_block_import(block_with_peer);
+                self.chain.handle_block_from_peer(block_with_peer);
             }
         }
 
@@ -234,27 +227,27 @@ where
     }
 
     /// Handles an indexer event.
-    fn handle_indexer_event(&mut self, event: IndexerEvent) {
+    fn handle_indexer_event(&mut self, event: ChainOrchestratorEvent) {
         trace!(target: "scroll::node::manager", ?event, "Received indexer event");
         match event {
-            IndexerEvent::BatchCommitIndexed(batch_info) => {
+            ChainOrchestratorEvent::BatchCommitted(batch_info) => {
                 // push the batch info into the derivation pipeline.
                 if let Some(pipeline) = &mut self.derivation_pipeline {
                     pipeline.handle_batch_commit(batch_info);
                 }
             }
-            IndexerEvent::BatchFinalizationIndexed(_, Some(finalized_block)) => {
+            ChainOrchestratorEvent::BatchFinalized(_, Some(finalized_block)) => {
                 // update the fcs on new finalized block.
                 self.engine.set_finalized_block_info(finalized_block);
             }
-            IndexerEvent::FinalizedIndexed(l1_block_number, Some(finalized_block)) => {
+            ChainOrchestratorEvent::L1BlockFinalized(l1_block_number, Some(finalized_block)) => {
                 if let Some(sequencer) = self.sequencer.as_mut() {
                     sequencer.set_l1_finalized_block_number(l1_block_number);
                 }
                 // update the fcs on new finalized block.
                 self.engine.set_finalized_block_info(finalized_block);
             }
-            IndexerEvent::UnwindIndexed {
+            ChainOrchestratorEvent::ChainUnwound {
                 l1_block_number,
                 queue_index,
                 l2_head_block_info,
@@ -277,12 +270,52 @@ where
 
                 // TODO: should clear the derivation pipeline.
             }
-            IndexerEvent::L1MessageIndexed(index) => {
+            ChainOrchestratorEvent::L1MessageCommitted(index) => {
                 if let Some(event_sender) = self.event_sender.as_ref() {
                     event_sender.notify(RollupManagerEvent::L1MessageIndexed(index));
                 }
             }
-            _ => (),
+            ChainOrchestratorEvent::OldForkReceived { ref headers, ref peer_id, signature: _ } => {
+                trace!(target: "scroll::node::manager", ?headers, ?peer_id, "Received old fork from peer");
+                if let Some(event_sender) = self.event_sender.as_ref() {
+                    event_sender.notify(RollupManagerEvent::ChainOrchestratorEvent(event));
+                }
+            }
+            ChainOrchestratorEvent::ChainExtended(chain_import) => {
+                trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header.clone(), peer_id = ?chain_import.peer_id.clone(), "Received chain extension from peer");
+                if let Some(event_sender) = self.event_sender.as_ref() {
+                    event_sender
+                        .notify(RollupManagerEvent::ChainExtensionTriggered(chain_import.clone()));
+                }
+
+                // Issue the new chain to the engine driver for processing.
+                self.engine.handle_chain_import(chain_import)
+            }
+            ChainOrchestratorEvent::ChainReorged(chain_import) => {
+                trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header, ?chain_import.peer_id, "Received chain reorg from peer");
+                // if let Some(event_sender) = self.event_sender.as_ref() {
+                //     event_sender.notify(RollupManagerEvent::ChainOrchestratorEvent(event));
+                // }
+
+                // Issue the new chain to the engine driver for processing.
+                self.engine.handle_chain_import(chain_import)
+            }
+            ChainOrchestratorEvent::OptimisticSync(block) => {
+                let block_info: BlockInfo = (&block).into();
+                trace!(target: "scroll::node::manager", ?block_info, "Received optimistic sync from peer");
+                if let Some(event_sender) = self.event_sender.as_ref() {
+                    event_sender.notify(RollupManagerEvent::OptimisticSyncTriggered(block));
+                }
+
+                // Issue the new block info to the engine driver for processing.
+                self.engine.handle_optimistic_sync(block_info)
+            }
+            event => {
+                trace!(target: "scroll::node::manager", ?event, "Received chain orchestrator event");
+                if let Some(event_sender) = self.event_sender.as_ref() {
+                    event_sender.notify(RollupManagerEvent::ChainOrchestratorEvent(event));
+                }
+            }
         }
     }
 
@@ -295,7 +328,7 @@ where
             if let Some(event_sender) = self.event_sender.as_ref() {
                 event_sender.notify(RollupManagerEvent::BlockImported(block.clone()));
             }
-            self.indexer.handle_block((&block).into(), None);
+            self.chain.consolidate_l2_blocks(vec![(&block).into()], None);
         }
         self.network.handle().block_import_outcome(outcome);
     }
@@ -308,11 +341,11 @@ where
                     event_sender.notify(RollupManagerEvent::BlockSequenced(payload.clone()));
                 }
 
-                self.indexer.handle_block((&payload).into(), None);
+                self.chain.consolidate_l2_blocks(vec![(&payload).into()], None);
             }
             EngineDriverEvent::L1BlockConsolidated(consolidation_outcome) => {
-                self.indexer.handle_block(
-                    consolidation_outcome.block_info().clone(),
+                self.chain.consolidate_l2_blocks(
+                    vec![consolidation_outcome.block_info().clone()],
                     Some(*consolidation_outcome.batch_info()),
                 );
 
@@ -322,50 +355,27 @@ where
                     ));
                 }
             }
+            EngineDriverEvent::ChainImportOutcome(outcome) => {
+                if let Some(block) = outcome.outcome.block() {
+                    if let Some(event_sender) = self.event_sender.as_ref() {
+                        event_sender.notify(RollupManagerEvent::BlockImported(block));
+                    }
+                    self.chain.consolidate_l2_blocks(
+                        outcome.chain.iter().map(|b| b.into()).collect(),
+                        None,
+                    );
+                }
+                self.network.handle().block_import_outcome(outcome.outcome);
+            }
         }
     }
 
-    fn handle_eth_wire_block(
-        &mut self,
-        block: reth_network_api::block::NewBlockWithPeer<ScrollBlock>,
-    ) {
-        trace!(target: "scroll::node::manager", ?block, "Received new block from eth-wire protocol");
-        let reth_network_api::block::NewBlockWithPeer { peer_id, mut block } = block;
-
-        // We purge the extra data field post euclid v2 to align with protocol specification.
-        let extra_data = if self.chain_spec.is_euclid_v2_active_at_timestamp(block.timestamp) {
-            let extra_data = block.extra_data.clone();
-            block.header.extra_data = Default::default();
-            extra_data
-        } else {
-            block.extra_data.clone()
-        };
-
-        // If we can extract a signature from the extra data we validate consensus and then attempt
-        // import via the EngineAPI in the `handle_new_block` method. The signature is extracted
-        // from the last `ECDSA_SIGNATURE_LEN` bytes of the extra data field as specified by
-        // the protocol.
- let block = if let Some(signature) = extra_data - .len() - .checked_sub(ECDSA_SIGNATURE_LEN) - .and_then(|i| Signature::from_raw(&extra_data[i..]).ok()) - { - trace!(target: "scroll::bridge::import", peer_id = %peer_id, block = ?block.hash_slow(), "Received new block from eth-wire protocol"); - NewBlockWithPeer { peer_id, block, signature } - } else { - warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data"); - return; - }; - - self.handle_new_block(block); - } - /// Handles an [`L1Notification`] from the L1 watcher. fn handle_l1_notification(&mut self, notification: L1Notification) { if let L1Notification::Consensus(ref update) = notification { self.consensus.update_config(update); } - self.indexer.handle_l1_notification(notification) + self.chain.handle_l1_notification(notification) } /// Returns the current status of the [`RollupNodeManager`]. @@ -458,7 +468,7 @@ where ); // Drain all Indexer events. - while let Poll::Ready(Some(result)) = this.indexer.poll_next_unpin(cx) { + while let Poll::Ready(Some(result)) = this.chain.poll_next_unpin(cx) { match result { Ok(event) => this.handle_indexer_event(event), Err(err) => { @@ -514,13 +524,6 @@ where this.engine.handle_l1_consolidation(attributes) } - // Handle blocks received from the eth-wire protocol. - while let Some(Poll::Ready(Some(block))) = - this.eth_wire_block_rx.as_mut().map(|new_block_rx| new_block_rx.poll_next_unpin(cx)) - { - this.handle_eth_wire_block(block); - } - // Handle network manager events. while let Poll::Ready(Some(event)) = this.network.poll_next_unpin(cx) { this.handle_network_manager_event(event); diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index 1fb59c4f..adc549fe 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -11,6 +11,7 @@ exclude.workspace = true alloy-primitives = { workspace = true, features = ["map-foldhash"] } # reth +reth-chainspec.workspace = true reth-eth-wire-types.workspace = true reth-network.workspace = true reth-network-api.workspace = true @@ -18,11 +19,13 @@ reth-network-types = { git = "https://github.com/scroll-tech/reth.git", default- reth-network-peers.workspace = true reth-primitives-traits.workspace = true reth-storage-api = { git = "https://github.com/scroll-tech/reth.git", default-features = false } +reth-tokio-util.workspace = true # scroll reth-scroll-chainspec.workspace = true reth-scroll-node.workspace = true reth-scroll-primitives.workspace = true +scroll-alloy-hardforks.workspace = true scroll-wire.workspace = true # misc @@ -45,4 +48,5 @@ serde = [ "scroll-wire/serde", "reth-primitives-traits/serde", "reth-storage-api/serde", + "scroll-alloy-hardforks/serde", ] diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index 7c10ceb5..badbdead 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -4,27 +4,35 @@ use super::{ BlockImportOutcome, BlockValidation, NetworkHandleMessage, NetworkManagerEvent, NewBlockWithPeer, ScrollNetworkHandle, }; -use alloy_primitives::FixedBytes; +use alloy_primitives::{FixedBytes, Signature}; use futures::{FutureExt, Stream, StreamExt}; +use reth_chainspec::EthChainSpec; use reth_network::{ cache::LruCache, NetworkConfig as RethNetworkConfig, NetworkHandle as RethNetworkHandle, NetworkManager as RethNetworkManager, }; -use reth_network_api::FullNetwork; +use reth_network_api::{block::NewBlockWithPeer as RethNewBlockWithPeer, FullNetwork}; use reth_scroll_node::ScrollNetworkPrimitives; +use 
reth_scroll_primitives::ScrollBlock;
 use reth_storage_api::BlockNumReader as BlockNumReaderT;
+use reth_tokio_util::EventStream;
+use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_wire::{
     NewBlock, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, ScrollWireProtocolHandler,
     LRU_CACHE_SIZE,
 };
 use std::{
     pin::Pin,
+    sync::Arc,
     task::{Context, Poll},
 };
 use tokio::sync::mpsc::{self, UnboundedReceiver};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::trace;
 
+/// The size of the ECDSA signature in bytes.
+const ECDSA_SIGNATURE_LEN: usize = 65;
+
 /// [`ScrollNetworkManager`] manages the state of the scroll p2p network.
 ///
 /// This manager drives the state of the entire network forward and includes the following
@@ -34,22 +42,30 @@ use tracing::trace;
 /// - `from_handle_rx`: Receives commands from the [`FullNetwork`].
 /// - `scroll_wire`: The type that manages connections and state of the scroll wire protocol.
 #[derive(Debug)]
-pub struct ScrollNetworkManager<N> {
+pub struct ScrollNetworkManager<N, CS> {
+    /// The chain spec used by the rollup node.
+    chain_spec: Arc<CS>,
     /// A handle used to interact with the network manager.
     handle: ScrollNetworkHandle<N>,
     /// Receiver half of the channel set up between this type and the [`FullNetwork`], receives
     /// [`NetworkHandleMessage`]s.
     from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage>,
+    /// The receiver for new blocks received from the network (used to bridge from eth-wire).
+    eth_wire_listener: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
     /// The scroll wire protocol manager.
     scroll_wire: ScrollWireManager,
 }
 
-impl ScrollNetworkManager<RethNetworkHandle<ScrollNetworkPrimitives>> {
+impl<CS: EthChainSpec + ScrollHardforks>
+    ScrollNetworkManager<RethNetworkHandle<ScrollNetworkPrimitives>, CS>
+{
     /// Creates a new [`ScrollNetworkManager`] instance from the provided configuration and block
     /// import.
     pub async fn new<C: BlockNumReaderT + 'static>(
+        chain_spec: Arc<CS>,
         mut network_config: RethNetworkConfig<C, ScrollNetworkPrimitives>,
         scroll_wire_config: ScrollWireConfig,
+        eth_wire_listener: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
     ) -> Self {
         // Create the scroll-wire protocol handler.
         let (scroll_wire_handler, events) = ScrollWireProtocolHandler::new(scroll_wire_config);
@@ -73,16 +89,29 @@ impl ScrollNetworkManager<RethNetworkHandle<ScrollNetworkPrimitives>> {
         // Spawn the inner network manager.
         tokio::spawn(inner_network_manager);
 
-        Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire }
+        Self {
+            chain_spec,
+            handle,
+            from_handle_rx: from_handle_rx.into(),
+            scroll_wire,
+            eth_wire_listener,
+        }
     }
 }
 
-impl<N: FullNetwork> ScrollNetworkManager<N> {
+impl<N: FullNetwork, CS: EthChainSpec + ScrollHardforks>
+    ScrollNetworkManager<N, CS>
+{
     /// Creates a new [`ScrollNetworkManager`] instance from the provided parts.
     ///
     /// This is used when the scroll-wire [`ScrollWireProtocolHandler`] and the inner network
     /// manager [`RethNetworkManager`] are instantiated externally.
-    pub fn from_parts(inner_network_handle: N, events: UnboundedReceiver<ScrollWireEvent>) -> Self {
+    pub fn from_parts(
+        chain_spec: Arc<CS>,
+        inner_network_handle: N,
+        events: UnboundedReceiver<ScrollWireEvent>,
+        eth_wire_listener: Option<EventStream<RethNewBlockWithPeer<ScrollBlock>>>,
+    ) -> Self {
         // Create the channel for sending messages to the network manager from the network handle.
         let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
@@ -91,7 +120,13 @@ impl<N: FullNetwork> ScrollNetworkManager<N> {
 
         let handle = ScrollNetworkHandle::new(to_manager_tx, inner_network_handle);
 
-        Self { handle, from_handle_rx: from_handle_rx.into(), scroll_wire }
+        Self {
+            chain_spec,
+            handle,
+            from_handle_rx: from_handle_rx.into(),
+            scroll_wire,
+            eth_wire_listener,
+        }
     }
 
     /// Returns a new [`ScrollNetworkHandle`] instance.
@@ -182,9 +217,45 @@ impl<N: FullNetwork> ScrollNetworkManager<N> {
             }
         }
     }
+
+    /// Handles a new block received from the eth-wire protocol.
+    fn handle_eth_wire_block(
+        &mut self,
+        block: reth_network_api::block::NewBlockWithPeer<ScrollBlock>,
+    ) -> Option<NetworkManagerEvent> {
+        trace!(target: "scroll::node::manager", ?block, "Received new block from eth-wire protocol");
+        let reth_network_api::block::NewBlockWithPeer { peer_id, mut block } = block;
+
+        // We purge the extra data field post euclid v2 to align with protocol specification.
+        let extra_data = if self.chain_spec.is_euclid_v2_active_at_timestamp(block.timestamp) {
+            let extra_data = block.extra_data.clone();
+            block.header.extra_data = Default::default();
+            extra_data
+        } else {
+            block.extra_data.clone()
+        };
+
+        // If we can extract a signature from the extra data we validate consensus and then attempt
+        // import via the EngineAPI in the `handle_new_block` method. The signature is extracted
+        // from the last `ECDSA_SIGNATURE_LEN` bytes of the extra data field as specified by
+        // the protocol.
+        if let Some(signature) = extra_data
+            .len()
+            .checked_sub(ECDSA_SIGNATURE_LEN)
+            .and_then(|i| Signature::from_raw(&extra_data[i..]).ok())
+        {
+            trace!(target: "scroll::bridge::import", peer_id = %peer_id, block = ?block.hash_slow(), "Received new block from eth-wire protocol");
+            Some(NetworkManagerEvent::NewBlock(NewBlockWithPeer { peer_id, block, signature }))
+        } else {
+            tracing::warn!(target: "scroll::bridge::import", peer_id = %peer_id, "Failed to extract signature from block extra data");
+            None
+        }
+    }
 }
 
-impl<N: FullNetwork> Stream for ScrollNetworkManager<N> {
+impl<N: FullNetwork, CS: EthChainSpec + ScrollHardforks> Stream
+    for ScrollNetworkManager<N, CS>
+{
     type Item = NetworkManagerEvent;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -211,6 +282,15 @@ impl<N: FullNetwork> Stream for ScrollNetworkManager<N> {
             return Poll::Ready(Some(this.on_scroll_wire_event(event)));
         }
 
+        // Handle blocks received from the eth-wire protocol.
+        while let Some(Poll::Ready(Some(block))) =
+            this.eth_wire_listener.as_mut().map(|new_block_rx| new_block_rx.poll_next_unpin(cx))
+        {
+            if let Some(event) = this.handle_eth_wire_block(block) {
+                return Poll::Ready(Some(event));
+            }
+        }
+
         Poll::Pending
     }
 }
diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml
index 65271709..ef3d6fe3 100644
--- a/crates/node/Cargo.toml
+++ b/crates/node/Cargo.toml
@@ -56,6 +56,7 @@ reth-transaction-pool = { git = "https://github.com/scroll-tech/reth.git", defau
 reth-trie-db = { git = "https://github.com/scroll-tech/reth.git", default-features = false }
 
 # rollup node
+rollup-node-indexer.workspace = true
 rollup-node-manager.workspace = true
 rollup-node-primitives.workspace = true
 rollup-node-providers.workspace = true
diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs
index dc6d6590..9ae81ae9 100644
--- a/crates/node/src/args.rs
+++ b/crates/node/src/args.rs
@@ -17,6 +17,7 @@ use reth_node_builder::rpc::RethRpcServerHandles;
 use reth_node_core::primitives::BlockHeader;
 use reth_scroll_chainspec::SCROLL_FEE_VAULT_ADDRESS;
 use reth_scroll_node::ScrollNetworkPrimitives;
+use rollup_node_indexer::ChainOrchestrator;
 use rollup_node_manager::{
     Consensus, NoopConsensus, RollupManagerHandle, RollupNodeManager, SystemContractConsensus,
 };
@@ -111,12 +112,6 @@ impl ScrollRollupNodeConfig {
         RollupManagerHandle,
         Option<Sender<Arc<L1Notification>>>,
     )> {
-        // Instantiate the network manager
-        let (scroll_wire_handler, events) =
-            ScrollWireProtocolHandler::new(ScrollWireConfig::new(true));
-        network.add_rlpx_sub_protocol(scroll_wire_handler.into_rlpx_sub_protocol());
-        let scroll_network_manager = ScrollNetworkManager::from_parts(network.clone(), events);
-
         // Get the rollup node config.
let named_chain = chain_spec.chain().named().expect("expected named chain"); let node_config = Arc::new(NodeConfig::from_named_chain(named_chain)); @@ -142,8 +137,8 @@ impl ScrollRollupNodeConfig { let l2_provider = rpc_server_handles .rpc .new_http_provider_for() - .map(Arc::new) .expect("failed to create payload provider"); + let l2_provider = Arc::new(l2_provider); // Instantiate the database let database_path = if let Some(database_path) = self.database_args.path { @@ -175,6 +170,21 @@ impl ScrollRollupNodeConfig { let chain_spec = Arc::new(chain_spec.clone()); + // Instantiate the network manager + let eth_wire_listener = self + .network_args + .enable_eth_scroll_wire_bridge + .then_some(network.eth_wire_block_listener().await?); + let (scroll_wire_handler, events) = + ScrollWireProtocolHandler::new(ScrollWireConfig::new(true)); + network.add_rlpx_sub_protocol(scroll_wire_handler.into_rlpx_sub_protocol()); + let scroll_network_manager = ScrollNetworkManager::from_parts( + chain_spec.clone(), + network.clone(), + events, + eth_wire_listener, + ); + // On startup we replay the latest batch of blocks from the database as such we set the safe // block hash to the latest block hash associated with the previous consolidated // batch in the database. @@ -192,10 +202,9 @@ impl ScrollRollupNodeConfig { let engine = EngineDriver::new( Arc::new(engine_api), chain_spec.clone(), - Some(l2_provider), + Some(l2_provider.clone()), fcs, !self.test && !chain_spec.is_dev_chain(), - self.engine_driver_args.en_sync_trigger, Duration::from_millis(self.sequencer_args.payload_building_duration), ); @@ -258,13 +267,6 @@ impl ScrollRollupNodeConfig { (None, None) }; - // Instantiate the eth wire listener - let eth_wire_listener = self - .network_args - .enable_eth_scroll_wire_bridge - .then_some(network.eth_wire_block_listener().await?); - - // Instantiate the signer // Instantiate the signer let signer = if self.test { // Use a random private key signer for testing @@ -275,6 +277,16 @@ impl ScrollRollupNodeConfig { self.signer_args.signer(chain_id).await?.map(rollup_node_signer::Signer::spawn) }; + // Instantiate the chain orchestrator + let block_client = scroll_network_manager + .handle() + .inner() + .fetch_client() + .await + .expect("failed to fetch block client"); + let chain_orchestrator = + ChainOrchestrator::new(db.clone(), chain_spec.clone(), block_client, l2_provider).await; + // Spawn the rollup node manager let (rnm, handle) = RollupNodeManager::new( scroll_network_manager, @@ -284,11 +296,12 @@ impl ScrollRollupNodeConfig { l1_notification_rx, consensus, chain_spec, - eth_wire_listener, sequencer, signer, block_time, - ); + chain_orchestrator, + ) + .await; Ok((rnm, handle, l1_notification_tx)) } } diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 2cc4a963..9f40dba9 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -91,6 +91,19 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> { Ok(()) } +#[tokio::test] +async fn follower_can_reorg() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // create 2 nodes + let chain_spec = (*SCROLL_DEV).clone(); + let (mut _nodes, _tasks, _) = + setup_engine(default_test_scroll_rollup_node_config(), 2, chain_spec.clone(), false) + .await?; + + Ok(()) +} + #[tokio::test] async fn can_sequence_and_gossip_blocks() { reth_tracing::init_test_tracing(); @@ -149,6 +162,12 @@ async fn can_sequence_and_gossip_blocks() { panic!("Failed to receive block from rollup node"); } + // assert that a chain extension is 
triggered on the follower node + if let Some(RollupManagerEvent::ChainExtensionTriggered(_)) = follower_events.next().await { + } else { + panic!("Failed to receive chain extension event from rollup node"); + } + // assert that the block was successfully imported by the follower node if let Some(RollupManagerEvent::BlockImported(block)) = follower_events.next().await { assert_eq!(block.body.transactions.len(), 1); @@ -189,8 +208,13 @@ async fn can_bridge_blocks() { .with_pow() .build_with_noop_provider(chain_spec.clone()); let scroll_wire_config = ScrollWireConfig::new(true); - let mut scroll_network = - scroll_network::ScrollNetworkManager::new(network_config, scroll_wire_config).await; + let mut scroll_network = scroll_network::ScrollNetworkManager::new( + chain_spec.clone(), + network_config, + scroll_wire_config, + None, + ) + .await; let scroll_network_handle = scroll_network.handle(); // Connect the scroll-wire node to the scroll NetworkManager. diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 8b315bd7..f43e0a0f 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -11,8 +11,7 @@ use rollup_node::{ }, ScrollRollupNode, }; -use rollup_node_manager::{RollupManagerCommand, RollupManagerEvent}; -use tokio::sync::oneshot; +use rollup_node_manager::RollupManagerEvent; /// We test if the syncing of the RN is correctly triggered and released when the EN reaches sync. #[allow(clippy::large_stack_frames)] @@ -42,19 +41,9 @@ async fn test_should_trigger_pipeline_sync_for_execution_node() { unsynced.network.next_session_established().await; synced.network.next_session_established().await; - // Wait for the unsynced node to receive a block. - wait_n_events(&unsynced, |e| matches!(e, RollupManagerEvent::NewBlockReceived(_)), 1).await; - - // Check the unsynced node enters sync mode. - let (tx, rx) = oneshot::channel(); - unsynced - .inner - .add_ons_handle - .rollup_manager_handle - .send_command(RollupManagerCommand::Status(tx)) + // Assert that the unsynced node triggers optimistic sync. + wait_n_events(&unsynced, |e| matches!(e, RollupManagerEvent::OptimisticSyncTriggered(_)), 1) .await; - let status = rx.await.unwrap(); - assert!(status.syncing); // Verify the unsynced node syncs. let provider = ProviderBuilder::new().connect_http(unsynced.rpc_url()); @@ -70,19 +59,9 @@ async fn test_should_trigger_pipeline_sync_for_execution_node() { retries += 1; } - // Wait at least a single block for the driver to exit sync mode. - wait_n_events(&unsynced, |e| matches!(e, RollupManagerEvent::BlockImported(_)), 1).await; - - // Check the unsynced node exits sync mode. - let (tx, rx) = oneshot::channel(); - unsynced - .inner - .add_ons_handle - .rollup_manager_handle - .send_command(RollupManagerCommand::Status(tx)) + // Assert that the unsynced node triggers a chain extension on the optimistic chain. + wait_n_events(&unsynced, |e| matches!(e, RollupManagerEvent::ChainExtensionTriggered(_)), 1) .await; - let status = rx.await.unwrap(); - assert!(!status.syncing); } /// Waits for n events to be emitted. 
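The `wait_n_events` helper that both sync assertions above rely on is cut off at the hunk boundary. A minimal sketch of its core loop, assuming only that the test can obtain a `futures` stream of `RollupManagerEvent`s from the node handle (this free-standing signature is illustrative; the repository's helper takes the node itself):

use futures::{Stream, StreamExt};
use rollup_node_manager::RollupManagerEvent;

/// Waits until `n` events matching `predicate` have been observed on `events`.
async fn wait_n_events<S, F>(mut events: S, mut predicate: F, mut n: u64)
where
    S: Stream<Item = RollupManagerEvent> + Unpin,
    F: FnMut(&RollupManagerEvent) -> bool,
{
    while n > 0 {
        match events.next().await {
            // A matching event counts towards the total we are waiting for.
            Some(event) if predicate(&event) => n -= 1,
            // Unrelated events are skipped.
            Some(_) => {}
            None => panic!("event stream ended before all expected events were observed"),
        }
    }
}

Under that assumption, the test above reads as: wait for one `OptimisticSyncTriggered`, then, once the execution node has caught up, for one `ChainExtensionTriggered` on the optimistic chain.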
diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml
index 7dbbc3ca..d1af8a3d 100644
--- a/crates/primitives/Cargo.toml
+++ b/crates/primitives/Cargo.toml
@@ -22,6 +22,7 @@ scroll-alloy-consensus.workspace = true
 scroll-alloy-rpc-types-engine.workspace = true
 
 # reth
+reth-network-peers.workspace = true
 reth-primitives-traits.workspace = true
 reth-scroll-primitives.workspace = true
 
@@ -42,6 +43,7 @@ std = [
     "reth-primitives-traits/std",
     "alloy-consensus/std",
     "alloy-chains/std",
+    "reth-network-peers/std",
 ]
 arbitrary = [
     "std",
diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs
index e698a314..f870f105 100644
--- a/crates/primitives/src/block.rs
+++ b/crates/primitives/src/block.rs
@@ -1,3 +1,4 @@
+use alloy_consensus::Header;
 use alloy_eips::{BlockNumHash, Decodable2718};
 use alloy_primitives::{B256, U256};
 use alloy_rpc_types_engine::ExecutionPayload;
@@ -49,6 +50,18 @@ impl From<&ScrollBlock> for BlockInfo {
     }
 }
 
+impl From<&Header> for BlockInfo {
+    fn from(value: &Header) -> Self {
+        Self { number: value.number, hash: value.hash_slow() }
+    }
+}
+
+impl From<Header> for BlockInfo {
+    fn from(value: Header) -> Self {
+        Self { number: value.number, hash: value.hash_slow() }
+    }
+}
+
 #[cfg(feature = "arbitrary")]
 impl arbitrary::Arbitrary<'_> for BlockInfo {
     fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
diff --git a/crates/primitives/src/bounded_vec.rs b/crates/primitives/src/bounded_vec.rs
index 55a185f6..eaf74526 100644
--- a/crates/primitives/src/bounded_vec.rs
+++ b/crates/primitives/src/bounded_vec.rs
@@ -39,6 +39,11 @@ impl<T> BoundedVec<T> {
         self.data.back()
     }
 
+    /// Returns the first element in the vector, if any.
+    pub fn first(&self) -> Option<&T> {
+        self.data.front()
+    }
+
     /// Clears the structure by removing all the elements.
     pub fn clear(&mut self) {
         self.data.clear()
@@ -56,6 +61,16 @@ impl<T> BoundedVec<T> {
     fn is_full(&self) -> bool {
         self.data.len() == self.data.capacity()
     }
+
+    /// Returns a reference to the inner `VecDeque` of the bounded vec.
+    pub const fn inner(&self) -> &VecDeque<T> {
+        &self.data
+    }
+
+    /// Consumes the bounded vec, returning the inner `VecDeque`.
+    pub fn into_inner(self) -> VecDeque<T> {
+        self.data
+    }
 }
 
 impl<T> Extend<T> for BoundedVec<T> {
diff --git a/crates/primitives/src/chain.rs b/crates/primitives/src/chain.rs
new file mode 100644
index 00000000..2568960e
--- /dev/null
+++ b/crates/primitives/src/chain.rs
@@ -0,0 +1,24 @@
+use alloy_primitives::Signature;
+use reth_network_peers::PeerId;
+use reth_scroll_primitives::ScrollBlock;
+use std::vec::Vec;
+
+/// A structure representing a chain import, which includes a vector of blocks,
+/// the peer ID from which the blocks were received, and a signature for the import of the chain
+/// tip.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ChainImport {
+    /// The blocks that are part of the chain import.
+    pub chain: Vec<ScrollBlock>,
+    /// The peer ID from which the blocks were received.
+    pub peer_id: PeerId,
+    /// The signature for the import of the chain tip.
+    pub signature: Signature,
+}
+
+impl ChainImport {
+    /// Creates a new `ChainImport` instance with the provided blocks, peer ID, and signature.
+    pub const fn new(blocks: Vec<ScrollBlock>, peer_id: PeerId, signature: Signature) -> Self {
+        Self { chain: blocks, peer_id, signature }
+    }
+}
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index 0b3407a7..6c958348 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -16,6 +16,9 @@ pub use batch::{BatchCommitData, BatchInfo};
 mod bounded_vec;
 pub use bounded_vec::BoundedVec;
 
+mod chain;
+pub use chain::ChainImport;
+
 mod metadata;
 pub use metadata::Metadata;
 
diff --git a/crates/scroll-wire/src/protocol/proto.rs b/crates/scroll-wire/src/protocol/proto.rs
index 92472371..473fb837 100644
--- a/crates/scroll-wire/src/protocol/proto.rs
+++ b/crates/scroll-wire/src/protocol/proto.rs
@@ -30,7 +30,7 @@ pub struct NewBlock {
 }
 
 impl NewBlock {
-    /// Returns a [`NewBlock`] instance with the provided signature and block.
+    /// Returns a [`NewBlock`] instance with the provided signature and blocks.
     pub fn new(signature: Signature, block: reth_scroll_primitives::ScrollBlock) -> Self {
         Self { signature: Bytes::from(Into::<Vec<u8>>::into(signature)), block }
     }
diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs
index 77315649..bda7b946 100644
--- a/crates/sequencer/tests/e2e.rs
+++ b/crates/sequencer/tests/e2e.rs
@@ -40,7 +40,7 @@ async fn can_build_blocks() {
     reth_tracing::init_test_tracing();
 
     const BLOCK_BUILDING_DURATION: Duration = Duration::from_millis(0);
-    const BLOCK_GAP_TRIGGER: u64 = 100;
+    // const BLOCK_GAP_TRIGGER: u64 = 100;
 
     // setup a test node
     let (mut nodes, _tasks, wallet) = setup(1, false).await.unwrap();
@@ -64,7 +64,6 @@ async fn can_build_blocks() {
         None::<RootProvider<Scroll>>,
         fcs,
         false,
-        BLOCK_GAP_TRIGGER,
         BLOCK_BUILDING_DURATION,
     );
 
@@ -158,7 +157,6 @@ async fn can_build_blocks_with_delayed_l1_messages() {
     let chain_spec = SCROLL_DEV.clone();
     const BLOCK_BUILDING_DURATION: Duration = tokio::time::Duration::from_millis(0);
-    const BLOCK_GAP_TRIGGER: u64 = 100;
     const L1_MESSAGE_DELAY: u64 = 2;
 
     // setup a test node
@@ -184,7 +182,6 @@ async fn can_build_blocks_with_delayed_l1_messages() {
         None::<RootProvider<Scroll>>,
         fcs,
         false,
-        BLOCK_GAP_TRIGGER,
         BLOCK_BUILDING_DURATION,
     );
 
@@ -284,7 +281,7 @@ async fn can_build_blocks_with_finalized_l1_messages() {
     let chain_spec = SCROLL_DEV.clone();
     const BLOCK_BUILDING_DURATION: Duration = tokio::time::Duration::from_millis(0);
-    const BLOCK_GAP_TRIGGER: u64 = 100;
+    // const BLOCK_GAP_TRIGGER: u64 = 100;
 
     // setup a test node
     let (mut nodes, _tasks, wallet) =
@@ -309,7 +306,6 @@ async fn can_build_blocks_with_finalized_l1_messages() {
         None::<RootProvider<Scroll>>,
         fcs,
         false,
-        BLOCK_GAP_TRIGGER,
         BLOCK_BUILDING_DURATION,
     );
 
@@ -478,6 +474,9 @@ async fn can_sequence_blocks_with_private_key_file() -> eyre::Result<()> {
         panic!("Failed to receive BlockSequenced event");
     }
 
+    // Skip the next event.
+    let _ = sequencer_events.next().await;
+
     // Verify signing event and signature correctness
     if let Some(RollupManagerEvent::SignerEvent(SignerEvent::SignedBlock {
         block: signed_block,
@@ -562,6 +561,9 @@ async fn can_sequence_blocks_with_hex_key_file_without_prefix() -> eyre::Result<()> {
         panic!("Failed to receive BlockSequenced event");
     }
 
+    // Skip the next event.
+    let _ = sequencer_events.next().await;
+
     // Verify signing event and signature correctness
     if let Some(RollupManagerEvent::SignerEvent(SignerEvent::SignedBlock {
         block: signed_block,
@@ -585,7 +587,7 @@ async fn can_build_blocks_and_exit_at_gas_limit() {
     let chain_spec = SCROLL_DEV.clone();
     const MIN_TRANSACTION_GAS_COST: u64 = 21_000;
     const BLOCK_BUILDING_DURATION: Duration = Duration::from_millis(250);
-    const BLOCK_GAP_TRIGGER: u64 = 100;
+    // const BLOCK_GAP_TRIGGER: u64 = 100;
     const TRANSACTIONS_COUNT: usize = 2000;
 
     // setup a test node. use a high value for the payload building duration to be sure we don't
@@ -635,7 +637,6 @@ async fn can_build_blocks_and_exit_at_gas_limit() {
         None::<RootProvider<Scroll>>,
         fcs,
         false,
-        BLOCK_GAP_TRIGGER,
         BLOCK_BUILDING_DURATION,
     );
 
@@ -671,7 +672,7 @@ async fn can_build_blocks_and_exit_at_time_limit() {
     let chain_spec = SCROLL_DEV.clone();
     const MIN_TRANSACTION_GAS_COST: u64 = 21_000;
     const BLOCK_BUILDING_DURATION: Duration = Duration::from_secs(1);
-    const BLOCK_GAP_TRIGGER: u64 = 100;
+    // const BLOCK_GAP_TRIGGER: u64 = 100;
     const TRANSACTIONS_COUNT: usize = 2000;
 
     // setup a test node. use a low payload building duration in order to exit before we reach the
@@ -721,7 +722,6 @@ async fn can_build_blocks_and_exit_at_time_limit() {
         None::<RootProvider<Scroll>>,
         fcs,
         false,
-        BLOCK_GAP_TRIGGER,
         BLOCK_BUILDING_DURATION,
     );
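The thread running through this diff is the new cursor argument on `get_l1_messages`: `validate_l1_messages` starts the expected-message stream at the hash of the first L1 message found in the received blocks, while passing `None` preserves the old scan-from-the-start behaviour. A minimal sketch of a consumer driving the API directly, assuming the `Database` handle and the `L1MessageStart::Hash` variant shown in this diff (the helper name and the printed field are illustrative):

use alloy_primitives::B256;
use futures::StreamExt;
use scroll_db::{Database, DatabaseOperations, L1MessageStart};
use std::sync::Arc;

/// Streams L1 messages starting at `start_hash`, or from the beginning of the
/// message queue when no hash is provided.
async fn dump_l1_messages(db: Arc<Database>, start_hash: Option<B256>) {
    let mut messages = db
        .get_l1_messages(start_hash.map(L1MessageStart::Hash))
        .await
        .expect("failed to open L1 message stream");
    while let Some(message) = messages.next().await {
        // Each item is a database result wrapping an L1 message envelope.
        let envelope = message.expect("failed to decode L1 message");
        println!("queue index: {}", envelope.transaction.queue_index);
    }
}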