From 8996aa23de9219e7b45274fea92c51ed30dfa9b1 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Tue, 8 Jul 2025 16:30:03 +0530 Subject: [PATCH 1/5] skip unncessarily verifying erc1271 signature + remove stray dbg --- aa-core/src/signer.rs | 59 ++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/aa-core/src/signer.rs b/aa-core/src/signer.rs index 84783ba..6e4bf4f 100644 --- a/aa-core/src/signer.rs +++ b/aa-core/src/signer.rs @@ -248,7 +248,7 @@ impl SmartAccountSigner { } /// Verify ERC-1271 signature - async fn verify_erc1271(&self, hash: B256, signature: &str) -> Result { + pub async fn verify_erc1271(&self, hash: B256, signature: &str) -> Result { let signature_bytes = hex::decode(signature.strip_prefix("0x").unwrap_or(signature)) .map_err(|_| EngineError::ValidationError { message: "Invalid signature hex".to_string(), @@ -263,15 +263,10 @@ impl SmartAccountSigner { .await { Ok(response) => { - dbg!(response); let expected_magic = ERC1271Contract::isValidSignatureCall::SELECTOR; Ok(response.as_slice() == expected_magic) } - Err(e) => { - let data = e.as_revert_data().unwrap(); - dbg!(decode_revert_reason(data.as_ref())); - Ok(false) - } + Err(e) => Ok(false), } } @@ -333,16 +328,16 @@ impl SmartAccountSigner { if is_deployed { // Verify ERC-1271 signature for deployed accounts - let message_hash = self.hash_message(message, format); - let is_valid = self.verify_erc1271(message_hash, &signature).await?; - - if is_valid { - Ok(signature) - } else { - Err(EngineError::ValidationError { - message: "ERC-1271 signature validation failed".to_string(), - }) - } + // let message_hash = self.hash_message(message, format); + // let is_valid = self.verify_erc1271(message_hash, &signature).await?; + + // if is_valid { + Ok(signature) + // } else { + // Err(EngineError::ValidationError { + // message: "ERC-1271 signature validation failed".to_string(), + // }) + // } } else { // Create ERC-6492 signature for undeployed accounts self.create_erc6492_signature(&signature).await @@ -359,21 +354,21 @@ impl SmartAccountSigner { if is_deployed { // Verify ERC-1271 signature for deployed accounts - let typed_data_hash = - typed_data - .eip712_signing_hash() - .map_err(|_e| EngineError::ValidationError { - message: "Failed to compute typed data hash".to_string(), - })?; - let is_valid = self.verify_erc1271(typed_data_hash, &signature).await?; - - if is_valid { - Ok(signature) - } else { - Err(EngineError::ValidationError { - message: "ERC-1271 signature validation failed".to_string(), - }) - } + // let typed_data_hash = + // typed_data + // .eip712_signing_hash() + // .map_err(|_e| EngineError::ValidationError { + // message: "Failed to compute typed data hash".to_string(), + // })?; + // let is_valid = self.verify_erc1271(typed_data_hash, &signature).await?; + + // if is_valid { + Ok(signature) + // } else { + // Err(EngineError::ValidationError { + // message: "ERC-1271 signature validation failed".to_string(), + // }) + // } } else { // Create ERC-6492 signature for undeployed accounts self.create_erc6492_signature(&signature).await From e3ea9cb47b9c63a2d1255ebf20c227311788845b Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Tue, 8 Jul 2025 16:32:07 +0530 Subject: [PATCH 2/5] clippy --- server/src/http/dyn_contract.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/http/dyn_contract.rs b/server/src/http/dyn_contract.rs index 9bd207f..20a10f0 100644 --- a/server/src/http/dyn_contract.rs +++ b/server/src/http/dyn_contract.rs @@ -198,8 +198,7 @@ impl ContractCall { fn extract_function_name(&self, method: &str) -> Result { let trimmed = method.trim(); - if trimmed.starts_with("function ") { - let after_function = &trimmed[9..]; + if let Some(after_function) = trimmed.strip_prefix("function ") { if let Some(paren_pos) = after_function.find('(') { return Ok(after_function[..paren_pos].trim().to_string()); } From f10c10ebf3bb2b355730fd79dd8809ca92c11158 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Tue, 8 Jul 2025 16:32:18 +0530 Subject: [PATCH 3/5] add schema titles --- core/src/defs.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/defs.rs b/core/src/defs.rs index b5f36dd..d9299c8 100644 --- a/core/src/defs.rs +++ b/core/src/defs.rs @@ -2,16 +2,19 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; #[derive(JsonSchema, Serialize, Deserialize, Clone, utoipa::ToSchema)] +#[schema(title = "EVM Address")] /// ### Address /// Used to represent an EVM address. This is a string of length 42 with a `0x` prefix. Non-checksummed addresses are also supported, but will be converted to checksummed. pub struct AddressDef(pub String); #[derive(JsonSchema, Serialize, Deserialize, Clone, utoipa::ToSchema)] +#[schema(title = "Bytes")] /// # Bytes /// Used to represent "bytes". This is a 0x prefixed hex string. pub struct BytesDef(pub String); #[derive(JsonSchema, Serialize, Deserialize, Clone, utoipa::ToSchema)] +#[schema(title = "U256")] /// # U256 /// Used to represent a 256-bit unsigned integer. Engine can parse these from any valid encoding of the Ethereum "quantity" format. pub struct U256Def(pub String); From 9583209db705ed168e3f82eddc2a81b5e046d249 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Tue, 8 Jul 2025 16:43:11 +0530 Subject: [PATCH 4/5] fix error reporting for erc1271 verification --- aa-core/src/signer.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/aa-core/src/signer.rs b/aa-core/src/signer.rs index 6e4bf4f..968ccd4 100644 --- a/aa-core/src/signer.rs +++ b/aa-core/src/signer.rs @@ -4,12 +4,12 @@ use alloy::{ dyn_abi::TypedData, primitives::{Address, B256, hex, keccak256}, sol, - sol_types::{SolCall, SolValue, decode_revert_reason, eip712_domain}, + sol_types::{SolCall, SolValue, eip712_domain}, }; use engine_core::{ chain::Chain, credentials::SigningCredential, - error::EngineError, + error::{ContractErrorToEngineError, EngineError}, signer::{AccountSigner, EoaSigner, EoaSigningOptions, Erc4337SigningOptions}, }; use serde::Serialize; @@ -266,7 +266,9 @@ impl SmartAccountSigner { let expected_magic = ERC1271Contract::isValidSignatureCall::SELECTOR; Ok(response.as_slice() == expected_magic) } - Err(e) => Ok(false), + Err(e) => { + Err(e.to_engine_error(self.chain.chain_id(), Some(self.smart_account.address))) + } } } From 0dde05e3125249dfe642a90aeb7d36002e528948 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Tue, 8 Jul 2025 16:43:45 +0530 Subject: [PATCH 5/5] Add `serde_with` dependency and implement multicall configuration in contract read logic - Updated `Cargo.toml` to include `serde_with` version 3.14.0. - Enhanced `contract_read.rs` to support multicall configuration with detailed execution strategies. - Introduced `MulticallConfig` enum for flexible multicall handling, including enabling/disabling and custom addresses. - Refactored contract reading logic to utilize multicall by default, with fallback to direct calls on failure. - Improved error handling and response mapping for multicall results. --- Cargo.lock | 1 + server/Cargo.toml | 1 + server/src/http/routes/contract_read.rs | 417 ++++++++++++++++++++---- twmq/tests/basic_hook.rs | 17 +- 4 files changed, 372 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68ec643..3b82182 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5099,6 +5099,7 @@ dependencies = [ "serde", "serde-bool", "serde_json", + "serde_with", "thirdweb-core", "tokio", "tower-http", diff --git a/server/Cargo.toml b/server/Cargo.toml index 93c0ab0..783be6d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -40,3 +40,4 @@ utoipa = { version = "5.4.0", features = [ ] } utoipa-axum = "0.2.0" utoipa-scalar = { version = "0.3.0", features = ["axum"] } +serde_with = "3.14.0" diff --git a/server/src/http/routes/contract_read.rs b/server/src/http/routes/contract_read.rs index b04d462..6bf408c 100644 --- a/server/src/http/routes/contract_read.rs +++ b/server/src/http/routes/contract_read.rs @@ -2,9 +2,11 @@ use alloy::dyn_abi::FunctionExt; use alloy::primitives::{Address, ChainId, address}; -use alloy::providers::RootProvider; + use alloy::{ - providers::Provider, rpc::types::eth::TransactionRequest as AlloyTransactionRequest, sol, + providers::{Provider, RootProvider}, + rpc::types::eth::TransactionRequest as AlloyTransactionRequest, + sol, sol_types::SolCall, }; use axum::{ @@ -15,14 +17,16 @@ use axum::{ use engine_core::{ chain::{Chain, ChainService}, defs::AddressDef, - error::EngineError, + error::{AlloyRpcErrorToEngineError, EngineError}, }; use futures::future::join_all; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value as JsonValue; +use serde_with::serde_as; use thirdweb_core::auth::ThirdwebAuth; +use utoipa::ToSchema; +use crate::http::error::EngineResult; use crate::http::extractors::EngineJson; use crate::http::types::{BatchResultItem, BatchResults}; use crate::http::{ @@ -54,36 +58,121 @@ const MULTICALL3_DEFAULT_ADDRESS: Address = address!("0xcA11bde05977b36311670288 // ===== REQUEST/RESPONSE TYPES ===== +/// Multicall configuration +#[derive(Debug, Clone, Serialize, ToSchema, Default)] +#[serde(untagged)] +pub enum MulticallConfig { + /// Enable multicall with default Multicall3 address + #[schema(example = true)] + #[default] + Enabled, + /// Disable multicall - make direct RPC calls instead + #[schema(example = false)] + Disabled, + /// Enable multicall with custom address + #[schema(value_type = AddressDef)] + CustomAddress(Address), +} + +impl<'de> Deserialize<'de> for MulticallConfig { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + use serde::de::Error; + use serde_json::Value; + + let value = Value::deserialize(deserializer)?; + + match value { + // Handle boolean values + Value::Bool(true) => Ok(MulticallConfig::Enabled), + Value::Bool(false) => Ok(MulticallConfig::Disabled), + + // Handle null as disabled + Value::Null => Ok(MulticallConfig::Disabled), + + // Handle string addresses + Value::String(addr_str) => { + // Try to parse as Address + match addr_str.parse::
() { + Ok(address) => Ok(MulticallConfig::CustomAddress(address)), + Err(_) => Err(D::Error::custom(format!( + "Invalid address format: {}. Expected valid Ethereum address or boolean value", + addr_str + ))), + } + } + + // Reject other types + _ => Err(D::Error::custom( + "Expected boolean (true/false), null, or address string", + )), + } + } +} + +impl MulticallConfig { + /// Returns true if multicall is enabled + pub fn is_enabled(&self) -> bool { + matches!( + self, + MulticallConfig::Enabled | MulticallConfig::CustomAddress(_) + ) + } + + /// Returns the multicall address to use + pub fn address(&self) -> Option
{ + match self { + MulticallConfig::Enabled => Some(default_multicall_address()), + MulticallConfig::Disabled => None, // Won't be used + MulticallConfig::CustomAddress(addr) => Some(*addr), + } + } +} + +fn default_multicall_address() -> Address { + MULTICALL3_DEFAULT_ADDRESS +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(untagged)] +pub enum MulticallDocType { + #[schema(example = true)] + Boolean(bool), + #[schema(value_type = AddressDef)] + Address(Address), +} + /// Options for reading from smart contracts -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] +#[serde_as] pub struct ReadOptions { /// The blockchain network ID to read from - pub chain_id: String, - /// Address of the Multicall3 contract to use for batching calls + #[serde_as(as = "PickFirst<(_, DisplayFromStr)>")] + pub chain_id: u64, + /// Multicall configuration /// - /// Defaults to the standard Multicall3 address: 0xcA11bde05977b3631167028862bE2a173976CA11 - /// which is deployed on most networks - #[serde(default = "default_multicall_address")] - #[schemars(with = "AddressDef")] - #[schema(value_type = AddressDef)] - pub multicall_address: Address, + /// Can be: + /// - `null` or omitted (default): Use multicall with default Multicall3 address and fallback to direct calls if multicall fails + /// - `true`: Explicitly enable multicall with default Multicall3 address (no fallback) + /// - `false`: Disable multicall and make direct RPC calls + /// - Address string: Use multicall with custom address (no fallback) + #[serde(default)] + #[schema(value_type = Option)] + pub multicall: Option, /// Optional address to use as the caller for view functions /// /// This can be useful for functions that return different values /// based on the caller's address or permissions #[serde(skip_serializing_if = "Option::is_none")] - #[schemars(with = "Option")] #[schema(value_type = Option)] pub from: Option
, } -fn default_multicall_address() -> Address { - MULTICALL3_DEFAULT_ADDRESS -} - /// Request to read from multiple smart contracts -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[serde(rename_all = "camelCase")] pub struct ReadRequest { /// Configuration options for the read operation @@ -95,7 +184,7 @@ pub struct ReadRequest { } /// Successful result from a contract read operation -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[serde(transparent)] pub struct ReadResultSuccessItem( /// The decoded return value from the contract function @@ -124,7 +213,13 @@ pub struct ReadResultSuccessItem( )] /// Read Contract /// -/// Read from multiple smart contracts using multicall +/// Read from smart contracts with intelligent execution strategy: +/// - Single calls: Always executed directly for efficiency +/// - Multiple calls: Uses multicall by default, or direct calls if disabled +/// - Failed preparations: Returns preparation errors directly +/// +/// If multicall is not specified, it will be used by default. In case of multicall related errors, engine will fallback to direct calls. +/// Only in the case where multicall is explicitly enabled, engine will not fallback to direct calls. pub async fn read_contract( State(state): State, OptionalRpcCredentialsExtractor(rpc_credentials): OptionalRpcCredentialsExtractor, @@ -134,20 +229,176 @@ pub async fn read_contract( engine_core::chain::RpcCredentials::Thirdweb(auth) => auth, }); - let chain_id: ChainId = request.read_options.chain_id.parse().map_err(|_| { - ApiEngineError(EngineError::ValidationError { - message: "Invalid chain ID".to_string(), - }) - })?; + let chain_id = request.read_options.chain_id; + + let chain = state.chains.get_chain(chain_id).api_error()?; // Prepare all contract calls in parallel let prepare_futures = request.params.iter().map(|contract_read| { - contract_read.prepare_call_with_error_tracking(&state.abi_service, chain_id, auth.clone()) + contract_read.prepare_call_with_error_tracking( + &state.abi_service, + chain.chain_id(), + auth.clone(), + ) }); let preparation_results: Vec> = join_all(prepare_futures).await; + // Determine execution strategy and execute + let results = + match determine_execution_strategy(&request.read_options.multicall, &preparation_results) { + ExecutionStrategy::Direct => { + execute_direct_contract_calls( + &preparation_results, + chain.provider(), + &request.read_options.from, + &chain, + ) + .await + } + ExecutionStrategy::Multicall(multicall_address) => { + execute_with_multicall_and_fallback( + &preparation_results, + multicall_address, + chain.provider(), + chain_id, + &chain, + &request.read_options.from, + allows_fallback(&request.read_options.multicall), + ) + .await + } + ExecutionStrategy::AllFailed => { + // All calls failed during preparation + preparation_results + .iter() + .map(|result| match result { + ContractOperationResult::Success(_) => { + BatchResultItem::failure(EngineError::InternalError { + message: "Internal error: no valid calls found".to_string(), + }) + } + ContractOperationResult::Failure(error) => { + BatchResultItem::failure(error.clone()) + } + }) + .collect() + } + }; + + Ok((StatusCode::OK, Json(BatchResults { result: results }))) +} + +// ===== HELPER FUNCTIONS ===== + +/// Execution strategy for contract calls +#[derive(Debug)] +enum ExecutionStrategy { + /// Execute calls directly without multicall + Direct, + /// Use multicall with the specified address + Multicall(Address), + /// All calls failed during preparation + AllFailed, +} + +/// Returns the multicall address if enabled +fn get_multicall_address(multicall_config: &Option) -> Option
{ + match multicall_config { + None => Some(default_multicall_address()), // Default behavior + Some(config) => config.address(), + } +} + +/// Returns true if fallback to direct calls is allowed when multicall fails +fn allows_fallback(multicall_config: &Option) -> bool { + multicall_config.is_none() // Only allow fallback for default behavior (None) +} + +/// Detect if an error is likely due to multicall contract issues that warrant fallback +fn is_multicall_fallback_error(error: &EngineError) -> bool { + match error { + EngineError::ContractInteractionError { message, .. } => { + // Check for ABI decoding errors that suggest multicall contract doesn't exist or is incompatible + message.contains("ABI decoding failed") + || message.contains("buffer overrun") + || message.contains("Failed to decode multicall result") + || message.contains("execution reverted") + || message.contains("contract not deployed") + } + _ => false, + } +} + +/// Determine the best execution strategy based on configuration and prepared calls +fn determine_execution_strategy( + multicall_config: &Option, + preparation_results: &[ContractOperationResult], +) -> ExecutionStrategy { + // Count successful preparations + let successful_calls = preparation_results + .iter() + .filter(|result| matches!(result, ContractOperationResult::Success(_))) + .count(); + + match successful_calls { + 0 => ExecutionStrategy::AllFailed, + 1 => ExecutionStrategy::Direct, // Single call is always direct for efficiency + _ => { + // Multiple calls - check configuration + if let Some(multicall_address) = get_multicall_address(multicall_config) { + ExecutionStrategy::Multicall(multicall_address) + } else { + ExecutionStrategy::Direct + } + } + } +} + +/// Execute contract calls using multicall with optional fallback to direct calls +async fn execute_with_multicall_and_fallback( + preparation_results: &[ContractOperationResult], + multicall_address: Address, + provider: &RootProvider, + chain_id: ChainId, + chain: &C, + from: &Option
, + allow_fallback: bool, +) -> Vec> { + // First try multicall + let multicall_result = + execute_with_multicall(preparation_results, multicall_address, provider, chain_id).await; + + // Check if we should fallback on error + if allow_fallback { + // Check if any results contain fallback-worthy errors + let has_fallback_error = multicall_result.iter().any(|result| match result { + BatchResultItem::Failure { error } => is_multicall_fallback_error(error), + _ => false, + }); + + if has_fallback_error { + tracing::warn!( + "Multicall failed with fallback-worthy error on chain {}, falling back to direct calls", + chain_id + ); + + // Fallback to direct calls + return execute_direct_contract_calls(preparation_results, provider, from, chain).await; + } + } + + multicall_result +} + +/// Execute contract calls using multicall +async fn execute_with_multicall( + preparation_results: &[ContractOperationResult], + multicall_address: Address, + provider: &RootProvider, + chain_id: ChainId, +) -> Vec> { // Separate successful calls for multicall while preserving original order and errors let (multicall_calls, call_indices): (Vec, Vec) = preparation_results .iter() @@ -165,38 +416,86 @@ pub async fn read_contract( }) .unzip(); - // Execute multicall if we have any valid calls - let multicall_results = if !multicall_calls.is_empty() { - let chain = state.chains.get_chain(chain_id).map_err(ApiEngineError)?; - match execute_multicall( - &request.read_options.multicall_address, - multicall_calls, - chain.provider(), - chain_id, - ) - .await - { - Ok(results) => Some(results), - Err(e) => { - tracing::error!("Multicall failed: {}", e); - None - } + let multicall_results = + execute_multicall(&multicall_address, multicall_calls, provider, chain_id).await; + + match multicall_results { + Ok(results) => { + map_multicall_results_to_original_order(preparation_results, &results, &call_indices) } - } else { - None - }; + Err(e) => { + tracing::error!("Multicall failed: {}", e); + // Return error for all calls + preparation_results + .iter() + .map(|_| BatchResultItem::failure(e.clone())) + .collect() + } + } +} - // Map results back to original order, preserving all errors - let results = map_results_to_original_order( - &preparation_results, - multicall_results.as_deref(), - &call_indices, - ); +/// Execute contract calls directly without multicall +async fn execute_direct_contract_calls( + preparation_results: &[ContractOperationResult], + provider: &RootProvider, + from: &Option
, + chain: &C, +) -> Vec> { + let mut results = Vec::new(); - Ok((StatusCode::OK, Json(BatchResults { result: results }))) -} + for prep_result in preparation_results { + match prep_result { + ContractOperationResult::Success(prepared_call) => { + // Execute the single call directly + let mut call_request = AlloyTransactionRequest::default() + .to(prepared_call.target) + .input(prepared_call.call_data.clone().into()); -// ===== HELPER FUNCTIONS ===== + if let Some(from_address) = from { + call_request = call_request.from(*from_address); + } + + match provider.call(call_request).await { + Ok(result) => { + // Decode the result + match prepared_call.function.abi_decode_output(&result) { + Ok(decoded_values) => { + let result_json = match decoded_values.len() { + 1 => dyn_sol_value_to_json(&decoded_values[0]), + _ => JsonValue::Array( + decoded_values.iter().map(dyn_sol_value_to_json).collect(), + ), + }; + results.push(BatchResultItem::success(ReadResultSuccessItem( + result_json, + ))); + } + Err(e) => { + results.push(BatchResultItem::failure( + EngineError::contract_decoding_error( + Some(prepared_call.target), + chain.chain_id(), + format!("Failed to decode result: {}", e), + ), + )); + } + } + } + Err(e) => { + // Convert alloy error to engine error + let engine_error = e.to_engine_error(chain); + results.push(BatchResultItem::failure(engine_error)); + } + } + } + ContractOperationResult::Failure(error) => { + results.push(BatchResultItem::failure(error.clone())); + } + } + } + + results +} /// Execute the multicall and return results async fn execute_multicall( @@ -228,12 +527,12 @@ async fn execute_multicall( } /// Map multicall results back to the original parameter order, preserving all errors -fn map_results_to_original_order( +fn map_multicall_results_to_original_order( preparation_results: &[ContractOperationResult], - multicall_results: Option<&[Result3]>, + multicall_results: &[Result3], call_indices: &[usize], ) -> Vec> { - let mut multicall_iter = multicall_results.unwrap_or(&[]).iter(); + let mut multicall_iter = multicall_results.iter(); preparation_results .iter() diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index 48a49c7..eed4536 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -54,7 +54,6 @@ pub struct WebhookJobOutput { pub response: String, } - // Main job that queues webhook jobs #[derive(Serialize, Deserialize, Clone)] pub struct MainJobPayload { @@ -82,7 +81,10 @@ impl DurableExecution for MainJobHandler { type ErrorData = TestJobErrorData; type JobData = MainJobPayload; - async fn process(&self, job: &BorrowedJob) -> JobResult { + async fn process( + &self, + job: &BorrowedJob, + ) -> JobResult { println!("MAIN_JOB: Processing job with id: {}", job.job.id); tokio::time::sleep(Duration::from_millis(50)).await; @@ -125,7 +127,10 @@ impl DurableExecution for WebhookJobHandler { type ErrorData = TestJobErrorData; type JobData = WebhookJobPayload; - async fn process(&self, job: &BorrowedJob) -> JobResult { + async fn process( + &self, + job: &BorrowedJob, + ) -> JobResult { println!("WEBHOOK_JOB: Sending webhook to: {}", job.job.data.url); println!("WEBHOOK_JOB: Payload: {}", job.job.data.payload); tokio::time::sleep(Duration::from_millis(25)).await; @@ -172,8 +177,10 @@ async fn test_cross_queue_job_scheduling() { println!("Creating main queue: {}", main_queue_name); println!("Creating webhook queue: {}", webhook_queue_name); - let mut queue_options = QueueOptions::default(); - queue_options.local_concurrency = 1; + let mut queue_options = QueueOptions { + local_concurrency: 1, + ..Default::default() + }; let webhook_handler = WebhookJobHandler;