From 4e2e79c4d600f7f6746c9682157265a78824ab4f Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 10 Aug 2021 14:02:34 -0400 Subject: [PATCH 1/3] Restore RPC query parsing and tests Signed-off-by: Thane Thomson --- rpc/Cargo.toml | 3 + rpc/src/query.rs | 430 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 428 insertions(+), 5 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 6fbc00e45..4d69ddb86 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -68,6 +68,7 @@ std = [ bytes = "1.0" chrono = "0.4" getrandom = "0.1" +peg = "0.7.0" pin-project = "1.0.1" serde = { version = "1", features = [ "derive" ] } serde_bytes = "0.11" @@ -80,6 +81,8 @@ subtle-encoding = { version = "0.5", features = ["bech32-preview"] } url = "2.2" walkdir = "2.3" flex-error = { version = "0.4.1", default-features = false } + +# Optional dependencies async-trait = { version = "0.1", optional = true } async-tungstenite = { version = "0.12", features = ["tokio-runtime", "tokio-rustls"], optional = true } futures = { version = "0.3", optional = true } diff --git a/rpc/src/query.rs b/rpc/src/query.rs index ca82f265a..d6df24e69 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -4,11 +4,8 @@ //! //! [`Query`]: struct.Query.html -// TODO(thane): These warnings are generated by the PEG for some reason. Try to fix and remove. -#![allow(clippy::redundant_closure_call, clippy::unit_arg)] - use crate::Error; -use chrono::{Date, DateTime, FixedOffset, Utc}; +use chrono::{Date, DateTime, FixedOffset, NaiveDate, Utc}; use std::fmt; use std::str::FromStr; @@ -36,6 +33,21 @@ use std::str::FromStr; /// assert_eq!("tm.event = 'Tx' AND tx.height >= 100", query.to_string()); /// ``` /// +/// ### Query parsing +/// +/// ```rust +/// use tendermint_rpc::query::{Query, EventType}; +/// +/// let query: Query = "tm.event = 'NewBlock'".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::NewBlock)); +/// +/// let query: Query = "tm.event = 'Tx' AND tx.hash = 'XYZ'".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::Tx).and_eq("tx.hash", "XYZ")); +/// +/// let query: Query = "tm.event = 'Tx' AND tx.height >= 100".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::Tx).and_gte("tx.height", 100_u64)); +/// ``` +/// /// [subscribe endpoint documentation]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe #[derive(Debug, Clone, PartialEq)] pub struct Query { @@ -191,6 +203,172 @@ impl fmt::Display for Query { } } +peg::parser! { + grammar query_parser() for str { + // Some or no whitespace. + rule _() = quiet!{[' ']*} + + // At least some whitespace. + rule __() = quiet!{[' ']+} + + rule string() -> &'input str + = "'" s:$([^'\'']*) "'" { s } + + rule unsigned() -> u64 + = s:$(['0'..='9']+) {? + u64::from_str(s) + .map_err(|_| "failed to parse as an unsigned integer") + } + + rule signed() -> i64 + = s:$("-" ['1'..='9'] ['0'..='9']*) {? + i64::from_str(s) + .map_err(|_| "failed to parse as a signed integer") + } + + rule year() -> &'input str + = $(['0'..='9']*<4>) + + rule month() -> &'input str + = $(['0' | '1'] ['0'..='9']) + + rule day() -> &'input str + = $(['0'..='3'] ['0'..='9']) + + rule date() -> &'input str + = $(year() "-" month() "-" day()) + + rule hour() -> &'input str + = $(['0'..='2'] ['0'..='9']) + + rule min_sec() -> &'input str + = $(['0'..='5'] ['0'..='9']) + + rule nanosec() -> &'input str + = $("." ['0'..='9']+) + + rule time() -> &'input str + = $(hour() ":" min_sec() ":" min_sec() nanosec()? "Z") + + rule datetime() -> &'input str + = dt:$(date() "T" time()) { dt } + + rule float() -> f64 + = s:$("-"? ['0'..='9']+ "." ['0'..='9']+) {? + f64::from_str(s) + .map_err(|_| "failed to parse as a 64-bit floating point number") + } + + rule string_op() -> Operand + = s:string() { Operand::String(s.to_owned()) } + + rule unsigned_op() -> Operand + = u:unsigned() { Operand::Unsigned(u) } + + rule signed_op() -> Operand + = s:signed() { Operand::Signed(s) } + + rule datetime_op() -> Operand + = "TIME" __ dt:datetime() {? + DateTime::parse_from_rfc3339(dt) + .map(|dt| Operand::DateTime(dt.with_timezone(&Utc))) + .map_err(|_| "failed to parse as RFC3339-compatible date/time") + } + + rule date_op() -> Operand + = "DATE" __ dt:date() {? + let naive_date = NaiveDate::parse_from_str(dt, "%Y-%m-%d") + .map_err(|_| "failed to parse as RFC3339-compatible date")?; + Ok(Operand::Date(Date::from_utc(naive_date, Utc))) + } + + rule float_op() -> Operand + = f:float() { Operand::Float(f) } + + rule tag() -> &'input str + = $(['a'..='z' | 'A'..='Z'] ['a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.']*) + + rule operand() -> Operand + = datetime_op() / date_op() / string_op() / float_op() / signed_op() / unsigned_op() + + rule eq() -> Condition + = t:tag() _ "=" _ op:operand() { Condition::Eq(t.to_owned(), op) } + + rule lte() -> Condition + = t:tag() _ "<=" _ op:operand() { Condition::Lte(t.to_owned(), op) } + + rule lt() -> Condition + = t:tag() _ "<" _ op:operand() { Condition::Lt(t.to_owned(), op) } + + rule gte() -> Condition + = t:tag() _ ">=" _ op:operand() { Condition::Gte(t.to_owned(), op) } + + rule gt() -> Condition + = t:tag() _ ">" _ op:operand() { Condition::Gt(t.to_owned(), op) } + + rule contains() -> Condition + = t:tag() __ "CONTAINS" __ op:string() { Condition::Contains(t.to_owned(), op.to_owned()) } + + rule exists() -> Condition + = t:tag() __ "EXISTS" { Condition::Exists(t.to_owned()) } + + rule event_type() -> Term + = "tm.event" _ "=" _ "'" et:$("NewBlock" / "Tx") "'" { + Term::EventType(EventType::from_str(et).unwrap()) + } + + rule condition() -> Term + = c:(eq() / lte() / lt() / gte() / gt() / contains() / exists()) { Term::Condition(c) } + + rule term() -> Term + = event_type() / condition() + + pub rule query() -> Vec + = t:term() ** ( __ "AND" __ ) { t } + } +} + +/// A term in a query is either an event type or a general condition. +/// Exclusively used for query parsing. +#[derive(Debug)] +pub enum Term { + EventType(EventType), + Condition(Condition), +} + +// Separate a list of terms into lists of each type of term. +fn separate_terms(terms: Vec) -> (Vec, Vec) { + terms + .into_iter() + .fold((Vec::new(), Vec::new()), |mut v, t| { + match t { + Term::EventType(et) => v.0.push(et), + Term::Condition(c) => v.1.push(c), + } + v + }) +} + +impl FromStr for Query { + type Err = Error; + + fn from_str(s: &str) -> Result { + let (event_types, conditions) = separate_terms( + query_parser::query(s) + .map_err(|e| Error::invalid_params(format!("failed to parse query: {}", e)))?, + ); + if event_types.len() > 1 { + return Err(Error::invalid_params( + "tm.event can only be used once in a query".to_owned(), + )); + } + Ok(Query { + event_type: event_types.first().cloned(), + conditions, + }) + } +} + fn join(f: &mut fmt::Formatter<'_>, separator: S, iterable: I) -> fmt::Result where S: fmt::Display, @@ -420,7 +598,7 @@ fn escape(s: &str) -> String { #[cfg(test)] mod test { use super::*; - use chrono::NaiveDate; + use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; #[test] fn empty_query() { @@ -518,4 +696,246 @@ mod test { query.to_string() ); } + + #[test] + fn query_event_type_parsing() { + // Test the empty query (that matches all possible events) + let query = Query::from_str("").unwrap(); + assert_eq!(query, Query::default()); + + // With just one event type + let query = Query::from_str("tm.event='Tx'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert!(query.conditions.is_empty()); + let query = Query::from_str("tm.event='NewBlock'").unwrap(); + assert_eq!(query.event_type, Some(EventType::NewBlock)); + assert!(query.conditions.is_empty()); + + // One event type, with whitespace + let query = Query::from_str("tm.event = 'NewBlock'").unwrap(); + assert_eq!(query.event_type, Some(EventType::NewBlock)); + assert!(query.conditions.is_empty()); + + // Two event types are not allowed + assert!(Query::from_str("tm.event='Tx' AND tm.event='NewBlock'").is_err()); + } + + #[test] + fn query_string_term_parsing() { + // Query with string term + let query = Query::from_str("tm.event='Tx' AND transfer.sender='AddrA'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "transfer.sender".to_owned(), + Operand::String("AddrA".to_owned()), + )] + ); + // Query with string term, with extra whitespace + let query = Query::from_str("tm.event = 'Tx' AND transfer.sender = 'AddrA'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "transfer.sender".to_owned(), + Operand::String("AddrA".to_owned()), + )] + ); + } + + #[test] + fn query_unsigned_term_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND tx.height = 10").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq("tx.height".to_owned(), Operand::Unsigned(10))] + ); + + let query = Query::from_str("tm.event = 'Tx' AND tx.height <= 100").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "tx.height".to_owned(), + Operand::Unsigned(100) + )] + ); + } + + #[test] + fn query_signed_term_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND some.value = -1").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq("some.value".to_owned(), Operand::Signed(-1))] + ); + + let query = Query::from_str("tm.event = 'Tx' AND some.value <= -100").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "some.value".to_owned(), + Operand::Signed(-100) + )] + ); + } + + #[test] + fn query_date_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND some.date <= DATE 2022-02-03").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "some.date".to_owned(), + Operand::Date(Date::from_utc(NaiveDate::from_ymd(2022, 2, 3), Utc)) + )] + ); + } + + #[test] + fn query_datetime_parsing() { + let query = + Query::from_str("tm.event = 'Tx' AND some.datetime = TIME 2021-02-26T17:05:02.1495Z") + .unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "some.datetime".to_owned(), + Operand::DateTime(DateTime::from_utc( + NaiveDateTime::new( + NaiveDate::from_ymd(2021, 2, 26), + NaiveTime::from_hms_nano(17, 5, 2, 149500000) + ), + Utc + )) + )] + ) + } + + #[test] + fn query_float_parsing() { + // Positive floating point number + let query = Query::from_str("short.pi = 3.14159").unwrap(); + assert_eq!(query.conditions.len(), 1); + match &query.conditions[0] { + Condition::Eq(tag, op) => { + assert_eq!(tag, "short.pi"); + match op { + Operand::Float(f) => { + assert!(floats_eq(*f, std::f64::consts::PI, 5)); + } + _ => panic!("unexpected operand: {:?}", op), + } + } + c => panic!("unexpected condition: {:?}", c), + } + + // Negative floating point number + let query = Query::from_str("short.pi = -3.14159").unwrap(); + assert_eq!(query.conditions.len(), 1); + match &query.conditions[0] { + Condition::Eq(tag, op) => { + assert_eq!(tag, "short.pi"); + match op { + Operand::Float(f) => { + assert!(floats_eq(*f, -std::f64::consts::PI, 5)); + } + _ => panic!("unexpected operand: {:?}", op), + } + } + c => panic!("unexpected condition: {:?}", c), + } + } + + // From https://stackoverflow.com/a/41447964/1156132 + fn floats_eq(a: f64, b: f64, precision: u8) -> bool { + let factor = 10.0f64.powi(precision as i32); + let a = (a * factor).trunc(); + let b = (b * factor).trunc(); + a == b + } + + #[test] + fn query_conditions() { + let query = Query::from_str("some.field = 'string'").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Eq( + "some.field".to_owned(), + Operand::String("string".to_owned()) + )] + } + ); + + let query = Query::from_str("some.field < 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Lt("some.field".to_owned(), Operand::Unsigned(5),)] + } + ); + + let query = Query::from_str("some.field <= 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Lte( + "some.field".to_owned(), + Operand::Unsigned(5), + )] + } + ); + + let query = Query::from_str("some.field > 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Gt("some.field".to_owned(), Operand::Unsigned(5),)] + } + ); + + let query = Query::from_str("some.field >= 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Gte( + "some.field".to_owned(), + Operand::Unsigned(5), + )] + } + ); + + let query = Query::from_str("some.field CONTAINS 'inner'").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Contains( + "some.field".to_owned(), + "inner".to_owned() + )] + } + ); + + let query = Query::from_str("some.field EXISTS").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Exists("some.field".to_owned())] + } + ); + } } From 06c09956dc83b915899fa495672bc9505dd92f30 Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 10 Aug 2021 14:15:52 -0400 Subject: [PATCH 2/3] Restore RPC client binary subscription capabilities Signed-off-by: Thane Thomson --- rpc/src/client/bin/main.rs | 123 ++++++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 3 deletions(-) diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs index 950a9dfcd..fb1ca527a 100644 --- a/rpc/src/client/bin/main.rs +++ b/rpc/src/client/bin/main.rs @@ -1,11 +1,15 @@ //! CLI for performing simple interactions against a Tendermint node's RPC. +use futures::StreamExt; use std::str::FromStr; use structopt::StructOpt; use tendermint::abci::{Path, Transaction}; +use tendermint_rpc::query::Query; use tendermint_rpc::{ - Client, Error, HttpClient, Paging, Scheme, SubscriptionClient, Url, WebSocketClient, + Client, Error, HttpClient, Order, Paging, Scheme, Subscription, SubscriptionClient, Url, + WebSocketClient, }; +use tokio::time::Duration; use tracing::level_filters::LevelFilter; use tracing::{error, info, warn}; @@ -41,7 +45,18 @@ struct Opt { enum Request { #[structopt(flatten)] ClientRequest(ClientRequest), - // TODO(thane): Implement subscription functionality + /// Subscribe to receive events produced by a specific query. + Subscribe { + /// The query against which events will be matched. + query: Query, + /// The maximum number of events to receive before terminating. + #[structopt(long)] + max_events: Option, + /// The maximum amount of time (in seconds) to listen for events before + /// terminating. + #[structopt(long)] + max_time: Option, + }, } #[derive(Debug, StructOpt)] @@ -112,7 +127,18 @@ enum ClientRequest { NetInfo, /// Get Tendermint status (node info, public key, latest block hash, etc.). Status, - // TODO(thane): Implement txsearch endpoint. + TxSearch { + /// The query against which transactions should be matched. + query: Query, + #[structopt(long, default_value = "1")] + page: u32, + #[structopt(long, default_value = "10")] + per_page: u8, + #[structopt(long, default_value = "asc")] + order: Order, + #[structopt(long)] + prove: bool, + }, /// Get the validators at the given height. Validators { /// The height at which to query the validators. @@ -208,6 +234,7 @@ async fn http_request(url: Url, proxy_url: Option, req: Request) -> Result< match req { Request::ClientRequest(r) => client_request(&client, r).await, + _ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)".to_owned())) } } @@ -218,6 +245,11 @@ async fn websocket_request(url: Url, req: Request) -> Result<(), Error> { let result = match req { Request::ClientRequest(r) => client_request(&client, r).await, + Request::Subscribe { + query, + max_events, + max_time, + } => subscription_client_request(&client, query, max_events, max_time).await, }; client.close()?; @@ -308,6 +340,18 @@ where ClientRequest::Status => { serde_json::to_string_pretty(&client.status().await?).map_err(Error::serde)? } + ClientRequest::TxSearch { + query, + page, + per_page, + order, + prove, + } => serde_json::to_string_pretty( + &client + .tx_search(query, prove, page, per_page, order) + .await?, + ) + .map_err(Error::serde)?, ClientRequest::Validators { height, all, @@ -333,3 +377,76 @@ where println!("{}", result); Ok(()) } + +async fn subscription_client_request( + client: &C, + query: Query, + max_events: Option, + max_time: Option, +) -> Result<(), Error> +where + C: SubscriptionClient, +{ + info!("Creating subscription for query: {}", query); + let subs = client.subscribe(query).await?; + match max_time { + Some(secs) => recv_events_with_timeout(subs, max_events, secs).await, + None => recv_events(subs, max_events).await, + } +} + +async fn recv_events_with_timeout( + mut subs: Subscription, + max_events: Option, + timeout_secs: u32, +) -> Result<(), Error> { + let timeout = tokio::time::sleep(Duration::from_secs(timeout_secs as u64)); + let mut event_count = 0u64; + tokio::pin!(timeout); + loop { + tokio::select! { + result_opt = subs.next() => { + let result = match result_opt { + Some(r) => r, + None => { + info!("The server terminated the subscription"); + return Ok(()); + } + }; + let event = result?; + println!("{}", serde_json::to_string_pretty(&event).map_err(Error::serde)?); + event_count += 1; + if let Some(me) = max_events { + if event_count >= (me as u64) { + info!("Reached maximum number of events: {}", me); + return Ok(()); + } + } + } + _ = &mut timeout => { + info!("Reached event receive timeout of {} seconds", timeout_secs); + return Ok(()) + } + } + } +} + +async fn recv_events(mut subs: Subscription, max_events: Option) -> Result<(), Error> { + let mut event_count = 0u64; + while let Some(result) = subs.next().await { + let event = result?; + println!( + "{}", + serde_json::to_string_pretty(&event).map_err(Error::serde)? + ); + event_count += 1; + if let Some(me) = max_events { + if event_count >= (me as u64) { + info!("Reached maximum number of events: {}", me); + return Ok(()); + } + } + } + info!("The server terminated the subscription"); + Ok(()) +} From 256026cd633df88737b859d3cb2271a984fafc0f Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Tue, 10 Aug 2021 14:22:32 -0400 Subject: [PATCH 3/3] Add .changelog entry Signed-off-by: Thane Thomson --- .changelog/unreleased/features/859-rpc-query-parsing.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changelog/unreleased/features/859-rpc-query-parsing.md diff --git a/.changelog/unreleased/features/859-rpc-query-parsing.md b/.changelog/unreleased/features/859-rpc-query-parsing.md new file mode 100644 index 000000000..efeda0292 --- /dev/null +++ b/.changelog/unreleased/features/859-rpc-query-parsing.md @@ -0,0 +1,7 @@ +- `[tendermint-rpc]` Runtime query parsing (relevant to the `/subscribe` and + `/tx_search` endpoints) has been reintroduced. This allows for client-side + validation of queries prior to submitting them to a remote Tendermint node. An + example of how to use this is available in the `tendermint-rpc` CLI (see [the + README](https://github.com/informalsystems/tendermint-rs/tree/master/rpc#cli) + for details). + ([#859](https://github.com/informalsystems/tendermint-rs/issues/859))