From 0b80d06eead379e8717828b5422bab1248130024 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Mon, 31 Oct 2022 16:34:26 -0700 Subject: [PATCH] feat: Expose `transact async` for asynchronous transactions (#222) * Initial into_future working * Moved over to generic impl * Cleanup and fixing ViewFunction * Added ViewFunction impls and FunctionOwned/Args type * Cleanup usage of futures::TryFutureExt * Some more cleanup * Updated view_code and view_state to use new async builder * Added view async builder to API surface * Removed view_latest_block for view_block instead * view_account async builder now apart of the API surface * Got rid of unused client.view_{code, state} * Sort imports * Added Finality type * Added doc * Expose access keys * Expose access keys list * Rename QueryMethod => Method * Added gas price * Rename Queryable methods * Rename Queryable to ProcessQuery * Addressed comments * Addressed comments * Addressed comments (TM) * AccessKey info should be public * Addressed comments * Added transact_async * Update docs * Added test for transact_async * Addressed comments * Rebase from async builders * Update workspaces/src/operations.rs Co-authored-by: Austin Abell * Added TransactionPoll::Error * Added test for nonce * Status Result * Fix tests Co-authored-by: Austin Abell --- CHANGELOG.md | 1 + examples/Cargo.toml | 4 + examples/src/async_transaction.rs | 24 ++++ workspaces/src/operations.rs | 129 +++++++++++++++++++++- workspaces/src/rpc/client.rs | 54 ++++++++- workspaces/tests/parallel_transactions.rs | 57 ++++++++++ 6 files changed, 263 insertions(+), 6 deletions(-) create mode 100644 examples/src/async_transaction.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index e7a4d0c8..f723e20c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - [`view_*` asynchronous builders have been added which provides being able to query from a specific block hash or block height](https://github.com/near/workspaces-rs/pull/218) +- [`{CallTransaction, Transaction}::transact_async` for performing transactions without directly having to wait for it complete it on chain](https://github.com/near/workspaces-rs/pull/222) ### Changed diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ca14c130..95d227d4 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,6 +18,10 @@ tracing = "0.1" tracing-subscriber = { version = "0.3.5", features = ["env-filter"] } workspaces = { path = "../workspaces" } +[[example]] +name = "async_transaction" +path = "src/async_transaction.rs" + [[example]] name = "nft" path = "src/nft.rs" diff --git a/examples/src/async_transaction.rs b/examples/src/async_transaction.rs new file mode 100644 index 00000000..a384ea51 --- /dev/null +++ b/examples/src/async_transaction.rs @@ -0,0 +1,24 @@ +const STATUS_MSG_WASM_FILEPATH: &str = "./examples/res/status_message.wasm"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let worker = workspaces::sandbox().await?; + let wasm = std::fs::read(STATUS_MSG_WASM_FILEPATH)?; + let contract = worker.dev_deploy(&wasm).await?; + + let status = contract + .call("set_status") + .args_json(serde_json::json!({ + "message": "hello_world", + })) + .transact_async() + .await?; + + let outcome = status.await; + println!( + "Async transaction result from setting hello world: {:#?}", + outcome + ); + + Ok(()) +} diff --git a/workspaces/src/operations.rs b/workspaces/src/operations.rs index 8163d966..9456248a 100644 --- a/workspaces/src/operations.rs +++ b/workspaces/src/operations.rs @@ -1,23 +1,29 @@ //! All operation types that are generated/used when making transactions or view calls. -use crate::error::ErrorKind; +use crate::error::{ErrorKind, RpcErrorCode}; use crate::result::{Execution, ExecutionFinalResult, Result, ViewResultDetails}; use crate::rpc::client::{ - send_batch_tx_and_retry, Client, DEFAULT_CALL_DEPOSIT, DEFAULT_CALL_FN_GAS, + send_batch_tx_and_retry, send_batch_tx_async_and_retry, Client, DEFAULT_CALL_DEPOSIT, + DEFAULT_CALL_FN_GAS, }; +use crate::rpc::BoxFuture; use crate::types::{ AccessKey, AccountId, Balance, Gas, InMemorySigner, KeyType, PublicKey, SecretKey, }; use crate::worker::Worker; -use crate::{Account, Network}; +use crate::{Account, CryptoHash, Network}; use near_account_id::ParseAccountError; +use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError}; +use near_jsonrpc_client::methods::tx::RpcTransactionError; use near_primitives::transaction::{ Action, AddKeyAction, CreateAccountAction, DeleteAccountAction, DeleteKeyAction, DeployContractAction, FunctionCallAction, StakeAction, TransferAction, }; use near_primitives::views::FinalExecutionOutcomeView; use std::convert::TryInto; +use std::future::IntoFuture; +use std::task::Poll; const MAX_GAS: Gas = 300_000_000_000_000; @@ -225,13 +231,25 @@ impl<'a> Transaction<'a> { send_batch_tx_and_retry(self.client, &self.signer, &self.receiver_id, self.actions?).await } - /// Process the trannsaction, and return the result of the execution. + /// Process the transaction, and return the result of the execution. pub async fn transact(self) -> Result { self.transact_raw() .await .map(ExecutionFinalResult::from_view) .map_err(crate::error::Error::from) } + + /// Send the transaction to the network to be processed. This will be done asynchronously + /// without waiting for the transaction to complete. This returns us a [`TransactionStatus`] + /// for which we can call into [`status`] and/or [`wait`] to retrieve info about whether + /// the transaction has been completed or not. + /// + /// [`status`]: TransactionStatus::status + /// [`wait`]: TransactionStatus::wait + pub async fn transact_async(self) -> Result> { + send_batch_tx_async_and_retry(self.client, &self.signer, &self.receiver_id, self.actions?) + .await + } } /// Similiar to a [`Transaction`], but more specific to making a call into a contract. @@ -318,6 +336,29 @@ impl<'a> CallTransaction<'a> { .map_err(crate::error::Error::from) } + /// Send the transaction to the network to be processed. This will be done asynchronously + /// without waiting for the transaction to complete. This returns us a [`TransactionStatus`] + /// for which we can call into [`status`] and/or [`wait`] to retrieve info about whether + /// the transaction has been completed or not. + /// + /// [`status`]: TransactionStatus::status + /// [`wait`]: TransactionStatus::wait + pub async fn transact_async(self) -> Result> { + send_batch_tx_async_and_retry( + self.worker.client(), + &self.signer, + &self.contract_id, + vec![FunctionCallAction { + args: self.function.args?, + method_name: self.function.name, + gas: self.function.gas, + deposit: self.function.deposit, + } + .into()], + ) + .await + } + /// Instead of transacting the transaction, call into the specified view function. pub async fn view(self) -> Result { self.worker @@ -393,3 +434,83 @@ impl<'a, 'b> CreateAccountTransaction<'a, 'b> { }) } } + +/// `TransactionStatus` object relating to an [`asynchronous transaction`] on the network. +/// Used to query into the status of the Transaction for whether it has completed or not. +/// +/// [`asynchronous transaction`]: https://docs.near.org/api/rpc/transactions#send-transaction-async +#[must_use] +pub struct TransactionStatus<'a> { + client: &'a Client, + sender_id: AccountId, + hash: CryptoHash, +} + +impl<'a> TransactionStatus<'a> { + pub(crate) fn new( + client: &'a Client, + id: AccountId, + hash: near_primitives::hash::CryptoHash, + ) -> Self { + Self { + client, + sender_id: id, + hash: CryptoHash(hash.0), + } + } + + /// Checks the status of the transaction. If an `Err` is returned, then the transaction + /// is in an unexpected state. The error should have further context. Otherwise, if an + /// `Ok` value with [`Poll::Pending`] is returned, then the transaction has not finished. + pub async fn status(&self) -> Result> { + let result = self + .client + .tx_async_status( + &self.sender_id, + near_primitives::hash::CryptoHash(self.hash.0), + ) + .await + .map(ExecutionFinalResult::from_view); + + match result { + Ok(result) => Ok(Poll::Ready(result)), + Err(err) => match err { + JsonRpcError::ServerError(JsonRpcServerError::HandlerError( + RpcTransactionError::UnknownTransaction { .. }, + )) => Ok(Poll::Pending), + other => Err(RpcErrorCode::BroadcastTxFailure.custom(other)), + }, + } + } + + /// Wait until the completion of the transaction by polling [`TransactionStatus::status`]. + pub(crate) async fn wait(self) -> Result { + loop { + match self.status().await? { + Poll::Ready(val) => break Ok(val), + Poll::Pending => (), + } + + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + } + } + + /// Get the [`AccountId`] of the account that initiated this transaction. + pub fn sender_id(&self) -> &AccountId { + &self.sender_id + } + + /// Reference [`CryptoHash`] to the submitted transaction, pending completion. + pub fn hash(&self) -> &CryptoHash { + &self.hash + } +} + +impl<'a> IntoFuture for TransactionStatus<'a> { + type Output = Result; + type IntoFuture = BoxFuture<'a, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async { self.wait().await }) + } +} diff --git a/workspaces/src/rpc/client.rs b/workspaces/src/rpc/client.rs index 1659374e..a5f345f3 100644 --- a/workspaces/src/rpc/client.rs +++ b/workspaces/src/rpc/client.rs @@ -27,6 +27,7 @@ use near_primitives::views::{ }; use crate::error::{Error, ErrorKind, RpcErrorCode}; +use crate::operations::TransactionStatus; use crate::result::Result; use crate::types::{AccountId, InMemorySigner, Nonce, PublicKey}; @@ -39,7 +40,7 @@ pub struct Client { rpc_addr: String, rpc_client: JsonRpcClient, /// AccessKey nonces to reference when sending transactions. - access_key_nonces: RwLock>, + pub(crate) access_key_nonces: RwLock>, } impl Client { @@ -284,6 +285,20 @@ impl Client { result } + pub(crate) async fn tx_async_status( + &self, + sender_id: &AccountId, + hash: CryptoHash, + ) -> Result> { + self.query(methods::tx::RpcTransactionStatusRequest { + transaction_info: methods::tx::TransactionInfo::TransactionId { + account_id: sender_id.clone(), + hash, + }, + }) + .await + } + pub(crate) async fn wait_for_rpc(&self) -> Result<()> { let timeout_secs = match std::env::var("NEAR_RPC_TIMEOUT_SECS") { // hard fail on not being able to parse the env var, since this isn't something @@ -316,7 +331,7 @@ impl Client { } } -async fn access_key( +pub(crate) async fn access_key( client: &Client, account_id: near_primitives::account::id::AccountId, public_key: near_crypto::PublicKey, @@ -456,3 +471,38 @@ pub(crate) async fn send_batch_tx_and_retry( }) .await } + +pub(crate) async fn send_batch_tx_async_and_retry<'a>( + client: &'a Client, + signer: &InMemorySigner, + receiver_id: &AccountId, + actions: Vec, +) -> Result> { + let signer = signer.inner(); + + retry(|| async { + let (block_hash, nonce) = + fetch_tx_nonce(client, signer.account_id.clone(), signer.public_key()).await?; + + let hash = client + .query(&methods::broadcast_tx_async::RpcBroadcastTxAsyncRequest { + signed_transaction: SignedTransaction::from_actions( + nonce, + signer.account_id.clone(), + receiver_id.clone(), + &signer as &dyn Signer, + actions.clone(), + block_hash, + ), + }) + .await + .map_err(|e| RpcErrorCode::BroadcastTxFailure.custom(e))?; + + Ok(TransactionStatus::new( + client, + signer.account_id.clone(), + hash, + )) + }) + .await +} diff --git a/workspaces/tests/parallel_transactions.rs b/workspaces/tests/parallel_transactions.rs index 22d6f001..d12240e7 100644 --- a/workspaces/tests/parallel_transactions.rs +++ b/workspaces/tests/parallel_transactions.rs @@ -1,3 +1,5 @@ +use std::{collections::VecDeque, task::Poll}; + use serde_json::json; const STATUS_MSG_CONTRACT: &[u8] = include_bytes!("../../examples/res/status_message.wasm"); @@ -38,3 +40,58 @@ async fn test_parallel() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +async fn test_parallel_async() -> anyhow::Result<()> { + let worker = workspaces::sandbox().await?; + let contract = worker.dev_deploy(STATUS_MSG_CONTRACT).await?; + let account = worker.dev_create_account().await?; + + // nonce of access key before any transactions occured. + let nonce_start = worker + .view_access_key(account.id(), &account.secret_key().public_key()) + .await? + .nonce; + + // Create a queue statuses we can check the status of later. + let mut statuses = VecDeque::new(); + let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; + for msg in messages { + let status = account + .call(contract.id(), "set_status") + .args_json(json!({ + "message": msg, + })) + .transact_async() + .await?; + statuses.push_back(status); + } + + // Retry checking the statuses of all transactions until the queue is empty + // with all transactions completed. + while let Some(status) = statuses.pop_front() { + match status.status().await? { + Poll::Ready(_) => (), + Poll::Pending => statuses.push_back(status), + } + } + + // Check the final set message. This should be "j" due to the ordering of the queue. + let final_set_msg = account + .call(contract.id(), "get_status") + .args_json(json!({ "account_id": account.id() })) + .view() + .await? + .json::()?; + assert_eq!(final_set_msg, "j"); + + let nonce_end = worker + .view_access_key(account.id(), &account.secret_key().public_key()) + .await? + .nonce; + + // The amount of transactions should equal the increase in nonce: + assert!(nonce_end - nonce_start == messages.len() as u64); + + Ok(()) +}