From b8d109a6d558cb1845078c64429a4198505bd213 Mon Sep 17 00:00:00 2001 From: Joe Parks <26990067+jowparks@users.noreply.github.com> Date: Fri, 2 May 2025 13:53:36 -0700 Subject: [PATCH] feat: add verification service for flashblocks with metrics tracking for block reorgs --- Cargo.lock | 1 + Cargo.toml | 1 + crates/flashblocks-rpc/Cargo.toml | 1 + crates/flashblocks-rpc/src/flashblocks.rs | 208 +++++++-- crates/flashblocks-rpc/src/lib.rs | 1 + crates/flashblocks-rpc/src/metrics.rs | 12 + crates/flashblocks-rpc/src/verification.rs | 478 +++++++++++++++++++++ crates/node/src/main.rs | 8 + 8 files changed, 685 insertions(+), 25 deletions(-) create mode 100644 crates/flashblocks-rpc/src/verification.rs diff --git a/Cargo.lock b/Cargo.lock index 71afd9c..b21189f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1497,6 +1497,7 @@ dependencies = [ "op-alloy-rpc-types-engine", "reqwest 0.11.27", "reth", + "reth-exex", "reth-optimism-chainspec", "reth-optimism-cli", "reth-optimism-evm", diff --git a/Cargo.toml b/Cargo.toml index 75ca867..4e41dd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ reth-rpc-types-compat = { git = "https://github.com/paradigmxyz/reth", tag = "v1 reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" } reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" } reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" } +reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.12" } # revm revm = { version = "22.0.1", default-features = false } diff --git a/crates/flashblocks-rpc/Cargo.toml b/crates/flashblocks-rpc/Cargo.toml index 6d8aa27..d3d2f67 100644 --- a/crates/flashblocks-rpc/Cargo.toml +++ b/crates/flashblocks-rpc/Cargo.toml @@ -26,6 +26,7 @@ reth-rpc-types-compat.workspace = true reth-optimism-rpc.workspace = true reth-optimism-evm.workspace = true reth-optimism-chainspec.workspace = true +reth-exex.workspace = true # revm revm.workspace = true diff --git a/crates/flashblocks-rpc/src/flashblocks.rs b/crates/flashblocks-rpc/src/flashblocks.rs index f92aadb..528fa57 100644 --- a/crates/flashblocks-rpc/src/flashblocks.rs +++ b/crates/flashblocks-rpc/src/flashblocks.rs @@ -368,29 +368,37 @@ fn get_and_set_txs_and_receipts( metadata: Metadata, ) -> Result, Box> { let mut diff_receipts: Vec = vec![]; - // Store tx transaction signed + let mut tx_hashes: Vec = vec![]; + + if let Some(existing_hashes) = cache.get::>(&format!("tx_hashes:{}", block_number)) + { + tx_hashes = existing_hashes; + } + for (idx, transaction) in block.body.transactions.iter().enumerate() { - // check if exists, if not update - let existing_tx = cache.get::(&transaction.tx_hash().to_string()); + let tx_hash = transaction.tx_hash().to_string(); + + // Add transaction hash to the ordered list if not already present + if !tx_hashes.contains(&tx_hash) { + tx_hashes.push(tx_hash.clone()); + } + + let existing_tx = cache.get::(&tx_hash); if existing_tx.is_none() { - if let Err(e) = cache.set(&transaction.tx_hash().to_string(), &transaction, Some(10)) { + if let Err(e) = cache.set(&tx_hash, &transaction, Some(10)) { error!("Failed to set transaction in cache: {}", e); continue; } - // update tx index if let Err(e) = cache.set(&format!("tx_idx:{}", transaction.tx_hash()), &idx, Some(10)) { error!("Failed to set transaction index in cache: {}", e); continue; } - // update tx count for each from address if let Ok(from) = transaction.recover_signer() { - // Get current tx count, default to 0 if not found let current_count = cache .get::(&format!("tx_count:{}:{}", from, block_number)) .unwrap_or(0); - // Increment tx count by 1 if let Err(e) = cache.set( &format!("tx_count:{}:{}", from, block_number), &(current_count + 1), @@ -399,7 +407,6 @@ fn get_and_set_txs_and_receipts( error!("Failed to set transaction count in cache: {}", e); } - // also keep track of sender of each transaction if let Err(e) = cache.set( &format!("tx_sender:{}", transaction.tx_hash()), &from, @@ -408,7 +415,6 @@ fn get_and_set_txs_and_receipts( error!("Failed to set transaction sender in cache: {}", e); } - // also keep track of the block number of each transaction if let Err(e) = cache.set( &format!("tx_block_number:{}", transaction.tx_hash()), &block_number, @@ -420,26 +426,16 @@ fn get_and_set_txs_and_receipts( } // TODO: move this into the transaction check - if metadata - .receipts - .contains_key(&transaction.tx_hash().to_string()) - { + if metadata.receipts.contains_key(&tx_hash) { // find receipt in metadata and set it in cache - let receipt = metadata - .receipts - .get(&transaction.tx_hash().to_string()) - .unwrap(); - if let Err(e) = cache.set( - &format!("receipt:{:?}", transaction.tx_hash().to_string()), - receipt, - Some(10), - ) { + let receipt = metadata.receipts.get(&tx_hash).unwrap(); + if let Err(e) = cache.set(&format!("receipt:{:?}", tx_hash), receipt, Some(10)) { error!("Failed to set receipt in cache: {}", e); continue; } // map receipt's block number as well if let Err(e) = cache.set( - &format!("receipt_block:{:?}", transaction.tx_hash().to_string()), + &format!("receipt_block:{:?}", tx_hash), &block_number, Some(10), ) { @@ -451,6 +447,10 @@ fn get_and_set_txs_and_receipts( } } + if let Err(e) = cache.set(&format!("tx_hashes:{}", block_number), &tx_hashes, Some(10)) { + error!("Failed to update transaction hashes list in cache: {}", e); + } + Ok(diff_receipts) } @@ -635,7 +635,7 @@ mod tests { payload_id: PayloadId::new([0; 8]), base: None, diff: delta2, - metadata: serde_json::to_value(metadata2).unwrap(), + metadata: serde_json::to_value(metadata2.clone()).unwrap(), } } @@ -833,4 +833,162 @@ mod tests { let highest = cache.get::("highest_payload_index").unwrap(); assert_eq!(highest, 0); } + + #[test] + fn test_tx_hash_list_storage_and_deduplication() { + let cache = Arc::new(Cache::default()); + let block_number = 1; + + let base = ExecutionPayloadBaseV1 { + parent_hash: Default::default(), + parent_beacon_block_root: Default::default(), + fee_recipient: Address::from_str("0x1234567890123456789012345678901234567890").unwrap(), + block_number, + gas_limit: 1000000, + timestamp: 1234567890, + prev_randao: Default::default(), + extra_data: Default::default(), + base_fee_per_gas: U256::from(1000), + }; + + let tx1 = Bytes::from_str("0x02f87483014a3482017e8459682f0084596830a98301f1d094b01866f195533de16eb929b73f87280693ca0cb480844e71d92dc001a0a658c18bdba29dd4022ee6640fdd143691230c12b3c8c86cf5c1a1f1682cc1e2a0248a28763541ebed2b87ecea63a7024b5c2b7de58539fa64c887b08f5faf29c1").unwrap(); + + let delta1 = ExecutionPayloadFlashblockDeltaV1 { + transactions: vec![tx1.clone()], + withdrawals: vec![], + state_root: Default::default(), + receipts_root: Default::default(), + logs_bloom: Default::default(), + gas_used: 21000, + block_hash: Default::default(), + }; + + let tx1_hash = + "0x3cbbc9a6811ac5b2a2e5780bdb67baffc04246a59f39e398be048f1b2d05460c".to_string(); + + let metadata1 = Metadata { + block_number, + receipts: { + let mut receipts = HashMap::default(); + receipts.insert( + tx1_hash.clone(), + OpReceipt::Legacy(Receipt { + status: true.into(), + cumulative_gas_used: 21000, + logs: vec![], + }), + ); + receipts + }, + new_account_balances: HashMap::default(), + }; + + let payload1 = FlashblocksPayloadV1 { + index: 0, + payload_id: PayloadId::new([0; 8]), + base: Some(base), + diff: delta1, + metadata: serde_json::to_value(metadata1).unwrap(), + }; + + process_payload(payload1, cache.clone()); + + let tx_hashes1 = cache + .get::>(&format!("tx_hashes:{}", block_number)) + .unwrap(); + assert_eq!(tx_hashes1.len(), 1); + assert_eq!(tx_hashes1[0], tx1_hash); + + let tx2 = Bytes::from_str("0xf8cd82016d8316e5708302c01c94f39635f2adf40608255779ff742afe13de31f57780b8646e530e9700000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000156ddc81eed2a36d68302948ba0a608703e79b22164f74523d188a11f81c25a65dd59535bab1cd1d8b30d115f3ea07f4cfbbad77a139c9209d3bded89091867ff6b548dd714109c61d1f8e7a84d14").unwrap(); + + let tx2_hash = + "0xa6155b295085d3b87a3c86e342fe11c3b22f9952d0d85d9d34d223b7d6a17cd8".to_string(); + + let delta2 = ExecutionPayloadFlashblockDeltaV1 { + transactions: vec![tx1.clone(), tx2.clone()], // Note tx1 is repeated + withdrawals: vec![], + state_root: B256::repeat_byte(0x1), + receipts_root: B256::repeat_byte(0x2), + logs_bloom: Default::default(), + gas_used: 42000, + block_hash: B256::repeat_byte(0x3), + }; + + let metadata2 = Metadata { + block_number, + receipts: { + let mut receipts = HashMap::default(); + receipts.insert( + tx1_hash.clone(), + OpReceipt::Legacy(Receipt { + status: true.into(), + cumulative_gas_used: 21000, + logs: vec![], + }), + ); + receipts.insert( + tx2_hash.clone(), + OpReceipt::Legacy(Receipt { + status: true.into(), + cumulative_gas_used: 42000, + logs: vec![], + }), + ); + receipts + }, + new_account_balances: HashMap::default(), + }; + + let payload2 = FlashblocksPayloadV1 { + index: 1, + payload_id: PayloadId::new([0; 8]), + base: None, + diff: delta2, + metadata: serde_json::to_value(metadata2.clone()).unwrap(), + }; + + process_payload(payload2, cache.clone()); + + let tx_hashes2 = cache + .get::>(&format!("tx_hashes:{}", block_number)) + .unwrap(); + assert_eq!( + tx_hashes2.len(), + 2, + "Should have 2 unique transaction hashes" + ); + assert_eq!(tx_hashes2[0], tx1_hash, "First hash should be tx1"); + assert_eq!(tx_hashes2[1], tx2_hash, "Second hash should be tx2"); + + let delta3 = ExecutionPayloadFlashblockDeltaV1 { + transactions: vec![tx2.clone(), tx1.clone()], // Different order + withdrawals: vec![], + state_root: B256::repeat_byte(0x1), + receipts_root: B256::repeat_byte(0x2), + logs_bloom: Default::default(), + gas_used: 42000, + block_hash: B256::repeat_byte(0x3), + }; + + let payload3 = FlashblocksPayloadV1 { + index: 2, + payload_id: PayloadId::new([0; 8]), + base: None, + diff: delta3, + metadata: serde_json::to_value(metadata2).unwrap(), // Same metadata + }; + + process_payload(payload3, cache.clone()); + + let tx_hashes3 = cache + .get::>(&format!("tx_hashes:{}", block_number)) + .unwrap(); + assert_eq!( + tx_hashes3.len(), + 2, + "Should still have 2 unique transaction hashes" + ); + assert_eq!(tx_hashes3[0], tx1_hash, "First hash should be tx1"); + assert_eq!(tx_hashes3[1], tx2_hash, "Second hash should be tx2"); + } } diff --git a/crates/flashblocks-rpc/src/lib.rs b/crates/flashblocks-rpc/src/lib.rs index 278dfa6..a79f839 100644 --- a/crates/flashblocks-rpc/src/lib.rs +++ b/crates/flashblocks-rpc/src/lib.rs @@ -2,6 +2,7 @@ pub mod cache; pub mod flashblocks; mod metrics; pub mod rpc; +pub mod verification; #[cfg(test)] mod integration; diff --git a/crates/flashblocks-rpc/src/metrics.rs b/crates/flashblocks-rpc/src/metrics.rs index d688e9a..e61030d 100644 --- a/crates/flashblocks-rpc/src/metrics.rs +++ b/crates/flashblocks-rpc/src/metrics.rs @@ -29,4 +29,16 @@ pub struct Metrics { #[metric(describe = "Number of flashblocks in a block")] pub flashblocks_in_block: Histogram, + + #[metric(describe = "Count of successful block verifications")] + pub block_verification_success: Counter, + + #[metric(describe = "Count of failed block verifications")] + pub block_verification_failure: Counter, + + #[metric(describe = "Count of blocks not found in cache during verification")] + pub block_verification_not_found: Counter, + + #[metric(describe = "Count of transaction count mismatches during verification")] + pub block_verification_tx_count_mismatch: Counter, } diff --git a/crates/flashblocks-rpc/src/verification.rs b/crates/flashblocks-rpc/src/verification.rs new file mode 100644 index 0000000..2a7fccb --- /dev/null +++ b/crates/flashblocks-rpc/src/verification.rs @@ -0,0 +1,478 @@ +use crate::cache::Cache; +use crate::metrics::Metrics; +use futures_util::TryStreamExt; +use op_alloy_consensus::OpTxEnvelope; +use reth::api::FullNodeComponents; +use reth::builder::NodeTypes; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_optimism_primitives::OpBlock; +use reth_optimism_primitives::OpPrimitives; +use reth_primitives::Block; +use std::sync::Arc; +use tracing::debug; +use tracing::{error, info, warn}; + +#[derive(Debug)] +pub enum VerificationResult { + Match, + BlockNotFound, + TransactionCountMismatch { + expected: usize, + actual: usize, + }, + TransactionMismatches { + count: usize, + }, + TransactionHashMismatch { + expected: Vec, + actual: Vec, + }, + TransactionHashListNotFound, +} + +pub struct VerificationService { + cache: Arc, + metrics: Metrics, +} + +impl VerificationService { + pub fn new(cache: Arc) -> Self { + Self { + cache, + metrics: Metrics::default(), + } + } + + pub fn verify_block(&self, block: &Block) -> VerificationResult { + let block_number = block.number; + + let cached_block = match self + .cache + .get::(&format!("block:{}", block_number)) + { + Some(cached_block) => cached_block, + None => { + error!( + "Block {} not found in cache during verification", + block_number + ); + self.metrics.block_verification_not_found.increment(1); + return VerificationResult::BlockNotFound; + } + }; + + let expected_txs = cached_block.body.transactions.len(); + let actual_txs = block.body.transactions.len(); + + if expected_txs != actual_txs { + error!( + "Transaction count mismatch for block {}: expected {}, got {}", + block_number, expected_txs, actual_txs + ); + self.metrics + .block_verification_tx_count_mismatch + .increment(1); + self.metrics.block_verification_failure.increment(1); + return VerificationResult::TransactionCountMismatch { + expected: expected_txs, + actual: actual_txs, + }; + } + + let expected_tx_hashes = self + .cache + .get::>(&format!("tx_hashes:{}", block_number)); + if expected_tx_hashes.is_none() { + error!( + "No transaction hash list found in cache for block {}", + block_number + ); + self.metrics.block_verification_failure.increment(1); + return VerificationResult::TransactionHashListNotFound; + } + let expected_tx_hashes = expected_tx_hashes.unwrap(); + + let actual_tx_hashes: Vec = block + .body + .transactions + .iter() + .map(|tx| tx.tx_hash().to_string()) + .collect(); + + if expected_tx_hashes != actual_tx_hashes { + error!( + "Transaction hash mismatch for block {}: expected {:?}, got {:?}", + block_number, expected_tx_hashes, actual_tx_hashes + ); + self.metrics.block_verification_failure.increment(1); + return VerificationResult::TransactionHashMismatch { + expected: expected_tx_hashes, + actual: actual_tx_hashes, + }; + } + + let mut mismatch_count = 0; + for (index, (expected_tx, actual_tx)) in cached_block + .body + .transactions + .iter() + .zip(block.body.transactions.iter()) + .enumerate() + { + let expected_hash = expected_tx.tx_hash(); + let actual_hash = actual_tx.tx_hash(); + + if expected_hash != actual_hash { + error!( + "Transaction mismatch at index {} in block {}: expected {:?}, got {:?}", + index, block_number, expected_hash, actual_hash + ); + mismatch_count += 1; + } + } + + if mismatch_count > 0 { + self.metrics.block_verification_failure.increment(1); + error!( + block = block.number, + mismatch_count = mismatch_count, + "Found {} transaction mismatches during verification", + mismatch_count + ); + return VerificationResult::TransactionMismatches { + count: mismatch_count, + }; + } + + self.metrics.block_verification_success.increment(1); + VerificationResult::Match + } +} + +impl Clone for VerificationService { + fn clone(&self) -> Self { + Self { + cache: Arc::clone(&self.cache), + metrics: self.metrics.clone(), + } + } +} + +pub async fn flashblocks_verification_exex< + Node: FullNodeComponents>, +>( + mut ctx: ExExContext, + cache: Arc, +) -> eyre::Result<()> { + info!("FlashblocksVerification ExEx started"); + + let verification_service = VerificationService::new(cache); + while let Some(notification) = ctx.notifications.try_next().await? { + match ¬ification { + ExExNotification::ChainCommitted { new } => { + debug!(committed_chain = ?new.range(), "Verifying committed chain"); + for block in new.blocks().values() { + verification_service.verify_block(&block.clone().into_block()); + } + } + ExExNotification::ChainReorged { old: _, new } => { + warn!(new_chain = ?new.range(), "Verifying reorged chain"); + for block in new.blocks().values() { + verification_service.verify_block(&block.clone().into_block()); + } + } + ExExNotification::ChainReverted { old: _ } => { + warn!("Chain reverted, previously verified flashblock transactions are now invalid unless they are in the new chain"); + } + } + + if let Some(committed_chain) = notification.committed_chain() { + ctx.events + .send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?; + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{Sealable, B256}; + use op_alloy_consensus::TxDeposit; + use reth_optimism_primitives::{OpBlock, OpTransactionSigned}; + use reth_primitives::{Block, BlockBody, Header}; + + fn create_mock_tx(source_hash: u8) -> OpTransactionSigned { + let source_hash = B256::from_slice( + &[0; 31][..31] + .iter() + .chain(&[source_hash]) + .copied() + .collect::>(), + ); + let tx = TxDeposit { + source_hash, + ..Default::default() + }; + OpTransactionSigned::Deposit(tx.seal_slow()) + } + + fn create_mock_op_block(number: u64, txs: Vec) -> OpBlock { + let header = Header { + number, + ..Default::default() + }; + + let body = BlockBody { + transactions: txs, + ommers: vec![], + withdrawals: None, + }; + + OpBlock { header, body } + } + + fn create_mock_block(number: u64, txs: Vec) -> Block { + let header = Header { + number, + ..Default::default() + }; + + let body = BlockBody { + transactions: txs + .into_iter() + .map(|tx| OpTransactionSigned::from(tx)) + .collect(), + ommers: vec![], + withdrawals: None, + }; + + Block { header, body } + } + + fn setup_test_cache() -> Arc { + Arc::new(Cache::default()) + } + + #[test] + fn test_valid_block_match() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + + let cached_block = create_mock_op_block(block_number, vec![tx1.clone(), tx2.clone()]); + cache + .set(&format!("block:{}", block_number), &cached_block, None) + .unwrap(); + + let tx_hashes = vec![tx1.hash().to_string(), tx2.hash().to_string()]; + cache + .set(&format!("tx_hashes:{}", block_number), &tx_hashes, None) + .unwrap(); + + let final_block = create_mock_block(block_number, vec![tx1.clone(), tx2.clone()]); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::Match => {} // Test passed + _ => panic!("Expected VerificationResult::Match, got {:?}", result), + } + } + + #[test] + fn test_transaction_in_cache_not_in_block() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + let tx3 = create_mock_tx(3); + + let cached_block = + create_mock_op_block(block_number, vec![tx1.clone(), tx2.clone(), tx3.clone()]); + cache + .set(&format!("block:{}", block_number), &cached_block, None) + .unwrap(); + + let tx_hashes = vec![ + tx1.hash().to_string(), + tx2.hash().to_string(), + tx3.hash().to_string(), + ]; + cache + .set(&format!("tx_hashes:{}", block_number), &tx_hashes, None) + .unwrap(); + + let final_block = create_mock_block(block_number, vec![tx1.clone(), tx2.clone()]); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::TransactionCountMismatch { expected, actual } => { + assert_eq!(expected, 3); + assert_eq!(actual, 2); + } + _ => panic!("Expected TransactionCountMismatch, got {:?}", result), + } + } + + #[test] + fn test_transaction_in_block_not_in_cache() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + let tx3 = create_mock_tx(3); + + let cached_block = create_mock_op_block(block_number, vec![tx1.clone(), tx2.clone()]); + cache + .set(&format!("block:{}", block_number), &cached_block, None) + .unwrap(); + + let tx_hashes = vec![tx1.hash().to_string(), tx2.hash().to_string()]; + cache + .set(&format!("tx_hashes:{}", block_number), &tx_hashes, None) + .unwrap(); + + let final_block = + create_mock_block(block_number, vec![tx1.clone(), tx2.clone(), tx3.clone()]); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::TransactionCountMismatch { expected, actual } => { + assert_eq!(expected, 2); + assert_eq!(actual, 3); + } + _ => panic!("Expected TransactionCountMismatch, got {:?}", result), + } + } + + #[test] + fn test_cached_block_not_present() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + + let final_block = create_mock_block(block_number, vec![tx1.clone(), tx2.clone()]); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::BlockNotFound => {} // Test passed + _ => panic!("Expected BlockNotFound, got {:?}", result), + } + } + + #[test] + fn test_transaction_hash_mismatch() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + let tx3 = create_mock_tx(3); // Different transaction for the block + + let cached_block = create_mock_op_block(block_number, vec![tx1.clone(), tx2.clone()]); + cache + .set(&format!("block:{}", block_number), &cached_block, None) + .unwrap(); + + let tx_hashes = vec![tx1.hash().to_string(), tx2.hash().to_string()]; + cache + .set(&format!("tx_hashes:{}", block_number), &tx_hashes, None) + .unwrap(); + + let final_block = create_mock_block(block_number, vec![tx1.clone(), tx3.clone()]); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::TransactionHashMismatch { expected, actual } => { + assert_eq!(expected.len(), 2); + assert_eq!(actual.len(), 2); + assert_eq!(expected[1], tx2.hash().to_string()); + assert_eq!(actual[1], tx3.hash().to_string()); + } + _ => panic!("Expected TransactionHashMismatch, got {:?}", result), + } + } + + #[test] + fn test_transaction_hash_list_not_found() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + + let cached_block = create_mock_op_block(block_number, vec![tx1.clone(), tx2.clone()]); + cache + .set(&format!("block:{}", block_number), &cached_block, None) + .unwrap(); + + let final_block = create_mock_block(block_number, vec![tx1.clone(), tx2.clone()]); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::TransactionHashListNotFound => {} // Test passed + _ => panic!("Expected TransactionHashListNotFound, got {:?}", result), + } + } + + #[test] + fn test_all_transaction_hashes_mismatch() { + let cache = setup_test_cache(); + let verification_service = VerificationService::new(Arc::clone(&cache)); + + let block_number = 1; + let tx1 = create_mock_tx(1); + let tx2 = create_mock_tx(2); + + let different_tx1 = create_mock_tx(3); + let different_tx2 = create_mock_tx(4); + + let cached_block = create_mock_op_block(block_number, vec![tx1.clone(), tx2.clone()]); + cache + .set(&format!("block:{}", block_number), &cached_block, None) + .unwrap(); + + let tx_hashes = vec![tx1.hash().to_string(), tx2.hash().to_string()]; + cache + .set(&format!("tx_hashes:{}", block_number), &tx_hashes, None) + .unwrap(); + + let final_block = create_mock_block( + block_number, + vec![different_tx1.clone(), different_tx2.clone()], + ); + + let result = verification_service.verify_block(&final_block); + + match result { + VerificationResult::TransactionHashMismatch { expected, actual } => { + assert_eq!(expected.len(), 2); + assert_eq!(actual.len(), 2); + assert_eq!(expected[0], tx1.hash().to_string()); + assert_eq!(expected[1], tx2.hash().to_string()); + assert_eq!(actual[0], different_tx1.hash().to_string()); + assert_eq!(actual[1], different_tx2.hash().to_string()); + } + _ => panic!("Expected TransactionHashMismatch, got {:?}", result), + } + } +} diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 34c7786..5979c40 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -1,3 +1,4 @@ +use base_reth_flashblocks_rpc::verification::flashblocks_verification_exex; use base_reth_flashblocks_rpc::{cache::Cache, flashblocks::FlashblocksClient, rpc::EthApiExt}; use std::sync::Arc; use std::time::Duration; @@ -33,12 +34,19 @@ fn main() { let mut flashblocks_client = FlashblocksClient::new(Arc::clone(&cache)); let cache_clone = Arc::clone(&cache); + let verification_cache_clone = Arc::clone(&cache); let chain_spec = builder.config().chain.clone(); let handle = builder .with_types_and_provider::>() .with_components(op_node.components()) .with_add_ons(op_node.add_ons()) .on_component_initialized(move |_ctx| Ok(())) + .install_exex("flashblocks-verification", async move |ctx| { + Ok(flashblocks_verification_exex( + ctx, + Arc::clone(&verification_cache_clone), + )) + }) .extend_rpc_modules(move |ctx| { let api_ext = EthApiExt::new( ctx.registry.eth_api().clone(),