
feat: chain orchestrator #185


Draft · wants to merge 10 commits into base: main
2 changes: 1 addition & 1 deletion .codespellrc
@@ -1,3 +1,3 @@
[codespell]
- skip = .git,target,Cargo.toml,Cargo.lock
+ skip = .git,target,Cargo.toml,Cargo.lock,docker-compose
ignore-words-list = crate
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 11 additions & 5 deletions crates/database/db/src/db.rs
@@ -315,8 +315,13 @@ mod test {
db.insert_l1_message(l1_message_2.clone()).await.unwrap();

// collect the L1Messages
- let l1_messages =
-     db.get_l1_messages().await.unwrap().map(|res| res.unwrap()).collect::<Vec<_>>().await;
+ let l1_messages = db
+     .get_l1_messages(None)
+     .await
+     .unwrap()
+     .map(|res| res.unwrap())
+     .collect::<Vec<_>>()
+     .await;

// Apply the assertions.
assert!(l1_messages.contains(&l1_message_1));
@@ -412,9 +417,10 @@ mod test {
rand::rng().fill(bytes.as_mut_slice());
let mut u = Unstructured::new(&bytes);

- // Initially should return None
- let latest_safe = db.get_latest_safe_l2_info().await.unwrap();
- assert!(latest_safe.is_none());
+ // Initially should return the genesis block and hash.
+ let (latest_safe_block, batch) = db.get_latest_safe_l2_info().await.unwrap().unwrap();
+ assert_eq!(latest_safe_block.number, 0);
+ assert_eq!(batch.index, 0);

// Generate and insert a batch
let batch_data = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() };
2 changes: 1 addition & 1 deletion crates/database/db/src/lib.rs
@@ -13,7 +13,7 @@ mod models;
pub use models::*;

mod operations;
- pub use operations::{DatabaseOperations, UnwindResult};
+ pub use operations::{DatabaseOperations, L1MessageStart, UnwindResult};

mod transaction;
pub use transaction::DatabaseTransaction;
74 changes: 73 additions & 1 deletion crates/database/db/src/operations.rs
@@ -227,12 +227,30 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(Into::into))?)
}

- /// Gets an iterator over all [`L1MessageEnvelope`]s in the database.
+ /// Get an iterator over all [`L1MessageEnvelope`]s in the database starting from the provided
+ /// `start` point.
async fn get_l1_messages<'a>(
&'a self,
start: Option<L1MessageStart>,
) -> Result<impl Stream<Item = Result<L1MessageEnvelope, DatabaseError>> + 'a, DatabaseError>
{
let queue_index = match start {
Some(L1MessageStart::Index(i)) => i,
Some(L1MessageStart::Hash(ref h)) => {
// Lookup message by hash
let record = models::l1_message::Entity::find()
.filter(models::l1_message::Column::Hash.eq(h.to_vec()))
.one(self.get_connection())
.await?
.ok_or_else(|| DatabaseError::L1MessageNotFound(0))?;

record.queue_index as u64
}
None => 0,
};

Ok(models::l1_message::Entity::find()
.filter(models::l1_message::Column::QueueIndex.gte(queue_index))
.stream(self.get_connection())
.await?
.map(|res| Ok(res.map(Into::into)?)))
@@ -250,6 +268,24 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(Into::into))?)
}

/// Get the [`BlockInfo`] and optional [`BatchInfo`] for the provided block hash.
async fn get_l2_block_and_batch_info_by_hash(
&self,
block_hash: B256,
) -> Result<Option<(BlockInfo, Option<BatchInfo>)>, DatabaseError> {
tracing::trace!(target: "scroll::db", ?block_hash, "Fetching L2 block and batch info by hash from database.");
Ok(models::l2_block::Entity::find()
.filter(models::l2_block::Column::BlockHash.eq(block_hash.to_vec()))
.one(self.get_connection())
.await
.map(|x| {
x.map(|x| {
let (block_info, batch_info): (BlockInfo, Option<BatchInfo>) = x.into();
(block_info, batch_info)
})
})?)
}

/// Get a [`BlockInfo`] from the database by its block number.
async fn get_l2_block_info_by_number(
&self,
@@ -298,6 +334,18 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.map(|x| x.block_info()))?)
}

/// Get an iterator over all L2 blocks in the database starting from the most recent one.
async fn get_l2_blocks<'a>(
&'a self,
) -> Result<impl Stream<Item = Result<BlockInfo, DatabaseError>> + 'a, DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching L2 blocks from database.");
Ok(models::l2_block::Entity::find()
.order_by_desc(models::l2_block::Column::BlockNumber)
.stream(self.get_connection())
.await?
.map(|res| Ok(res.map(|res| res.block_info())?)))
}

/// Prepare the database on startup and return metadata used for other components in the
/// rollup-node.
///
@@ -346,6 +394,18 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.rows_affected)?)
}

/// Insert multiple blocks into the database.
async fn insert_blocks(
&self,
blocks: Vec<L2BlockInfoWithL1Messages>,
batch_info: Option<BatchInfo>,
) -> Result<(), DatabaseError> {
for block in blocks {
self.insert_block(block, batch_info).await?;
}
Ok(())
}

/// Insert a new block in the database.
async fn insert_block(
&self,
@@ -466,6 +526,18 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}
}

/// This type defines the start of an L1 message stream.
///
/// It can either be an index, which is the queue index of the first message to return, or a hash,
/// which is the hash of the first message to return.
#[derive(Debug)]
pub enum L1MessageStart {
/// Start from the provided queue index.
Index(u64),
/// Start from the provided queue hash.
Hash(B256),
}

/// The result of [`DatabaseOperations::unwind`].
#[derive(Debug)]
pub struct UnwindResult {
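A minimal usage sketch of the new `start` parameter (not part of the diff): `None` keeps the previous behaviour and streams from queue index 0, `Index` starts at that queue index, and `Hash` is first resolved to the message's queue index (an unknown hash surfaces as `DatabaseError::L1MessageNotFound`). The `scroll_db` crate name is an assumption; the items used here match the re-exports added in `lib.rs` above.

use alloy_primitives::B256;
use futures::StreamExt;
// Hypothetical crate name; `DatabaseOperations` and `L1MessageStart` are re-exported in `lib.rs` above.
use scroll_db::{DatabaseError, DatabaseOperations, L1MessageStart};

async fn read_l1_messages(db: &impl DatabaseOperations, hash: B256) -> Result<(), DatabaseError> {
    // Stream every message, starting from queue index 0 (the previous behaviour).
    let _all: Vec<_> = db.get_l1_messages(None).await?.collect().await;

    // Start the stream at a known queue index.
    let _from_index: Vec<_> =
        db.get_l1_messages(Some(L1MessageStart::Index(100))).await?.collect().await;

    // Start the stream at the message with the given hash; the hash is looked up
    // and converted to its queue index before the stream is built.
    let _from_hash: Vec<_> =
        db.get_l1_messages(Some(L1MessageStart::Hash(hash))).await?.collect().await;

    Ok(())
}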
1 change: 1 addition & 0 deletions crates/database/migration/Cargo.toml
@@ -16,6 +16,7 @@ reth-chainspec.workspace = true
sea-orm = { workspace = true, features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] }
sha2 = "0.10.9"
tracing.workspace = true
reth-scroll-chainspec.workspace = true

[dependencies.sea-orm-migration]
version = "1.1.0"
2 changes: 1 addition & 1 deletion crates/database/migration/src/lib.rs
@@ -19,7 +19,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
Box::new(m20250304_125946_add_l1_msg_table::Migration),
Box::new(m20250408_132123_add_header_metadata::Migration),
Box::new(m20250408_150338_load_header_metadata::Migration::<MI>(Default::default())),
- Box::new(m20250411_072004_add_l2_block::Migration),
+ Box::new(m20250411_072004_add_l2_block::Migration::<MI>(Default::default())),
Box::new(m20250616_223947_add_metadata::Migration),
]
}
crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs
@@ -1,3 +1,4 @@
use sea_orm::Statement;
use sea_orm_migration::{prelude::*, schema::*};

// TODO: migrate these to a constants module
@@ -23,7 +24,29 @@ impl MigrationTrait for Migration {
.col(big_unsigned_null(BatchCommit::FinalizedBlockNumber))
.to_owned(),
)
- .await
+ .await?;

manager
.get_connection()
.execute(Statement::from_sql_and_values(
manager.get_database_backend(),
r#"
INSERT INTO batch_commit ("index", hash, block_number, block_timestamp, calldata, blob_hash, finalized_block_number)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
vec![
0u64.into(),
vec![0u8; HASH_LENGTH as usize].into(),
0u64.into(),
0u64.into(),
vec![].into(),
None::<Vec<u8>>.into(),
0u64.into(),
],
))
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
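An aside on the raw SQL above (not part of the diff): the `?` placeholders are SQLite/MySQL style, and the same seeded row could be written with the SeaQuery insert builder re-exported through `sea_orm_migration::prelude`, which renders placeholders per backend. A sketch of that alternative, assuming the `BatchCommit` Iden enum defined in this file has variants matching the column names used in the SQL (only `FinalizedBlockNumber` is visible in this hunk):

// Equivalent insert via the query builder; `exec_stmt` and `Query::insert` come from
// `sea_orm_migration::prelude`. Column variants other than `FinalizedBlockNumber` are
// assumed to exist on the `BatchCommit` enum in this file.
manager
    .exec_stmt(
        Query::insert()
            .into_table(BatchCommit::Table)
            .columns([
                BatchCommit::Index,
                BatchCommit::Hash,
                BatchCommit::BlockNumber,
                BatchCommit::BlockTimestamp,
                BatchCommit::Calldata,
                BatchCommit::BlobHash,
                BatchCommit::FinalizedBlockNumber,
            ])
            .values_panic([
                0u64.into(),
                vec![0u8; HASH_LENGTH as usize].into(),
                0u64.into(),
                0u64.into(),
                Vec::<u8>::new().into(),
                None::<Vec<u8>>.into(),
                0u64.into(),
            ])
            .to_owned(),
    )
    .await?;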
33 changes: 28 additions & 5 deletions crates/database/migration/src/m20250411_072004_add_l2_block.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
- use super::m20220101_000001_create_batch_commit_table::BatchCommit;
+ use super::{m20220101_000001_create_batch_commit_table::BatchCommit, MigrationInfo};

use sea_orm::Statement;
use sea_orm_migration::{prelude::*, schema::*};

#[derive(DeriveMigrationName)]
- pub struct Migration;
+ pub struct Migration<MI>(pub std::marker::PhantomData<MI>);

impl<MI> MigrationName for Migration<MI> {
fn name(&self) -> &str {
sea_orm_migration::util::get_file_stem(file!())
}
}

#[async_trait::async_trait]
- impl MigrationTrait for Migration {
+ impl<MI: MigrationInfo + Send + Sync> MigrationTrait for Migration<MI> {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
@@ -35,7 +41,24 @@ impl MigrationTrait for Migration {
)
.to_owned(),
)
- .await
+ .await?;

// Insert the genesis block.
let genesis_hash = MI::genesis_hash();

manager
.get_connection()
.execute(Statement::from_sql_and_values(
manager.get_database_backend(),
r#"
INSERT INTO l2_block (block_number, block_hash, batch_index, batch_hash)
VALUES (?, ?, ?, ?)
"#,
vec![0u64.into(), genesis_hash.to_vec().into(), 0u64.into(), vec![0u8; 32].into()],
))
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
19 changes: 19 additions & 0 deletions crates/database/migration/src/migration_info.rs
@@ -1,4 +1,5 @@
use alloy_primitives::{b256, B256};
use reth_scroll_chainspec::{SCROLL_MAINNET_GENESIS_HASH, SCROLL_SEPOLIA_GENESIS_HASH};

pub enum DataSource {
Url(String),
@@ -8,6 +9,7 @@ pub enum DataSource {
pub trait MigrationInfo {
fn data_source() -> Option<DataSource>;
fn data_hash() -> Option<B256>;
fn genesis_hash() -> B256;
}

impl MigrationInfo for () {
@@ -18,6 +20,11 @@ impl MigrationInfo for () {
fn data_hash() -> Option<B256> {
None
}

fn genesis_hash() -> B256 {
// Todo: Update
b256!("0xb5bd7381c6b550af0de40d6c490602574d76427c8cce17b54cb7917c323136f2")
}
}

/// The type implementing migration info for Mainnet.
@@ -33,6 +40,10 @@ impl MigrationInfo for ScrollMainnetMigrationInfo {
fn data_hash() -> Option<B256> {
Some(b256!("fa2746026ec9590e37e495cb20046e20a38fd0e7099abd2012640dddf6c88b25"))
}

fn genesis_hash() -> B256 {
SCROLL_MAINNET_GENESIS_HASH
}
}

pub struct ScrollMainnetTestMigrationInfo;
@@ -45,6 +56,10 @@ impl MigrationInfo for ScrollMainnetTestMigrationInfo {
fn data_hash() -> Option<B256> {
None
}

fn genesis_hash() -> B256 {
SCROLL_MAINNET_GENESIS_HASH
}
}

/// The type implementing migration info for Sepolia.
@@ -60,4 +75,8 @@ impl MigrationInfo for ScrollSepoliaMigrationInfo {
fn data_hash() -> Option<B256> {
Some(b256!("a02354c12ca0f918bf4768255af9ed13c137db7e56252348f304b17bb4088924"))
}

fn genesis_hash() -> B256 {
SCROLL_SEPOLIA_GENESIS_HASH
}
}
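Downstream configurations that define their own `MigrationInfo` now also have to supply a genesis hash. A minimal sketch for a hypothetical devnet type placed alongside the implementations above (the type name and the zero genesis hash are placeholders, not part of this PR):

/// Migration info for a hypothetical local devnet.
pub struct ScrollDevnetMigrationInfo;

impl MigrationInfo for ScrollDevnetMigrationInfo {
    fn data_source() -> Option<DataSource> {
        // No pre-seeded header metadata for the devnet.
        None
    }

    fn data_hash() -> Option<B256> {
        None
    }

    fn genesis_hash() -> B256 {
        // Placeholder: substitute the devnet chain spec's genesis hash.
        B256::ZERO
    }
}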