Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
chore(concurrency): implement validate method in worker logic (#1903)
Browse files Browse the repository at this point in the history
  • Loading branch information
noaov1 authored May 21, 2024
1 parent 4bdfda2 commit ca6b9f0
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 17 deletions.
47 changes: 34 additions & 13 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod test;
pub struct ExecutionTaskOutput {
pub reads: StateMaps,
pub writes: StateMaps,
pub contract_classes: ContractClassMapping,
pub visited_pcs: HashMap<ClassHash, HashSet<usize>>,
pub result: TransactionExecutionResult<TransactionExecutionInfo>,
}
Expand Down Expand Up @@ -90,32 +91,46 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
tx.execute_raw(&mut transactional_state, &self.block_context, charge_fee, validate);

if execution_result.is_ok() {
let class_hash_to_class = transactional_state.class_hash_to_class.borrow();
// TODO(Noa, 15/05/2024): use `tx_versioned_state` when we add support to transactional
// versioned state.
self.state
.pin_version(tx_index)
.apply_writes(&transactional_state.cache.borrow().writes, &class_hash_to_class);
self.state.pin_version(tx_index).apply_writes(
&transactional_state.cache.borrow().writes,
&transactional_state.class_hash_to_class.borrow(),
);
}

// Write the transaction execution outputs.
let tx_reads_writes = transactional_state.cache.take();
let class_hash_to_class = transactional_state.class_hash_to_class.take();
// In case of a failed transaction, we don't record its writes and visited pcs.
let (writes, visited_pcs) = match execution_result {
Ok(_) => (tx_reads_writes.writes, transactional_state.visited_pcs),
Err(_) => (StateMaps::default(), HashMap::default()),
let (writes, contract_classes, visited_pcs) = match execution_result {
Ok(_) => (tx_reads_writes.writes, class_hash_to_class, transactional_state.visited_pcs),
Err(_) => (StateMaps::default(), HashMap::default(), HashMap::default()),
};
let mut execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
*execution_output = Some(ExecutionTaskOutput {
reads: tx_reads_writes.initial_reads,
writes,
contract_classes,
visited_pcs,
result: execution_result,
});
}

fn validate(&self, _tx_index: TxIndex) -> Task {
todo!();
fn validate(&self, tx_index: TxIndex) -> Task {
let tx_versioned_state = self.state.pin_version(tx_index);
let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
let execution_output = execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR);
let reads = &execution_output.reads;
let reads_valid = tx_versioned_state.validate_reads(reads);

let aborted = !reads_valid && self.scheduler.try_validation_abort(tx_index);
if aborted {
tx_versioned_state
.delete_writes(&execution_output.writes, &execution_output.contract_classes)
}

self.scheduler.finish_validation(tx_index, aborted)
}

/// Commits a transaction.
Expand All @@ -127,18 +142,20 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
/// for the transaction in the block.
/// If there is room: we fix the call info, update the sequencer balance and
/// commit the transaction. Otherwise (execution failed, no room), we don't commit.
pub fn commit_tx(&self, tx_index: TxIndex) -> StateResult<bool> {
// TODO(Meshi, 01/06/2024): Remove dead code.
#[allow(dead_code)]
fn commit_tx(&self, tx_index: TxIndex) -> StateResult<bool> {
let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);

let tx = &self.chunk[tx_index];
let tx_versioned_state = self.state.pin_version(tx_index);

let read_set = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).reads;
let validate_reads = tx_versioned_state.validate_reads(read_set);
let reads = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).reads;
let reads_valid = tx_versioned_state.validate_reads(reads);
drop(execution_output);

// First, re-validate the transaction.
if !validate_reads {
if !reads_valid {
// Revalidate failed: re-execute the transaction, and commit.
// TODO(Meshi, 01/06/2024): Delete the transaction writes.
self.execute_tx(tx_index);
Expand Down Expand Up @@ -192,6 +209,10 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
}
}

// Utilities.

