Skip to content

Commit

Permalink
feat: Expose transact async for asynchronous transactions (#222)
Browse files Browse the repository at this point in the history
* Initial into_future working

* Moved over to generic impl

* Cleanup and fixing ViewFunction

* Added ViewFunction impls and FunctionOwned/Args type

* Cleanup usage of futures::TryFutureExt

* Some more cleanup

* Updated view_code and view_state to use new async builder

* Added view async builder to API surface

* Removed view_latest_block for view_block instead

* view_account async builder now apart of the API surface

* Got rid of unused client.view_{code, state}

* Sort imports

* Added Finality type

* Added doc

* Expose access keys

* Expose access keys list

* Rename QueryMethod => Method

* Added gas price

* Rename Queryable methods

* Rename Queryable to ProcessQuery

* Addressed comments

* Addressed comments

* Addressed comments (TM)

* AccessKey info should be public

* Addressed comments

* Added transact_async

* Update docs

* Added test for transact_async

* Addressed comments

* Rebase from async builders

* Update workspaces/src/operations.rs

Co-authored-by: Austin Abell <austinabell8@gmail.com>

* Added TransactionPoll::Error

* Added test for nonce

* Status Result<Poll>

* Fix tests

Co-authored-by: Austin Abell <austinabell8@gmail.com>
  • Loading branch information
ChaoticTempest and austinabell committed Oct 31, 2022
1 parent fdd2864 commit 0b80d06
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Added

- [`view_*` asynchronous builders have been added which provides being able to query from a specific block hash or block height](https://github.com/near/workspaces-rs/pull/218)
- [`{CallTransaction, Transaction}::transact_async` for performing transactions without directly having to wait for it complete it on chain](https://github.com/near/workspaces-rs/pull/222)

### Changed

Expand Down
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3.5", features = ["env-filter"] }
workspaces = { path = "../workspaces" }

[[example]]
name = "async_transaction"
path = "src/async_transaction.rs"

[[example]]
name = "nft"
path = "src/nft.rs"
Expand Down
24 changes: 24 additions & 0 deletions examples/src/async_transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
const STATUS_MSG_WASM_FILEPATH: &str = "./examples/res/status_message.wasm";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let worker = workspaces::sandbox().await?;
let wasm = std::fs::read(STATUS_MSG_WASM_FILEPATH)?;
let contract = worker.dev_deploy(&wasm).await?;

let status = contract
.call("set_status")
.args_json(serde_json::json!({
"message": "hello_world",
}))
.transact_async()
.await?;

let outcome = status.await;
println!(
"Async transaction result from setting hello world: {:#?}",
outcome
);

Ok(())
}
129 changes: 125 additions & 4 deletions workspaces/src/operations.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
//! All operation types that are generated/used when making transactions or view calls.

use crate::error::ErrorKind;
use crate::error::{ErrorKind, RpcErrorCode};
use crate::result::{Execution, ExecutionFinalResult, Result, ViewResultDetails};
use crate::rpc::client::{
send_batch_tx_and_retry, Client, DEFAULT_CALL_DEPOSIT, DEFAULT_CALL_FN_GAS,
send_batch_tx_and_retry, send_batch_tx_async_and_retry, Client, DEFAULT_CALL_DEPOSIT,
DEFAULT_CALL_FN_GAS,
};
use crate::rpc::BoxFuture;
use crate::types::{
AccessKey, AccountId, Balance, Gas, InMemorySigner, KeyType, PublicKey, SecretKey,
};
use crate::worker::Worker;
use crate::{Account, Network};
use crate::{Account, CryptoHash, Network};

use near_account_id::ParseAccountError;
use near_jsonrpc_client::errors::{JsonRpcError, JsonRpcServerError};
use near_jsonrpc_client::methods::tx::RpcTransactionError;
use near_primitives::transaction::{
Action, AddKeyAction, CreateAccountAction, DeleteAccountAction, DeleteKeyAction,
DeployContractAction, FunctionCallAction, StakeAction, TransferAction,
};
use near_primitives::views::FinalExecutionOutcomeView;
use std::convert::TryInto;
use std::future::IntoFuture;
use std::task::Poll;

const MAX_GAS: Gas = 300_000_000_000_000;

Expand Down Expand Up @@ -225,13 +231,25 @@ impl<'a> Transaction<'a> {
send_batch_tx_and_retry(self.client, &self.signer, &self.receiver_id, self.actions?).await
}

/// Process the trannsaction, and return the result of the execution.
/// Process the transaction, and return the result of the execution.
pub async fn transact(self) -> Result<ExecutionFinalResult> {
self.transact_raw()
.await
.map(ExecutionFinalResult::from_view)
.map_err(crate::error::Error::from)
}

/// Send the transaction to the network to be processed. This will be done asynchronously
/// without waiting for the transaction to complete. This returns us a [`TransactionStatus`]
/// for which we can call into [`status`] and/or [`wait`] to retrieve info about whether
/// the transaction has been completed or not.
///
/// [`status`]: TransactionStatus::status
/// [`wait`]: TransactionStatus::wait
pub async fn transact_async(self) -> Result<TransactionStatus<'a>> {
send_batch_tx_async_and_retry(self.client, &self.signer, &self.receiver_id, self.actions?)
.await
}
}

