Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintroduce RPC query parsing #949

Merged
merged 4 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/unreleased/features/859-rpc-query-parsing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- `[tendermint-rpc]` Runtime query parsing (relevant to the `/subscribe` and
`/tx_search` endpoints) has been reintroduced. This allows for client-side
validation of queries prior to submitting them to a remote Tendermint node. An
example of how to use this is available in the `tendermint-rpc` CLI (see [the
README](https://github.com/informalsystems/tendermint-rs/tree/master/rpc#cli)
for details).
([#859](https://github.com/informalsystems/tendermint-rs/issues/859))
3 changes: 3 additions & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ std = [
bytes = "1.0"
chrono = "0.4"
getrandom = "0.1"
peg = "0.7.0"
pin-project = "1.0.1"
serde = { version = "1", features = [ "derive" ] }
serde_bytes = "0.11"
Expand All @@ -80,6 +81,8 @@ subtle-encoding = { version = "0.5", features = ["bech32-preview"] }
url = "2.2"
walkdir = "2.3"
flex-error = { version = "0.4.1", default-features = false }

# Optional dependencies
async-trait = { version = "0.1", optional = true }
async-tungstenite = { version = "0.12", features = ["tokio-runtime", "tokio-rustls"], optional = true }
futures = { version = "0.3", optional = true }
Expand Down
131 changes: 125 additions & 6 deletions rpc/src/client/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
//! CLI for performing simple interactions against a Tendermint node's RPC.

use futures::StreamExt;
use std::str::FromStr;
use structopt::StructOpt;
use tendermint::abci::transaction::Hash;
use tendermint::abci::{Path, Transaction};
use tendermint_rpc::query::Query;
use tendermint_rpc::{
Client, Error, HttpClient, Paging, Scheme, SubscriptionClient, Url, WebSocketClient,
Client, Error, HttpClient, Order, Paging, Scheme, Subscription, SubscriptionClient, Url,
WebSocketClient,
};
use tokio::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing::{error, info, warn};

Expand Down Expand Up @@ -42,7 +46,18 @@ struct Opt {
enum Request {
#[structopt(flatten)]
ClientRequest(ClientRequest),
// TODO(thane): Implement subscription functionality
/// Subscribe to receive events produced by a specific query.
Subscribe {
/// The query against which events will be matched.
query: Query,
/// The maximum number of events to receive before terminating.
#[structopt(long)]
max_events: Option<u32>,
/// The maximum amount of time (in seconds) to listen for events before
/// terminating.
#[structopt(long)]
max_time: Option<u32>,
},
}

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -122,7 +137,20 @@ enum ClientRequest {
#[structopt(long)]
prove: bool,
},
// TODO(thane): Implement txsearch endpoint.
/// Search for a transaction by way of a specific query. Uses the same
/// query syntax as the `subscribe` endpoint.
TxSearch {
/// The query against which transactions should be matched.
query: Query,
#[structopt(long, default_value = "1")]
page: u32,
#[structopt(long, default_value = "10")]
per_page: u8,
#[structopt(long, default_value = "asc")]
order: Order,
#[structopt(long)]
prove: bool,
},
/// Get the validators at the given height.
Validators {
/// The height at which to query the validators.
Expand Down Expand Up @@ -218,6 +246,7 @@ async fn http_request(url: Url, proxy_url: Option<Url>, req: Request) -> Result<

match req {
Request::ClientRequest(r) => client_request(&client, r).await,
_ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)".to_owned()))
}
}

Expand All @@ -228,6 +257,11 @@ async fn websocket_request(url: Url, req: Request) -> Result<(), Error> {

let result = match req {
Request::ClientRequest(r) => client_request(&client, r).await,
Request::Subscribe {
query,
max_events,
max_time,
} => subscription_client_request(&client, query, max_events, max_time).await,
};

client.close()?;
Expand Down Expand Up @@ -315,6 +349,9 @@ where
ClientRequest::NetInfo => {
serde_json::to_string_pretty(&client.net_info().await?).map_err(Error::serde)?
}
ClientRequest::Status => {
serde_json::to_string_pretty(&client.status().await?).map_err(Error::serde)?
}
ClientRequest::Tx { hash, prove } => serde_json::to_string_pretty(
&client
.tx(
Expand All @@ -324,9 +361,18 @@ where
.await?,
)
.map_err(Error::serde)?,
ClientRequest::Status => {
serde_json::to_string_pretty(&client.status().await?).map_err(Error::serde)?
}
ClientRequest::TxSearch {
query,
page,
per_page,
order,
prove,
} => serde_json::to_string_pretty(
&client
.tx_search(query, prove, page, per_page, order)
.await?,
)
.map_err(Error::serde)?,
ClientRequest::Validators {
height,
all,
Expand All @@ -352,3 +398,76 @@ where
println!("{}", result);
Ok(())
}

async fn subscription_client_request<C>(
client: &C,
query: Query,
max_events: Option<u32>,
max_time: Option<u32>,
) -> Result<(), Error>
where
C: SubscriptionClient,
{
info!("Creating subscription for query: {}", query);
let subs = client.subscribe(query).await?;
match max_time {
Some(secs) => recv_events_with_timeout(subs, max_events, secs).await,
None => recv_events(subs, max_events).await,
}
}

async fn recv_events_with_timeout(
mut subs: Subscription,
max_events: Option<u32>,
timeout_secs: u32,
) -> Result<(), Error> {
let timeout = tokio::time::sleep(Duration::from_secs(timeout_secs as u64));
let mut event_count = 0u64;
tokio::pin!(timeout);
loop {
tokio::select! {
result_opt = subs.next() => {
let result = match result_opt {
Some(r) => r,
None => {
info!("The server terminated the subscription");
return Ok(());
}
};
let event = result?;
println!("{}", serde_json::to_string_pretty(&event).map_err(Error::serde)?);
event_count += 1;
if let Some(me) = max_events {
if event_count >= (me as u64) {
info!("Reached maximum number of events: {}", me);
return Ok(());
}
}
}
_ = &mut timeout => {
info!("Reached event receive timeout of {} seconds", timeout_secs);
return Ok(())
}
}
}
}

async fn recv_events(mut subs: Subscription, max_events: Option<u32>) -> Result<(), Error> {
let mut event_count = 0u64;
while let Some(result) = subs.next().await {
let event = result?;
println!(
"{}",
serde_json::to_string_pretty(&event).map_err(Error::serde)?
);
event_count += 1;
if let Some(me) = max_events {
if event_count >= (me as u64) {
info!("Reached maximum number of events: {}", me);
return Ok(());
}
}
}
info!("The server terminated the subscription");
Ok(())
}
Loading