Skip to content

feat: handle batch reverts #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 77 additions & 5 deletions crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,14 @@ mod test {
block_number += 1;
}

// Fetch the highest block for the batch hash and verify number.
let highest_block_info =
db.get_highest_block_for_batch_hash(batch_info.hash).await.unwrap().unwrap();
assert_eq!(highest_block_info.number, block_number - 1);

// Fetch the highest block for the batch and verify number.
let highest_block_info =
db.get_highest_block_for_batch(batch_info.hash).await.unwrap().unwrap();
db.get_highest_block_for_batch_index(batch_info.index).await.unwrap().unwrap();
assert_eq!(highest_block_info.number, block_number - 1);
}

Expand Down Expand Up @@ -211,9 +216,14 @@ mod test {
block_number += 1;
}

// Fetch the highest block for the batch and verify number.
// Fetch the highest block for the batch hash and verify number.
let highest_block_info =
db.get_highest_block_for_batch(second_batch_info.hash).await.unwrap().unwrap();
db.get_highest_block_for_batch_hash(second_batch_info.hash).await.unwrap().unwrap();
assert_eq!(highest_block_info.number, block_number - 1);

// Fetch the highest block for the batch index and verify number.
let highest_block_info =
db.get_highest_block_for_batch_index(second_batch_info.index).await.unwrap().unwrap();
assert_eq!(highest_block_info.number, block_number - 1);
}

Expand Down Expand Up @@ -480,7 +490,7 @@ mod test {
}

#[tokio::test]
async fn test_delete_l2_blocks_gt() {
async fn test_delete_l2_blocks_gt_block_number() {
// Set up the test database.
let db = setup_test_db().await;

Expand All @@ -499,7 +509,7 @@ mod test {
}

// Delete blocks with number > 405
let deleted_count = db.delete_l2_blocks_gt(405).await.unwrap();
let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap();
assert_eq!(deleted_count, 4); // Blocks 406, 407, 408, 409

// Verify remaining blocks still exist
Expand All @@ -515,6 +525,68 @@ mod test {
}
}

#[tokio::test]
async fn test_delete_l2_blocks_gt_batch_index() {
    // Set up the test database.
    let db = setup_test_db().await;

    // Generate unstructured bytes to drive `Arbitrary` data generation.
    let mut bytes = [0u8; 1024];
    rand::rng().fill(bytes.as_mut_slice());
    let mut u = Unstructured::new(&bytes);

    // Insert batches with indices 100..=109, each with empty calldata.
    for i in 100..110 {
        let batch_data = BatchCommitData {
            index: i,
            calldata: Arc::new(vec![].into()),
            ..Arbitrary::arbitrary(&mut u).unwrap()
        };
        db.insert_batch(batch_data).await.unwrap();
    }

    // Insert L2 blocks (numbers 600..=609) each associated with one of the batches above.
    for i in 100..110 {
        let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
        let batch_info: BatchInfo = batch_data.into();

        let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };
        let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] };

        db.insert_block(l2_block, Some(batch_info)).await.unwrap();
    }

    // Insert some blocks without a batch index (should not be deleted).
    // Use block numbers 700.. so they do not collide with the batched blocks above:
    // `insert_block` upserts on the block number, so reusing 600..=602 would silently
    // null out the batch association of three of the batched blocks.
    for i in 0..3 {
        let block_info = BlockInfo { number: 700 + i, hash: B256::arbitrary(&mut u).unwrap() };
        let l2_block = L2BlockInfoWithL1Messages { block_info, l1_messages: vec![] };

        db.insert_block(l2_block, None).await.unwrap();
    }

    // Delete L2 blocks with batch index > 105.
    let deleted_count = db.delete_l2_blocks_gt_batch_index(105).await.unwrap();
    assert_eq!(deleted_count, 4); // Blocks with batch indices 106, 107, 108, 109

    // Verify remaining blocks with batch index <= 105 still exist.
    for i in 100..=105 {
        let block = db.get_l2_block_info_by_number(500 + i).await.unwrap();
        assert!(block.is_some());
    }

    // Verify deleted blocks with batch index > 105 no longer exist.
    for i in 106..110 {
        let block = db.get_l2_block_info_by_number(500 + i).await.unwrap();
        assert!(block.is_none());
    }

    // Verify blocks without a batch index are still there (not affected by the filter).
    for i in 0..3 {
        let block = db.get_l2_block_info_by_number(700 + i).await.unwrap();
        assert!(block.is_some());
    }
}