/// Similiar to a [`Transaction`], but more specific to making a call into a contract.
Expand Down Expand Up @@ -318,6 +336,29 @@ impl<'a> CallTransaction<'a> {
.map_err(crate::error::Error::from)
}

/// Send the transaction to the network to be processed. This will be done asynchronously
/// without waiting for the transaction to complete. This returns us a [`TransactionStatus`]
/// for which we can call into [`status`] and/or [`wait`] to retrieve info about whether
/// the transaction has been completed or not.
///
/// [`status`]: TransactionStatus::status
/// [`wait`]: TransactionStatus::wait
pub async fn transact_async(self) -> Result<TransactionStatus<'a>> {
send_batch_tx_async_and_retry(
self.worker.client(),
&self.signer,
&self.contract_id,
vec![FunctionCallAction {
args: self.function.args?,
method_name: self.function.name,
gas: self.function.gas,
deposit: self.function.deposit,
}
.into()],
)
.await
}

/// Instead of transacting the transaction, call into the specified view function.
pub async fn view(self) -> Result<ViewResultDetails> {
self.worker
Expand Down Expand Up @@ -393,3 +434,83 @@ impl<'a, 'b> CreateAccountTransaction<'a, 'b> {
})
}
}

/// `TransactionStatus` object relating to an [`asynchronous transaction`] on the network.
/// Used to query into the status of the Transaction for whether it has completed or not.
///
/// [`asynchronous transaction`]: https://docs.near.org/api/rpc/transactions#send-transaction-async
#[must_use]
pub struct TransactionStatus<'a> {
client: &'a Client,
sender_id: AccountId,
hash: CryptoHash,
}

impl<'a> TransactionStatus<'a> {
pub(crate) fn new(
client: &'a Client,
id: AccountId,
hash: near_primitives::hash::CryptoHash,
) -> Self {
Self {
client,
sender_id: id,
hash: CryptoHash(hash.0),
}
}

/// Checks the status of the transaction. If an `Err` is returned, then the transaction
/// is in an unexpected state. The error should have further context. Otherwise, if an
/// `Ok` value with [`Poll::Pending`] is returned, then the transaction has not finished.
pub async fn status(&self) -> Result<Poll<ExecutionFinalResult>> {
let result = self
.client
.tx_async_status(
&self.sender_id,
near_primitives::hash::CryptoHash(self.hash.0),
)
.await
.map(ExecutionFinalResult::from_view);

match result {
Ok(result) => Ok(Poll::Ready(result)),
Err(err) => match err {
JsonRpcError::ServerError(JsonRpcServerError::HandlerError(
RpcTransactionError::UnknownTransaction { .. },
)) => Ok(Poll::Pending),
other => Err(RpcErrorCode::BroadcastTxFailure.custom(other)),
},
}
}

/// Wait until the completion of the transaction by polling [`TransactionStatus::status`].
pub(crate) async fn wait(self) -> Result<ExecutionFinalResult> {
loop {
match self.status().await? {
Poll::Ready(val) => break Ok(val),
Poll::Pending => (),
}

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
}
}

/// Get the [`AccountId`] of the account that initiated this transaction.
pub fn sender_id(&self) -> &AccountId {
&self.sender_id
}

/// Reference [`CryptoHash`] to the submitted transaction, pending completion.
pub fn hash(&self) -> &CryptoHash {
&self.hash
}
}

impl<'a> IntoFuture for TransactionStatus<'a> {
type Output = Result<ExecutionFinalResult>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(async { self.wait().await })
}
}
54 changes: 52 additions & 2 deletions workspaces/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use near_primitives::views::{
};