// TODO(Meshi, 01/06/2024): Remove dead code.
#[allow(dead_code)]
fn add_fee_to_sequencer_balance(
fee_token_address: ContractAddress,
tx_versioned_state: &VersionedStateProxy<impl StateReader>,
Expand Down
106 changes: 104 additions & 2 deletions crates/blockifier/src/concurrency/worker_logic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use starknet_api::{contract_address, patricia_key, stark_felt};
use super::WorkerExecutor;
use crate::abi::abi_utils::get_fee_token_var_address;
use crate::abi::sierra_types::next_storage_key;
use crate::concurrency::scheduler::TransactionStatus;
use crate::concurrency::scheduler::{Task, TransactionStatus};
use crate::concurrency::test_utils::safe_versioned_state_for_testing;
use crate::context::BlockContext;
use crate::state::cached_state::StateMaps;
Expand Down Expand Up @@ -119,7 +119,7 @@ fn test_worker_execute() {
// Both in write and read sets, only the account balance appear, and not the sequencer balance.
// This is because when executing transaction in concurrency mode on, we manually remove the
// writes and reads to and from the sequencer balance (to avoid the inevitable dependency
// between all the transactions)
// between all the transactions).
let writes = StateMaps {
nonces: HashMap::from([(account_address, nonce!(1_u8))]),
storage: HashMap::from([
Expand Down Expand Up @@ -193,3 +193,105 @@ fn test_worker_execute() {
assert_eq!(*worker_executor.scheduler.get_tx_status(tx_index), TransactionStatus::Executed);
}
}

#[test]
fn test_worker_validate() {
// Settings.
let concurrency_mode = true;
let block_context =
BlockContext::create_for_account_testing_with_concurrency_mode(concurrency_mode);

let account_contract = FeatureContract::AccountWithoutValidations(CairoVersion::Cairo1);
let test_contract = FeatureContract::TestContract(CairoVersion::Cairo0);
let chain_info = &block_context.chain_info;

// Create the state.
let state_reader =
test_state_reader(chain_info, BALANCE, &[(account_contract, 1), (test_contract, 1)]);
let safe_versioned_state = safe_versioned_state_for_testing(state_reader);

// Create transactions.
let test_contract_address = test_contract.get_instance_address(0);
let account_address = account_contract.get_instance_address(0);
let nonce_manager = &mut NonceManager::default();
let storage_value0 = stark_felt!(93_u8);
let storage_value1 = stark_felt!(39_u8);
let storage_key = storage_key!(1993_u16);

// Both transactions change the same storage key.
let account_tx0 = account_invoke_tx(invoke_tx_args! {
sender_address: account_address,
calldata: create_calldata(
test_contract_address,
"test_storage_read_write",
&[*storage_key.0.key(),storage_value0 ], // Calldata: address, value.
),
max_fee: Fee(MAX_FEE),
nonce: nonce_manager.next(account_address)
});

let account_tx1 = account_invoke_tx(invoke_tx_args! {
sender_address: account_address,
calldata: create_calldata(
test_contract_address,
"test_storage_read_write",
&[*storage_key.0.key(),storage_value1 ], // Calldata: address, value.
),
max_fee: Fee(MAX_FEE),
nonce: nonce_manager.next(account_address)

});

// Concurrency settings.
let txs = [account_tx0, account_tx1]
.into_iter()
.map(Transaction::AccountTransaction)
.collect::<Vec<Transaction>>();

let worker_executor = WorkerExecutor::new(safe_versioned_state.clone(), &txs, block_context);

// Creates 2 active tasks.
worker_executor.scheduler.next_task();
worker_executor.scheduler.next_task();

// Execute transactions in the wrong order, making the first execution invalid.
worker_executor.execute(1);
worker_executor.execute(0);

// Creates 2 active tasks.
worker_executor.scheduler.next_task();
worker_executor.scheduler.next_task();

// Validate succeeds.
let tx_index = 0;
let next_task = worker_executor.validate(tx_index);
assert_eq!(next_task, Task::NoTask);
// Verify writes exist in state.
assert_eq!(
safe_versioned_state
.pin_version(tx_index + 1)
.get_storage_at(test_contract_address, storage_key)
.unwrap(),
storage_value0
);
// No status change.
assert_eq!(*worker_executor.scheduler.get_tx_status(tx_index), TransactionStatus::Executed);

// Validate failed. Invoke 2 failed validations; only the first leads to a re-execution.
let tx_index = 1;
let next_task1 = worker_executor.validate(tx_index);
assert_eq!(next_task1, Task::ExecutionTask(tx_index));
// Verify writes were removed.
assert_eq!(
safe_versioned_state
.pin_version(tx_index + 1)
.get_storage_at(test_contract_address, storage_key)
.unwrap(),
storage_value0
);
// Verify status change.
assert_eq!(*worker_executor.scheduler.get_tx_status(tx_index), TransactionStatus::Executing);

let next_task2 = worker_executor.validate(tx_index);
assert_eq!(next_task2, Task::NoTask);
}
4 changes: 2 additions & 2 deletions crates/blockifier/src/transaction/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,9 @@ impl TransactionResources {
.expect("This conversion should not fail as the value is a converted usize."),
),
]));
let revrted_steps_to_add = if with_reverted_steps { self.n_reverted_steps } else { 0 };
let reverted_steps_to_add = if with_reverted_steps { self.n_reverted_steps } else { 0 };
*resources.0.get_mut(abi_constants::N_STEPS_RESOURCE).unwrap_or(&mut 0) +=
revrted_steps_to_add;
reverted_steps_to_add;
resources
}

Expand Down

0 comments on commit ca6b9f0

Please sign in to comment.