#[tokio::test]
async fn test_insert_block_with_l1_messages() {
// Set up the test database.
Expand Down
67 changes: 59 additions & 8 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{models, DatabaseError};
use crate::DatabaseConnectionProvider;

use alloy_primitives::B256;
use futures::{Stream, StreamExt};
use rollup_node_primitives::{
Expand All @@ -26,6 +27,8 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
models::batch_commit::Column::Hash,
models::batch_commit::Column::BlockNumber,
models::batch_commit::Column::BlockTimestamp,
models::batch_commit::Column::Calldata,
models::batch_commit::Column::BlobHash,
models::batch_commit::Column::FinalizedBlockNumber,
])
.to_owned(),
Expand Down Expand Up @@ -132,7 +135,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}

/// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
async fn delete_batches_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
async fn delete_batches_gt_block_number(
&self,
block_number: u64,
) -> Result<u64, DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Deleting batch inputs greater than block number.");
Ok(models::batch_commit::Entity::delete_many()
.filter(models::batch_commit::Column::BlockNumber.gt(block_number as i64))
Expand All @@ -141,6 +147,16 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.rows_affected)?)
}

/// Delete all [`BatchCommitData`]s whose batch index is strictly greater than the provided
/// `batch_index`, returning the number of rows removed.
async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError> {
    tracing::trace!(target: "scroll::db", batch_index, "Deleting batch inputs greater than batch index.");
    // Cast to i64 to match the column's storage type before filtering.
    let result = models::batch_commit::Entity::delete_many()
        .filter(models::batch_commit::Column::Index.gt(batch_index as i64))
        .exec(self.get_connection())
        .await?;
    Ok(result.rows_affected)
}

/// Get an iterator over all [`BatchCommitData`]s in the database.
async fn get_batches<'a>(
&'a self,
Expand Down Expand Up @@ -327,7 +343,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.get_batch_by_index(batch_info.index - 1)
.await?
.expect("Batch info must be present due to database query arguments");
let l2_block = self.get_highest_block_for_batch(previous_batch.hash).await?;
let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?;
(l2_block, Some(batch.block_number))
} else {
(None, None)
Expand All @@ -337,7 +353,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}

/// Delete all L2 blocks with a block number greater than the provided block number.
async fn delete_l2_blocks_gt(&self, block_number: u64) -> Result<u64, DatabaseError> {
async fn delete_l2_blocks_gt_block_number(
&self,
block_number: u64,
) -> Result<u64, DatabaseError> {
tracing::trace!(target: "scroll::db", block_number, "Deleting L2 blocks greater than provided block number.");
Ok(models::l2_block::Entity::delete_many()
.filter(models::l2_block::Column::BlockNumber.gt(block_number as i64))
Expand All @@ -346,6 +365,23 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.rows_affected)?)
}

/// Delete all L2 blocks with a batch index greater than the provided `batch_index`,
/// returning the number of rows removed.
///
/// Blocks whose batch index is NULL (not yet associated with a batch) are explicitly
/// excluded from the deletion.
async fn delete_l2_blocks_gt_batch_index(
&self,
batch_index: u64,
) -> Result<u64, DatabaseError> {
tracing::trace!(target: "scroll::db", batch_index, "Deleting L2 blocks greater than provided batch index.");
Ok(models::l2_block::Entity::delete_many()
.filter(
// Both conditions must hold: the index is present AND exceeds the cutoff.
Condition::all()
.add(models::l2_block::Column::BatchIndex.is_not_null())
// cast to i64 to match the column's storage type.
.add(models::l2_block::Column::BatchIndex.gt(batch_index as i64)),
)
.exec(self.get_connection())
.await
.map(|x| x.rows_affected)?)
}