use crate::error::{Error, ErrorKind, RpcErrorCode};
use crate::operations::TransactionStatus;
use crate::result::Result;
use crate::types::{AccountId, InMemorySigner, Nonce, PublicKey};

Expand All @@ -39,7 +40,7 @@ pub struct Client {
rpc_addr: String,
rpc_client: JsonRpcClient,
/// AccessKey nonces to reference when sending transactions.
access_key_nonces: RwLock<HashMap<AccountId, AtomicU64>>,
pub(crate) access_key_nonces: RwLock<HashMap<AccountId, AtomicU64>>,
}

impl Client {
Expand Down Expand Up @@ -284,6 +285,20 @@ impl Client {
result
}

pub(crate) async fn tx_async_status(
&self,
sender_id: &AccountId,
hash: CryptoHash,
) -> Result<FinalExecutionOutcomeView, JsonRpcError<RpcTransactionError>> {
self.query(methods::tx::RpcTransactionStatusRequest {
transaction_info: methods::tx::TransactionInfo::TransactionId {
account_id: sender_id.clone(),
hash,
},
})
.await
}

pub(crate) async fn wait_for_rpc(&self) -> Result<()> {
let timeout_secs = match std::env::var("NEAR_RPC_TIMEOUT_SECS") {
// hard fail on not being able to parse the env var, since this isn't something
Expand Down Expand Up @@ -316,7 +331,7 @@ impl Client {
}
}

async fn access_key(
pub(crate) async fn access_key(
client: &Client,
account_id: near_primitives::account::id::AccountId,
public_key: near_crypto::PublicKey,
Expand Down Expand Up @@ -456,3 +471,38 @@ pub(crate) async fn send_batch_tx_and_retry(
})
.await
}

pub(crate) async fn send_batch_tx_async_and_retry<'a>(
client: &'a Client,
signer: &InMemorySigner,
receiver_id: &AccountId,
actions: Vec<Action>,
) -> Result<TransactionStatus<'a>> {
let signer = signer.inner();

retry(|| async {
let (block_hash, nonce) =
fetch_tx_nonce(client, signer.account_id.clone(), signer.public_key()).await?;

let hash = client
.query(&methods::broadcast_tx_async::RpcBroadcastTxAsyncRequest {
signed_transaction: SignedTransaction::from_actions(
nonce,
signer.account_id.clone(),
receiver_id.clone(),
&signer as &dyn Signer,
actions.clone(),
block_hash,
),
})
.await
.map_err(|e| RpcErrorCode::BroadcastTxFailure.custom(e))?;

Ok(TransactionStatus::new(
client,
signer.account_id.clone(),
hash,
))
})
.await
}
57 changes: 57 additions & 0 deletions workspaces/tests/parallel_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{collections::VecDeque, task::Poll};

use serde_json::json;

const STATUS_MSG_CONTRACT: &[u8] = include_bytes!("../../examples/res/status_message.wasm");
Expand Down Expand Up @@ -38,3 +40,58 @@ async fn test_parallel() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
async fn test_parallel_async() -> anyhow::Result<()> {
let worker = workspaces::sandbox().await?;
let contract = worker.dev_deploy(STATUS_MSG_CONTRACT).await?;
let account = worker.dev_create_account().await?;

// nonce of access key before any transactions occured.
let nonce_start = worker
.view_access_key(account.id(), &account.secret_key().public_key())
.await?
.nonce;

// Create a queue statuses we can check the status of later.
let mut statuses = VecDeque::new();
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for msg in messages {
let status = account
.call(contract.id(), "set_status")
.args_json(json!({
"message": msg,
}))
.transact_async()
.await?;
statuses.push_back(status);
}

// Retry checking the statuses of all transactions until the queue is empty
// with all transactions completed.
while let Some(status) = statuses.pop_front() {
match status.status().await? {
Poll::Ready(_) => (),
Poll::Pending => statuses.push_back(status),
}
}

// Check the final set message. This should be "j" due to the ordering of the queue.
let final_set_msg = account
.call(contract.id(), "get_status")
.args_json(json!({ "account_id": account.id() }))
.view()
.await?
.json::<String>()?;
assert_eq!(final_set_msg, "j");

let nonce_end = worker
.view_access_key(account.id(), &account.secret_key().public_key())
.await?
.nonce;

// The amount of transactions should equal the increase in nonce:
assert!(nonce_end - nonce_start == messages.len() as u64);

Ok(())
}

0 comments on commit 0b80d06

Please sign in to comment.