/// Insert a new block in the database.
async fn insert_block(
&self,
Expand All @@ -365,6 +401,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.on_conflict(
OnConflict::column(models::l2_block::Column::BlockNumber)
.update_columns([
models::l2_block::Column::BlockHash,
models::l2_block::Column::BatchHash,
models::l2_block::Column::BatchIndex,
])
Expand Down Expand Up @@ -396,7 +433,7 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {

/// Returns the highest L2 block originating from the provided `batch_hash` or the highest block
/// for the batch's index.
async fn get_highest_block_for_batch(
async fn get_highest_block_for_batch_hash(
&self,
batch_hash: B256,
) -> Result<Option<BlockInfo>, DatabaseError> {
Expand All @@ -420,14 +457,28 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
}
}

/// Returns the highest L2 block whose batch index is less than or equal to the provided
/// `batch_index`, or `None` if no such block exists.
async fn get_highest_block_for_batch_index(
    &self,
    batch_index: u64,
) -> Result<Option<BlockInfo>, DatabaseError> {
    // Cast to i64 to match the column's storage type, consistent with the other
    // `BatchIndex` filters in this trait.
    Ok(models::l2_block::Entity::find()
        .filter(models::l2_block::Column::BatchIndex.lte(batch_index as i64))
        .order_by_desc(models::l2_block::Column::BlockNumber)
        .one(self.get_connection())
        .await?
        .map(|model| model.block_info()))
}

/// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
async fn unwind(
&self,
genesis_hash: B256,
l1_block_number: u64,
) -> Result<UnwindResult, DatabaseError> {
// delete batch inputs and l1 messages
let batches_removed = self.delete_batches_gt(l1_block_number).await?;
let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?;
let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?;

// filter and sort the executed L1 messages
Expand All @@ -441,10 +492,10 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
if let Some(msg) = removed_executed_l1_messages.first() {
let l2_reorg_block_number = msg
.l2_block_number
.expect("we guarantee that this is Some(u64) due to the filter above") -
1;
.expect("we guarantee that this is Some(u64) due to the filter above")
.saturating_sub(1);
let l2_block_info = self.get_l2_block_info_by_number(l2_reorg_block_number).await?;
self.delete_l2_blocks_gt(l2_reorg_block_number).await?;
self.delete_l2_blocks_gt_block_number(l2_reorg_block_number).await?;
(Some(msg.transaction.queue_index), l2_block_info)
} else {
(None, None)
Expand Down
7 changes: 7 additions & 0 deletions crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ where
}
None
}

/// Flushes all the data in the pipeline.
pub fn flush(&mut self) {
    // Discard every in-flight derivation future by swapping in an empty set.
    self.pipeline_futures = FuturesOrdered::new();
    // Drop all queued attributes and batches.
    self.attributes_queue.clear();
    self.batch_queue.clear();
}
}

impl<P> Stream for DerivationPipeline<P>
Expand Down
13 changes: 13 additions & 0 deletions crates/engine/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ where
self.block_building_duration = block_building_duration;
}

/// Clear the l1 attributes queue.
pub fn clear_l1_payload_attributes(&mut self) {
    // Drop every pending L1 payload attribute.
    self.l1_payload_attributes.clear();

    // Cancel the in-flight engine future when it is an L1 consolidation, since the
    // attributes it operates on have just been discarded.
    let consolidating = matches!(
        self.engine_future,
        Some(MeteredFuture { fut: EngineFuture::L1Consolidation(_), .. })
    );
    if consolidating {
        self.engine_future = None;
    }
}

/// Handles a block import request by adding it to the queue and waking up the driver.
pub fn handle_block_import(&mut self, block_with_peer: NewBlockWithPeer) {
tracing::trace!(target: "scroll::engine", ?block_with_peer, "new block import request received");
Expand Down
10 changes: 8 additions & 2 deletions crates/indexer/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ use rollup_node_primitives::{BatchInfo, BlockInfo, L2BlockInfoWithL1Messages};
/// An event emitted by the indexer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexerEvent {
/// A `BatchCommit` event has been indexed returning the batch info.
BatchCommitIndexed(BatchInfo),
/// A `BatchCommit` event has been indexed returning the batch info and the L2 block info to
/// revert to due to a batch revert.
BatchCommitIndexed {
/// The batch info.
batch_info: BatchInfo,
/// The safe L2 block info.
safe_head: Option<BlockInfo>,
},
/// A `BatchFinalization` event has been indexed returning the batch hash and new finalized L2
/// block.
BatchFinalizationIndexed(B256, Option<BlockInfo>),
Expand Down
Loading