From 71a14b589e4d29e4a6576c2ea3c509f520ce0abf Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Fri, 30 Aug 2024 18:35:51 +0530 Subject: [PATCH 01/30] feat(transport): bare minimal reqwest-tower integration --- .../transport-http/src/reqwest_transport.rs | 152 +++++++++++++++++- 1 file changed, 148 insertions(+), 4 deletions(-) diff --git a/crates/transport-http/src/reqwest_transport.rs b/crates/transport-http/src/reqwest_transport.rs index 0911a8345de..07650d7dcfc 100644 --- a/crates/transport-http/src/reqwest_transport.rs +++ b/crates/transport-http/src/reqwest_transport.rs @@ -4,7 +4,10 @@ use alloy_transport::{ utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, }; use std::task; -use tower::Service; +use tower::{ + layer::util::{Identity, Stack}, + Layer, Service, ServiceBuilder, +}; use tracing::{debug, debug_span, trace, Instrument}; use url::Url; @@ -38,7 +41,7 @@ impl Http { } /// Make a request. - fn request_reqwest(&self, req: RequestPacket) -> TransportFut<'static> { + fn _request_reqwest(&self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); let span: tracing::Span = debug_span!("ReqwestTransport", url = %self.url); Box::pin( @@ -78,6 +81,51 @@ impl Http { .instrument(span), ) } + + /// Make a request using the tower service with layers. + fn request_reqwest_with_layers(&self, req: RequestPacket) -> TransportFut<'static> { + let this = self.clone(); + let span = debug_span!("ReqwestTransport", url = %self.url); + let client = self.client.clone(); + Box::pin( + async move { + let mut service = + ReqwestBuilder::default().layer(LoggingLayer).on_transport(this.clone()); + + let reqwest_request = + client.post(this.url).json(&req).build().map_err(TransportErrorKind::custom)?; + + let resp = + service.call(reqwest_request).await.map_err(TransportErrorKind::custom)?; + + let status = resp.status(); + + debug!(%status, "received response from server"); + + // Unpack data from the response body. We do this regardless of + // the status code, as we want to return the error in the body + // if there is one. + let body = resp.bytes().await.map_err(TransportErrorKind::custom)?; + + debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body"); + trace!(body = %String::from_utf8_lossy(&body), "response body"); + + if status != reqwest::StatusCode::OK { + return Err(TransportErrorKind::http_error( + status.as_u16(), + String::from_utf8_lossy(&body).into_owned(), + )); + } + + // Deserialize a Box from the body. If deserialization fails, return + // the body as a string in the error. The conversion to String + // is lossy and may not cover all the bytes in the body. + serde_json::from_slice(&body) + .map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body))) + } + .instrument(span), + ) + } } impl Service for Http { @@ -93,7 +141,7 @@ impl Service for Http { #[inline] fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request_reqwest(req) + self.request_reqwest_with_layers(req) } } @@ -110,6 +158,102 @@ impl Service for &Http { #[inline] fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request_reqwest(req) + self.request_reqwest_with_layers(req) + } +} + +type ReqwestFuture = + Pin> + Send>>; + +impl Service for Http { + type Response = reqwest::Response; + type Error = reqwest::Error; + type Future = ReqwestFuture; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: reqwest::Request) -> Self::Future { + let reqwest_client = self.client.clone(); + let future = reqwest_client.execute(req); + Box::pin(async move { + let resp = future.await?; + Ok(resp) + }) + } +} +// == + +/// Builder. +#[derive(Debug)] +pub struct ReqwestBuilder { + inner: ServiceBuilder, +} + +impl Default for ReqwestBuilder { + fn default() -> Self { + Self { inner: ServiceBuilder::new() } + } +} + +impl ReqwestBuilder +where + L: Layer>, +{ + /// Add a middleware layer to the stack. + pub fn layer(self, layer: M) -> ReqwestBuilder> { + ReqwestBuilder { inner: self.inner.layer(layer) } + } + + /// Build with url + pub fn build_with_url(self, url: Url) -> L::Service { + let transport = Http::new(url); + self.on_transport(transport) + } + + /// Get the service from the inner layer. + pub fn on_transport(self, transport: Http) -> L::Service { + self.inner.service(transport) + } +} + +struct LoggingLayer; + +impl Layer for LoggingLayer { + type Service = LoggingService; + + fn layer(&self, inner: S) -> Self::Service { + LoggingService { inner } + } +} + +struct LoggingService { + inner: S, +} + +use std::{future::Future, pin::Pin}; + +impl Service for LoggingService +where + S: Service, + S::Future: Send + 'static, +{ + type Response = reqwest::Response; + type Error = reqwest::Error; + type Future = ReqwestFuture; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: reqwest::Request) -> Self::Future { + println!("LoggingLayer - request body is some: {:#?}", req.body().is_some()); + + let future = self.inner.call(req); + Box::pin(async move { + let resp = future.await?; + Ok(resp) + }) } } From 7eea8745787f43b0766460532f2438a191b4b5db Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 2 Sep 2024 14:41:49 +0530 Subject: [PATCH 02/30] feat(transport-http): reqwest-tower layer client --- crates/provider/src/provider/trait.rs | 13 +++ crates/transport-http/src/layer_transport.rs | 91 ++++++++++++++++++++ crates/transport-http/src/lib.rs | 4 + 3 files changed, 108 insertions(+) create mode 100644 crates/transport-http/src/layer_transport.rs diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 17393c9c35e..52c49325cdd 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1035,6 +1035,19 @@ mod tests { assert_eq!(0, num); } + #[tokio::test] + async fn test_layer_transport() { + init_tracing(); + let anvil = Anvil::new().spawn(); + let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url()); + + let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + + let provider = RootProvider::<_, Ethereum>::new(rpc_client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + #[tokio::test] async fn test_builder_helper_fn_any_network() { init_tracing(); diff --git a/crates/transport-http/src/layer_transport.rs b/crates/transport-http/src/layer_transport.rs new file mode 100644 index 00000000000..ac0cd42f301 --- /dev/null +++ b/crates/transport-http/src/layer_transport.rs @@ -0,0 +1,91 @@ +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::{ + utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, +}; +use std::task; +use tower::{Service, ServiceBuilder}; +use tracing::{debug, debug_span, trace, Instrument}; +use url::Url; + +/// A [reqwest] client that can be used with tower layers. +#[derive(Debug, Clone)] +pub struct LayerClient { + url: Url, +} + +impl LayerClient { + /// Create a new [LayerClient] with the given URL. + pub const fn new(url: Url) -> Self { + Self { url } + } + + /// Make a request using the tower service with layers. + pub fn request(&self, req: RequestPacket) -> TransportFut<'static> { + let this = self.clone(); + let span = debug_span!("LayerClient", url = %self.url); + Box::pin( + async move { + let client = reqwest::Client::new(); + + let mut service = ServiceBuilder::new().service(client); + + let reqwest_request = service + .post(this.url.to_owned()) + .json(&req) + .build() + .map_err(TransportErrorKind::custom)?; + + let resp = + service.call(reqwest_request).await.map_err(TransportErrorKind::custom)?; + + let status = resp.status(); + + debug!(%status, "received response from server"); + + let body = resp.bytes().await.map_err(TransportErrorKind::custom)?; + + debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body"); + trace!(body = %String::from_utf8_lossy(&body), "response body"); + + if status != reqwest::StatusCode::OK { + return Err(TransportErrorKind::http_error( + status.as_u16(), + String::from_utf8_lossy(&body).into_owned(), + )); + } + + serde_json::from_slice(&body) + .map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body))) + } + .instrument(span), + ) + } +} + +impl TransportConnect for LayerClient { + type Transport = LayerClient; + + fn is_local(&self) -> bool { + guess_local_url(self.url.as_str()) + } + + fn get_transport<'a: 'b, 'b>( + &'a self, + ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { + Box::pin(async move { Ok(LayerClient::new(self.url.clone())) }) + } +} + +impl Service for LayerClient { + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + self.request(req) + } +} diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index b12a048ce61..f314813ef05 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -13,6 +13,10 @@ mod reqwest_transport; #[doc(inline)] pub use reqwest_transport::*; +mod layer_transport; + +pub use layer_transport::*; + #[cfg(feature = "reqwest")] pub use reqwest; From 98110e6be4152261d891698148f1a872e664f923 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 2 Sep 2024 17:12:24 +0530 Subject: [PATCH 03/30] feat(transport-http): LayerClient --- crates/provider/Cargo.toml | 1 + crates/provider/src/provider/trait.rs | 5 +- crates/transport-http/src/layer_transport.rs | 81 ++++++++-- crates/transport-http/src/layers/logging.rs | 45 ++++++ crates/transport-http/src/layers/mod.rs | 3 + crates/transport-http/src/lib.rs | 4 + .../transport-http/src/reqwest_transport.rs | 152 +----------------- 7 files changed, 126 insertions(+), 165 deletions(-) create mode 100644 crates/transport-http/src/layers/logging.rs create mode 100644 crates/transport-http/src/layers/mod.rs diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 164c442100f..8be41db776c 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -73,6 +73,7 @@ reqwest.workspace = true tokio = { workspace = true, features = ["macros"] } tracing-subscriber = { workspace = true, features = ["fmt"] } tempfile.workspace = true +tower.workspace = true [features] default = ["reqwest", "reqwest-default-tls"] diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 52c49325cdd..09879a43cbb 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1013,6 +1013,7 @@ mod tests { use alloy_node_bindings::Anvil; use alloy_primitives::{address, b256, bytes, keccak256}; use alloy_rpc_types_eth::{request::TransactionRequest, Block}; + use alloy_transport_http::LoggingLayer; fn init_tracing() { let _ = tracing_subscriber::fmt::try_init(); @@ -1039,7 +1040,9 @@ mod tests { async fn test_layer_transport() { init_tracing(); let anvil = Anvil::new().spawn(); - let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url()); + let service = + tower::ServiceBuilder::new().layer(LoggingLayer).service(reqwest::Client::new()); + let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url(), service); let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); diff --git a/crates/transport-http/src/layer_transport.rs b/crates/transport-http/src/layer_transport.rs index ac0cd42f301..3835394c3bf 100644 --- a/crates/transport-http/src/layer_transport.rs +++ b/crates/transport-http/src/layer_transport.rs @@ -2,21 +2,29 @@ use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_transport::{ utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, }; -use std::task; -use tower::{Service, ServiceBuilder}; +use std::{future::Future, pin::Pin, task}; +use tower::Service; use tracing::{debug, debug_span, trace, Instrument}; use url::Url; /// A [reqwest] client that can be used with tower layers. #[derive(Debug, Clone)] -pub struct LayerClient { +pub struct LayerClient { url: Url, + service: S, } -impl LayerClient { +impl LayerClient +where + S: Service + + Clone + + Send + + 'static, + S::Future: Send, +{ /// Create a new [LayerClient] with the given URL. - pub const fn new(url: Url) -> Self { - Self { url } + pub fn new(url: Url, service: S) -> Self { + Self { url, service } } /// Make a request using the tower service with layers. @@ -25,18 +33,15 @@ impl LayerClient { let span = debug_span!("LayerClient", url = %self.url); Box::pin( async move { - let client = reqwest::Client::new(); + let mut service = this.service.clone(); - let mut service = ServiceBuilder::new().service(client); - - let reqwest_request = service + let raw_req = reqwest::Client::new() .post(this.url.to_owned()) .json(&req) .build() .map_err(TransportErrorKind::custom)?; - let resp = - service.call(reqwest_request).await.map_err(TransportErrorKind::custom)?; + let resp = service.call(raw_req).await.map_err(TransportErrorKind::custom)?; let status = resp.status(); @@ -62,8 +67,16 @@ impl LayerClient { } } -impl TransportConnect for LayerClient { - type Transport = LayerClient; +impl TransportConnect for LayerClient +where + S: Service + + Clone + + Send + + 'static + + Sync, + S::Future: Send, +{ + type Transport = LayerClient; fn is_local(&self) -> bool { guess_local_url(self.url.as_str()) @@ -72,11 +85,18 @@ impl TransportConnect for LayerClient { fn get_transport<'a: 'b, 'b>( &'a self, ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { - Box::pin(async move { Ok(LayerClient::new(self.url.clone())) }) + Box::pin(async move { Ok(LayerClient::new(self.url.clone(), self.service.clone())) }) } } -impl Service for LayerClient { +impl Service for LayerClient +where + S: Service + + Clone + + Send + + 'static, + S::Future: Send, +{ type Response = ResponsePacket; type Error = TransportError; type Future = TransportFut<'static>; @@ -89,3 +109,32 @@ impl Service for LayerClient { self.request(req) } } + +/// Future for reqwest responses. +pub type ReqwestResponseFut = + Pin> + Send + 'static>>; + +// impl Service for LayerClient +// where +// S: Service +// + Clone +// + Send +// + 'static, +// S::Future: Send, +// { +// type Response = reqwest::Response; +// type Error = reqwest::Error; +// type Future = ReqwestResponseFut; + +// fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> +// { task::Poll::Ready(Ok(())) +// } + +// fn call(&mut self, req: reqwest::Request) -> Self::Future { +// let fut = self.service.call(req); +// Box::pin(async move { +// let resp = fut.await?; +// Ok(resp) +// }) +// } +// } diff --git a/crates/transport-http/src/layers/logging.rs b/crates/transport-http/src/layers/logging.rs new file mode 100644 index 00000000000..43e398f9ff7 --- /dev/null +++ b/crates/transport-http/src/layers/logging.rs @@ -0,0 +1,45 @@ +use reqwest::{Error, Request, Response}; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +/// A logging layer for the HTTP transport. +#[derive(Debug, Clone)] +pub struct LoggingLayer; + +impl Layer for LoggingLayer { + type Service = LoggingService; + + fn layer(&self, inner: S) -> Self::Service { + LoggingService { inner } + } +} + +/// A service that logs requests and responses. +#[derive(Debug, Clone)] +pub struct LoggingService { + inner: S, +} + +impl Service for LoggingService +where + S: Service, + S::Future: Send + 'static, +{ + type Response = Response; + type Error = Error; + type Future = crate::ReqwestResponseFut; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + tracing::info!("LoggingLayer(Request) {:?}", req); + + let future = self.inner.call(req); + Box::pin(async move { + let resp = future.await?; + Ok(resp) + }) + } +} diff --git a/crates/transport-http/src/layers/mod.rs b/crates/transport-http/src/layers/mod.rs new file mode 100644 index 00000000000..f3809d358d3 --- /dev/null +++ b/crates/transport-http/src/layers/mod.rs @@ -0,0 +1,3 @@ +mod logging; + +pub use logging::*; diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index f314813ef05..322b0c9820e 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -17,6 +17,10 @@ mod layer_transport; pub use layer_transport::*; +mod layers; + +pub use layers::{LoggingLayer, LoggingService}; + #[cfg(feature = "reqwest")] pub use reqwest; diff --git a/crates/transport-http/src/reqwest_transport.rs b/crates/transport-http/src/reqwest_transport.rs index 07650d7dcfc..0911a8345de 100644 --- a/crates/transport-http/src/reqwest_transport.rs +++ b/crates/transport-http/src/reqwest_transport.rs @@ -4,10 +4,7 @@ use alloy_transport::{ utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, }; use std::task; -use tower::{ - layer::util::{Identity, Stack}, - Layer, Service, ServiceBuilder, -}; +use tower::Service; use tracing::{debug, debug_span, trace, Instrument}; use url::Url; @@ -41,7 +38,7 @@ impl Http { } /// Make a request. - fn _request_reqwest(&self, req: RequestPacket) -> TransportFut<'static> { + fn request_reqwest(&self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); let span: tracing::Span = debug_span!("ReqwestTransport", url = %self.url); Box::pin( @@ -81,51 +78,6 @@ impl Http { .instrument(span), ) } - - /// Make a request using the tower service with layers. - fn request_reqwest_with_layers(&self, req: RequestPacket) -> TransportFut<'static> { - let this = self.clone(); - let span = debug_span!("ReqwestTransport", url = %self.url); - let client = self.client.clone(); - Box::pin( - async move { - let mut service = - ReqwestBuilder::default().layer(LoggingLayer).on_transport(this.clone()); - - let reqwest_request = - client.post(this.url).json(&req).build().map_err(TransportErrorKind::custom)?; - - let resp = - service.call(reqwest_request).await.map_err(TransportErrorKind::custom)?; - - let status = resp.status(); - - debug!(%status, "received response from server"); - - // Unpack data from the response body. We do this regardless of - // the status code, as we want to return the error in the body - // if there is one. - let body = resp.bytes().await.map_err(TransportErrorKind::custom)?; - - debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body"); - trace!(body = %String::from_utf8_lossy(&body), "response body"); - - if status != reqwest::StatusCode::OK { - return Err(TransportErrorKind::http_error( - status.as_u16(), - String::from_utf8_lossy(&body).into_owned(), - )); - } - - // Deserialize a Box from the body. If deserialization fails, return - // the body as a string in the error. The conversion to String - // is lossy and may not cover all the bytes in the body. - serde_json::from_slice(&body) - .map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body))) - } - .instrument(span), - ) - } } impl Service for Http { @@ -141,7 +93,7 @@ impl Service for Http { #[inline] fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request_reqwest_with_layers(req) + self.request_reqwest(req) } } @@ -158,102 +110,6 @@ impl Service for &Http { #[inline] fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request_reqwest_with_layers(req) - } -} - -type ReqwestFuture = - Pin> + Send>>; - -impl Service for Http { - type Response = reqwest::Response; - type Error = reqwest::Error; - type Future = ReqwestFuture; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { - task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: reqwest::Request) -> Self::Future { - let reqwest_client = self.client.clone(); - let future = reqwest_client.execute(req); - Box::pin(async move { - let resp = future.await?; - Ok(resp) - }) - } -} -// == - -/// Builder. -#[derive(Debug)] -pub struct ReqwestBuilder { - inner: ServiceBuilder, -} - -impl Default for ReqwestBuilder { - fn default() -> Self { - Self { inner: ServiceBuilder::new() } - } -} - -impl ReqwestBuilder -where - L: Layer>, -{ - /// Add a middleware layer to the stack. - pub fn layer(self, layer: M) -> ReqwestBuilder> { - ReqwestBuilder { inner: self.inner.layer(layer) } - } - - /// Build with url - pub fn build_with_url(self, url: Url) -> L::Service { - let transport = Http::new(url); - self.on_transport(transport) - } - - /// Get the service from the inner layer. - pub fn on_transport(self, transport: Http) -> L::Service { - self.inner.service(transport) - } -} - -struct LoggingLayer; - -impl Layer for LoggingLayer { - type Service = LoggingService; - - fn layer(&self, inner: S) -> Self::Service { - LoggingService { inner } - } -} - -struct LoggingService { - inner: S, -} - -use std::{future::Future, pin::Pin}; - -impl Service for LoggingService -where - S: Service, - S::Future: Send + 'static, -{ - type Response = reqwest::Response; - type Error = reqwest::Error; - type Future = ReqwestFuture; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { - task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: reqwest::Request) -> Self::Future { - println!("LoggingLayer - request body is some: {:#?}", req.body().is_some()); - - let future = self.inner.call(req); - Box::pin(async move { - let resp = future.await?; - Ok(resp) - }) + self.request_reqwest(req) } } From f4dbb7eb4d671dba44259042f56e347b08e1a983 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:42:52 +0530 Subject: [PATCH 04/30] fix: feature gate layer transport --- crates/provider/src/provider/trait.rs | 15 ------- crates/transport-http/src/layers/logging.rs | 45 --------------------- crates/transport-http/src/layers/mod.rs | 3 -- crates/transport-http/src/lib.rs | 11 ++--- 4 files changed, 4 insertions(+), 70 deletions(-) delete mode 100644 crates/transport-http/src/layers/logging.rs delete mode 100644 crates/transport-http/src/layers/mod.rs diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 09879a43cbb..2bc124f688b 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1036,21 +1036,6 @@ mod tests { assert_eq!(0, num); } - #[tokio::test] - async fn test_layer_transport() { - init_tracing(); - let anvil = Anvil::new().spawn(); - let service = - tower::ServiceBuilder::new().layer(LoggingLayer).service(reqwest::Client::new()); - let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url(), service); - - let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); - - let provider = RootProvider::<_, Ethereum>::new(rpc_client); - let num = provider.get_block_number().await.unwrap(); - assert_eq!(0, num); - } - #[tokio::test] async fn test_builder_helper_fn_any_network() { init_tracing(); diff --git a/crates/transport-http/src/layers/logging.rs b/crates/transport-http/src/layers/logging.rs deleted file mode 100644 index 43e398f9ff7..00000000000 --- a/crates/transport-http/src/layers/logging.rs +++ /dev/null @@ -1,45 +0,0 @@ -use reqwest::{Error, Request, Response}; -use std::task::{Context, Poll}; -use tower::{Layer, Service}; - -/// A logging layer for the HTTP transport. -#[derive(Debug, Clone)] -pub struct LoggingLayer; - -impl Layer for LoggingLayer { - type Service = LoggingService; - - fn layer(&self, inner: S) -> Self::Service { - LoggingService { inner } - } -} - -/// A service that logs requests and responses. -#[derive(Debug, Clone)] -pub struct LoggingService { - inner: S, -} - -impl Service for LoggingService -where - S: Service, - S::Future: Send + 'static, -{ - type Response = Response; - type Error = Error; - type Future = crate::ReqwestResponseFut; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - tracing::info!("LoggingLayer(Request) {:?}", req); - - let future = self.inner.call(req); - Box::pin(async move { - let resp = future.await?; - Ok(resp) - }) - } -} diff --git a/crates/transport-http/src/layers/mod.rs b/crates/transport-http/src/layers/mod.rs deleted file mode 100644 index f3809d358d3..00000000000 --- a/crates/transport-http/src/layers/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod logging; - -pub use logging::*; diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 322b0c9820e..3e672d8501a 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -6,6 +6,8 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#[cfg(feature = "reqwest")] +pub use reqwest; #[cfg(feature = "reqwest")] mod reqwest_transport; @@ -13,16 +15,11 @@ mod reqwest_transport; #[doc(inline)] pub use reqwest_transport::*; +#[cfg(feature = "reqwest")] mod layer_transport; -pub use layer_transport::*; - -mod layers; - -pub use layers::{LoggingLayer, LoggingService}; - #[cfg(feature = "reqwest")] -pub use reqwest; +pub use layer_transport::*; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] mod hyper_transport; From 0d77863aaa27eec3f4e3c6fef8abfb272b069782 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 3 Sep 2024 14:45:33 +0530 Subject: [PATCH 05/30] rm logging layer --- Cargo.toml | 1 + crates/provider/src/provider/trait.rs | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 94fd42d6977..13c307e25bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,3 +149,4 @@ assert_matches = "1.5" serial_test = "3.0" similar-asserts = "1.5" tempfile = "3.10" +tower-http = "0.5.2" diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 2bc124f688b..a0e1ca4092e 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1013,7 +1013,6 @@ mod tests { use alloy_node_bindings::Anvil; use alloy_primitives::{address, b256, bytes, keccak256}; use alloy_rpc_types_eth::{request::TransactionRequest, Block}; - use alloy_transport_http::LoggingLayer; fn init_tracing() { let _ = tracing_subscriber::fmt::try_init(); @@ -1036,6 +1035,20 @@ mod tests { assert_eq!(0, num); } + #[tokio::test] + async fn test_layer_transport() { + init_tracing(); + let anvil = Anvil::new().spawn(); + let service = tower::ServiceBuilder::new().service(reqwest::Client::new()); + let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url(), service); + + let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + + let provider = RootProvider::<_, Ethereum>::new(rpc_client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + #[tokio::test] async fn test_builder_helper_fn_any_network() { init_tracing(); From 386d8cd6478383910e0cf4df1cb2bce8ba72da2f Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 4 Sep 2024 14:12:56 +0530 Subject: [PATCH 06/30] feat(transport-http): hyper layer transport --- crates/provider/Cargo.toml | 1 + crates/provider/src/provider/trait.rs | 23 +++ crates/transport-http/Cargo.toml | 2 +- .../src/hyper_layer_transport.rs | 135 ++++++++++++++++++ crates/transport-http/src/hyper_transport.rs | 2 +- crates/transport-http/src/lib.rs | 6 + 6 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 crates/transport-http/src/hyper_layer_transport.rs diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 8be41db776c..f25ae511247 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -74,6 +74,7 @@ tokio = { workspace = true, features = ["macros"] } tracing-subscriber = { workspace = true, features = ["fmt"] } tempfile.workspace = true tower.workspace = true +http-body-util.workspace = true [features] default = ["reqwest", "reqwest-default-tls"] diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index a0e1ca4092e..b334dc32cfa 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1049,6 +1049,29 @@ mod tests { assert_eq!(0, num); } + #[cfg(feature = "hyper")] + #[tokio::test] + async fn test_hyper_layer_transport_no_layers() { + use alloy_transport_http::{ + hyper::body::Bytes as HyperBytes, + hyper_util::{client::legacy::Client, rt::TokioExecutor}, + }; + use http_body_util::Full; + + init_tracing(); + let anvil = Anvil::new().spawn(); + let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); + let service = tower::ServiceBuilder::new().service(hyper_client); + let layer_transport = + alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + + let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + + let provider = RootProvider::<_, Ethereum>::new(rpc_client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + #[tokio::test] async fn test_builder_helper_fn_any_network() { init_tracing(); diff --git a/crates/transport-http/Cargo.toml b/crates/transport-http/Cargo.toml index 35f757ad3c2..dcdfe4fed21 100644 --- a/crates/transport-http/Cargo.toml +++ b/crates/transport-http/Cargo.toml @@ -35,7 +35,7 @@ hyper = { workspace = true, default-features = false, optional = true } hyper-util = { workspace = true, features = ["full"], optional = true } [features] -default = ["reqwest", "reqwest-default-tls"] +default = ["reqwest", "reqwest-default-tls", "hyper"] reqwest = [ "dep:reqwest", "dep:alloy-json-rpc", diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs new file mode 100644 index 00000000000..38f623d2694 --- /dev/null +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -0,0 +1,135 @@ +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::{ + utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, +}; +use http_body_util::{BodyExt, Full}; +use hyper::{ + body::{Buf, Bytes, Incoming}, + header, Request, Response, +}; +use std::{marker::PhantomData, task}; +use tower::Service; +use tracing::{debug, debug_span, trace, Instrument}; +use url::Url; + +/// A [hyper] client that can be used with tower layers. +#[derive(Clone, Debug)] +pub struct HyperLayerTransport { + url: Url, + service: S, + _pd: PhantomData, +} + +type HyperRequest = Request>; +type HyperResponse = Response; +impl HyperLayerTransport +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From + Buf + Send + 'static + Clone, +{ + /// Create a new [HyperLayerTransport] with the given URL and service. + pub const fn new(url: Url, service: S) -> Self { + Self { url, service, _pd: PhantomData } + } + + /// Make a request to the server using the given service. + pub fn request(&mut self, req: RequestPacket) -> TransportFut<'static> { + let this = self.clone(); + let span = debug_span!("HyperLayerTransport", url = %this.url); + Box::pin( + async move { + debug!(count = req.len(), "sending request packet to server"); + let ser = req.serialize().map_err(TransportError::ser_err)?; + // convert the Box into a hyper request + let body = Full::from(Bytes::from(>::from(>::from(ser)))); + + let req = hyper::Request::builder() + .method(hyper::Method::POST) + .uri(this.url.as_str()) + .header( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json"), + ) + .body(body) + .expect("request parts are invalid"); + + let mut service = this.service.clone(); + let resp = service.call(req).await.map_err(TransportErrorKind::custom)?; + + let status = resp.status(); + + debug!(%status, "received response from server"); + + // Unpack data from the response body. We do this regardless of + // the status code, as we want to return the error in the body + // if there is one. + let body = resp + .into_body() + .collect() + .await + .map_err(TransportErrorKind::custom)? + .to_bytes(); + + debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body"); + trace!(body = %String::from_utf8_lossy(&body), "response body"); + + if status != hyper::StatusCode::OK { + return Err(TransportErrorKind::http_error( + status.as_u16(), + String::from_utf8_lossy(&body).into_owned(), + )); + } + + // Deserialize a Box from the body. If deserialization fails, return + // the body as a string in the error. The conversion to String + // is lossy and may not cover all the bytes in the body. + serde_json::from_slice(&body).map_err(|err| { + TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref())) + }) + } + .instrument(span), + ) + } +} + +impl TransportConnect for HyperLayerTransport +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From + Buf + Send + 'static + Clone + Sync, +{ + type Transport = Self; + + fn is_local(&self) -> bool { + guess_local_url(self.url.as_str()) + } + + fn get_transport<'a: 'b, 'b>( + &'a self, + ) -> alloy_transport::Pbf<'b, Self::Transport, alloy_transport::TransportError> { + Box::pin(async move { Ok(Self::new(self.url.clone(), self.service.clone())) }) + } +} + +impl Service for HyperLayerTransport +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From + Buf + Send + 'static + Clone + Sync, +{ + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + self.request(req) + } +} diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index a92a941ee07..eaba9faae92 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -66,7 +66,7 @@ where header::HeaderValue::from_static("application/json"), ) .body(body) - .expect("request parts are valid"); + .expect("request parts are invalid"); let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?; let status = resp.status(); diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 3e672d8501a..a8d9bcdd821 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -32,6 +32,12 @@ pub use hyper; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] pub use hyper_util; +#[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] +mod hyper_layer_transport; +#[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] +#[doc(inline)] +pub use hyper_layer_transport::*; + use alloy_transport::utils::guess_local_url; use core::{marker::PhantomData, str::FromStr}; use url::Url; From 849a7ffdc6f37b2737c8609367af5a0d6d530f6d Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 9 Sep 2024 15:36:23 +0530 Subject: [PATCH 07/30] hyper layer transport test --- crates/provider/src/provider/trait.rs | 83 +++++++++++++++++-- .../src/hyper_layer_transport.rs | 15 +++- crates/transport-http/src/layer_transport.rs | 31 +------ crates/transport-http/src/lib.rs | 4 +- 4 files changed, 95 insertions(+), 38 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index b334dc32cfa..0568fe2ce56 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1013,6 +1013,20 @@ mod tests { use alloy_node_bindings::Anvil; use alloy_primitives::{address, b256, bytes, keccak256}; use alloy_rpc_types_eth::{request::TransactionRequest, Block}; + // For layer transport tests + #[cfg(feature = "hyper")] + use alloy_transport_http::{ + hyper::body::{Buf, Bytes as HyperBytes}, + hyper_util::{ + client::legacy::{Client, Error}, + rt::TokioExecutor, + }, + HyperRequest, HyperResponse, HyperResponseFut, + }; + #[cfg(feature = "hyper")] + use http_body_util::Full; + #[cfg(feature = "hyper")] + use tower::{Layer, Service}; fn init_tracing() { let _ = tracing_subscriber::fmt::try_init(); @@ -1052,12 +1066,6 @@ mod tests { #[cfg(feature = "hyper")] #[tokio::test] async fn test_hyper_layer_transport_no_layers() { - use alloy_transport_http::{ - hyper::body::Bytes as HyperBytes, - hyper_util::{client::legacy::Client, rt::TokioExecutor}, - }; - use http_body_util::Full; - init_tracing(); let anvil = Anvil::new().spawn(); let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); @@ -1072,6 +1080,69 @@ mod tests { assert_eq!(0, num); } + #[cfg(feature = "hyper")] + #[tokio::test] + async fn test_hyper_layer_transport() { + struct LoggingLayer; + + impl Layer for LoggingLayer { + type Service = LoggingService; + + fn layer(&self, inner: S) -> Self::Service { + LoggingService { inner } + } + } + + #[derive(Clone)] // required + struct LoggingService { + inner: S, + } + + impl Service> for LoggingService + where + S: Service, Response = HyperResponse, Error = Error> + + Clone + + Send + + Sync + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From + Buf + Send + 'static + Clone + Sync + std::fmt::Debug, + { + type Response = HyperResponse; + type Error = Error; + type Future = HyperResponseFut; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: HyperRequest) -> Self::Future { + println!("Logging Layer - HyperRequest {req:?}"); + + let fut = self.inner.call(req); + + Box::pin(async move { fut.await }) + } + } + + init_tracing(); + let anvil = Anvil::new().spawn(); + let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); + let service = tower::ServiceBuilder::new().layer(LoggingLayer).service(hyper_client); + let layer_transport = + alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + + let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + + let provider = RootProvider::<_, Ethereum>::new(rpc_client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + #[tokio::test] async fn test_builder_helper_fn_any_network() { init_tracing(); diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index 38f623d2694..a5a8f7c90cf 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -7,7 +7,8 @@ use hyper::{ body::{Buf, Bytes, Incoming}, header, Request, Response, }; -use std::{marker::PhantomData, task}; +use hyper_util::client::legacy::Error; +use std::{future::Future, marker::PhantomData, pin::Pin, task}; use tower::Service; use tracing::{debug, debug_span, trace, Instrument}; use url::Url; @@ -20,8 +21,16 @@ pub struct HyperLayerTransport { _pd: PhantomData, } -type HyperRequest = Request>; -type HyperResponse = Response; +/// Alias for [`Request>`] +pub type HyperRequest = Request>; + +/// Alias for [`Response`] +pub type HyperResponse = Response; + +/// Alias for pinned box future that results in [`HyperResponse`] +pub type HyperResponseFut = + Pin> + Send + 'static>>; + impl HyperLayerTransport where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, diff --git a/crates/transport-http/src/layer_transport.rs b/crates/transport-http/src/layer_transport.rs index 3835394c3bf..edea56dfbbd 100644 --- a/crates/transport-http/src/layer_transport.rs +++ b/crates/transport-http/src/layer_transport.rs @@ -23,7 +23,7 @@ where S::Future: Send, { /// Create a new [LayerClient] with the given URL. - pub fn new(url: Url, service: S) -> Self { + pub const fn new(url: Url, service: S) -> Self { Self { url, service } } @@ -76,7 +76,7 @@ where + Sync, S::Future: Send, { - type Transport = LayerClient; + type Transport = Self; fn is_local(&self) -> bool { guess_local_url(self.url.as_str()) @@ -85,7 +85,7 @@ where fn get_transport<'a: 'b, 'b>( &'a self, ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { - Box::pin(async move { Ok(LayerClient::new(self.url.clone(), self.service.clone())) }) + Box::pin(async move { Ok(Self::new(self.url.clone(), self.service.clone())) }) } } @@ -113,28 +113,3 @@ where /// Future for reqwest responses. pub type ReqwestResponseFut = Pin> + Send + 'static>>; - -// impl Service for LayerClient -// where -// S: Service -// + Clone -// + Send -// + 'static, -// S::Future: Send, -// { -// type Response = reqwest::Response; -// type Error = reqwest::Error; -// type Future = ReqwestResponseFut; - -// fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> -// { task::Poll::Ready(Ok(())) -// } - -// fn call(&mut self, req: reqwest::Request) -> Self::Future { -// let fut = self.service.call(req); -// Box::pin(async move { -// let resp = fut.await?; -// Ok(resp) -// }) -// } -// } diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index a8d9bcdd821..77f8f5aa63a 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -36,7 +36,9 @@ pub use hyper_util; mod hyper_layer_transport; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] #[doc(inline)] -pub use hyper_layer_transport::*; +pub use hyper_layer_transport::{ + HyperLayerTransport, HyperRequest, HyperResponse, HyperResponseFut, +}; use alloy_transport::utils::guess_local_url; use core::{marker::PhantomData, str::FromStr}; From f44eacdd89d784b998df63a5abd6bcfd01cc6f9c Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:14:51 +0530 Subject: [PATCH 08/30] test with tower-http layers --- Cargo.toml | 1 + crates/provider/Cargo.toml | 5 +++ crates/provider/src/provider/trait.rs | 48 ++++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 13c307e25bb..86c396ff87c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,6 +134,7 @@ thiserror = "1.0" thiserror-no-std = "2.0.2" url = "2.5" derive_more = "1.0.0" +http = "1.1.0" ## serde serde = { version = "1.0", default-features = false, features = [ diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index f25ae511247..7e304c8004c 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -74,7 +74,12 @@ tokio = { workspace = true, features = ["macros"] } tracing-subscriber = { workspace = true, features = ["fmt"] } tempfile.workspace = true tower.workspace = true +tower-http = { workspace = true, features = [ + "set-header", + "sensitive-headers", +] } http-body-util.workspace = true +http.workspace = true [features] default = ["reqwest", "reqwest-default-tls"] diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 0568fe2ce56..8a40ca138f0 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1129,10 +1129,56 @@ mod tests { } } + use http::header::{self, HeaderValue}; + use tower_http::{ + sensitive_headers::SetSensitiveRequestHeadersLayer, set_header::SetRequestHeaderLayer, + }; + init_tracing(); + let anvil = Anvil::new().spawn(); + let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); + + // Setup tower serive with multiple layers modifying request headers + let service = tower::ServiceBuilder::new() + .layer(SetRequestHeaderLayer::if_not_present( + header::USER_AGENT, + HeaderValue::from_static("alloy app"), + )) + .layer(SetRequestHeaderLayer::overriding( + header::AUTHORIZATION, + HeaderValue::from_static("some-jwt-token"), + )) + .layer(SetRequestHeaderLayer::appending( + header::SET_COOKIE, + HeaderValue::from_static("cookie-value"), + )) + .layer(SetSensitiveRequestHeadersLayer::new([header::AUTHORIZATION])) // Hides the jwt token as sensitive. + .layer(LoggingLayer) + .service(hyper_client); + + let layer_transport = + alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + + let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + + let provider = RootProvider::<_, Ethereum>::new(rpc_client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + + #[cfg(feature = "hyper")] + #[tokio::test] + async fn test_layer_transport_with_tower_http() { + use http::header::{self, HeaderValue}; + use tower_http::set_header::SetRequestHeaderLayer; init_tracing(); let anvil = Anvil::new().spawn(); let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); - let service = tower::ServiceBuilder::new().layer(LoggingLayer).service(hyper_client); + let service = tower::ServiceBuilder::new() + .layer(SetRequestHeaderLayer::if_not_present( + header::USER_AGENT, + HeaderValue::from_static("alloy app"), + )) + .service(hyper_client); let layer_transport = alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); From 69641a6ed5b4258d0fb3c36e3a1521878a52bb45 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:15:55 +0530 Subject: [PATCH 09/30] rm reqwest layer transport --- crates/provider/src/provider/trait.rs | 14 --- crates/transport-http/src/layer_transport.rs | 115 ------------------- crates/transport-http/src/lib.rs | 6 - 3 files changed, 135 deletions(-) delete mode 100644 crates/transport-http/src/layer_transport.rs diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 8a40ca138f0..b5701f19de7 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1049,20 +1049,6 @@ mod tests { assert_eq!(0, num); } - #[tokio::test] - async fn test_layer_transport() { - init_tracing(); - let anvil = Anvil::new().spawn(); - let service = tower::ServiceBuilder::new().service(reqwest::Client::new()); - let layer_transport = alloy_transport_http::LayerClient::new(anvil.endpoint_url(), service); - - let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); - - let provider = RootProvider::<_, Ethereum>::new(rpc_client); - let num = provider.get_block_number().await.unwrap(); - assert_eq!(0, num); - } - #[cfg(feature = "hyper")] #[tokio::test] async fn test_hyper_layer_transport_no_layers() { diff --git a/crates/transport-http/src/layer_transport.rs b/crates/transport-http/src/layer_transport.rs deleted file mode 100644 index edea56dfbbd..00000000000 --- a/crates/transport-http/src/layer_transport.rs +++ /dev/null @@ -1,115 +0,0 @@ -use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use alloy_transport::{ - utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, -}; -use std::{future::Future, pin::Pin, task}; -use tower::Service; -use tracing::{debug, debug_span, trace, Instrument}; -use url::Url; - -/// A [reqwest] client that can be used with tower layers. -#[derive(Debug, Clone)] -pub struct LayerClient { - url: Url, - service: S, -} - -impl LayerClient -where - S: Service - + Clone - + Send - + 'static, - S::Future: Send, -{ - /// Create a new [LayerClient] with the given URL. - pub const fn new(url: Url, service: S) -> Self { - Self { url, service } - } - - /// Make a request using the tower service with layers. - pub fn request(&self, req: RequestPacket) -> TransportFut<'static> { - let this = self.clone(); - let span = debug_span!("LayerClient", url = %self.url); - Box::pin( - async move { - let mut service = this.service.clone(); - - let raw_req = reqwest::Client::new() - .post(this.url.to_owned()) - .json(&req) - .build() - .map_err(TransportErrorKind::custom)?; - - let resp = service.call(raw_req).await.map_err(TransportErrorKind::custom)?; - - let status = resp.status(); - - debug!(%status, "received response from server"); - - let body = resp.bytes().await.map_err(TransportErrorKind::custom)?; - - debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body"); - trace!(body = %String::from_utf8_lossy(&body), "response body"); - - if status != reqwest::StatusCode::OK { - return Err(TransportErrorKind::http_error( - status.as_u16(), - String::from_utf8_lossy(&body).into_owned(), - )); - } - - serde_json::from_slice(&body) - .map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body))) - } - .instrument(span), - ) - } -} - -impl TransportConnect for LayerClient -where - S: Service - + Clone - + Send - + 'static - + Sync, - S::Future: Send, -{ - type Transport = Self; - - fn is_local(&self) -> bool { - guess_local_url(self.url.as_str()) - } - - fn get_transport<'a: 'b, 'b>( - &'a self, - ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { - Box::pin(async move { Ok(Self::new(self.url.clone(), self.service.clone())) }) - } -} - -impl Service for LayerClient -where - S: Service - + Clone - + Send - + 'static, - S::Future: Send, -{ - type Response = ResponsePacket; - type Error = TransportError; - type Future = TransportFut<'static>; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { - task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request(req) - } -} - -/// Future for reqwest responses. -pub type ReqwestResponseFut = - Pin> + Send + 'static>>; diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 77f8f5aa63a..c812eaec8d8 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -15,12 +15,6 @@ mod reqwest_transport; #[doc(inline)] pub use reqwest_transport::*; -#[cfg(feature = "reqwest")] -mod layer_transport; - -#[cfg(feature = "reqwest")] -pub use layer_transport::*; - #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] mod hyper_transport; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] From 1821332717f9bbc06303121ba6a9ab9eb9d6f654 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:57:47 +0530 Subject: [PATCH 10/30] rm trait bounds for new --- crates/transport-http/src/hyper_layer_transport.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index a5a8f7c90cf..b20e2db69f2 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -31,6 +31,13 @@ pub type HyperResponse = Response; pub type HyperResponseFut = Pin> + Send + 'static>>; +impl HyperLayerTransport { + /// Create a new [HyperLayerTransport] with the given URL and service. + pub const fn new(url: Url, service: S) -> Self { + Self { url, service, _pd: PhantomData } + } +} + impl HyperLayerTransport where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, @@ -38,11 +45,6 @@ where S::Error: std::error::Error + Send + Sync + 'static, B: From + Buf + Send + 'static + Clone, { - /// Create a new [HyperLayerTransport] with the given URL and service. - pub const fn new(url: Url, service: S) -> Self { - Self { url, service, _pd: PhantomData } - } - /// Make a request to the server using the given service. pub fn request(&mut self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); From 5b42d911b0d85a92eae91abe55f55e8831ea0e77 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:03:47 +0530 Subject: [PATCH 11/30] nit --- crates/transport-http/src/hyper_layer_transport.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index b20e2db69f2..fcabfa7049d 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -4,7 +4,7 @@ use alloy_transport::{ }; use http_body_util::{BodyExt, Full}; use hyper::{ - body::{Buf, Bytes, Incoming}, + body::{Buf, Incoming}, header, Request, Response, }; use hyper_util::client::legacy::Error; @@ -43,7 +43,7 @@ where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From + Buf + Send + 'static + Clone, + B: From> + Buf + Send + 'static + Clone, { /// Make a request to the server using the given service. pub fn request(&mut self, req: RequestPacket) -> TransportFut<'static> { @@ -54,7 +54,7 @@ where debug!(count = req.len(), "sending request packet to server"); let ser = req.serialize().map_err(TransportError::ser_err)?; // convert the Box into a hyper request - let body = Full::from(Bytes::from(>::from(>::from(ser)))); + let body = ser.get().as_bytes().to_owned().into(); let req = hyper::Request::builder() .method(hyper::Method::POST) @@ -110,7 +110,7 @@ where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From + Buf + Send + 'static + Clone + Sync, + B: From> + Buf + Send + 'static + Clone + Sync, { type Transport = Self; @@ -130,7 +130,7 @@ where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From + Buf + Send + 'static + Clone + Sync, + B: From> + Buf + Send + 'static + Clone + Sync, { type Response = ResponsePacket; type Error = TransportError; From 5f65b9383a4898f707c27c4fe99dd5b7c4cb40e2 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 15:39:33 +0530 Subject: [PATCH 12/30] unify hyper transports --- crates/provider/Cargo.toml | 2 +- crates/provider/src/provider/trait.rs | 21 ++++-- .../src/hyper_layer_transport.rs | 64 +++++++++++++++++++ crates/transport-http/src/lib.rs | 17 +++-- 4 files changed, 93 insertions(+), 11 deletions(-) diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 7e304c8004c..8cd60576462 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -82,7 +82,7 @@ http-body-util.workspace = true http.workspace = true [features] -default = ["reqwest", "reqwest-default-tls"] +default = ["reqwest", "reqwest-default-tls", "hyper"] pubsub = ["alloy-rpc-client/pubsub", "dep:alloy-pubsub"] reqwest = [ "dep:reqwest", diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index b5701f19de7..4bbf4fd4bcd 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1049,17 +1049,24 @@ mod tests { assert_eq!(0, num); } + #[cfg(feature = "hyper")] + #[tokio::test] + async fn test_http_hyper_client() {} + #[cfg(feature = "hyper")] #[tokio::test] async fn test_hyper_layer_transport_no_layers() { init_tracing(); let anvil = Anvil::new().spawn(); let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); - let service = tower::ServiceBuilder::new().service(hyper_client); - let layer_transport = - alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + // let service = tower::ServiceBuilder::new().service(hyper_client); + // let layer_transport = + // alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); - let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + let http_hyper = + alloy_transport_http::Http::with_client(hyper_client, anvil.endpoint_url()); + + let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); let provider = RootProvider::<_, Ethereum>::new(rpc_client); let num = provider.get_block_number().await.unwrap(); @@ -1114,7 +1121,6 @@ mod tests { Box::pin(async move { fut.await }) } } - use http::header::{self, HeaderValue}; use tower_http::{ sensitive_headers::SetSensitiveRequestHeadersLayer, set_header::SetRequestHeaderLayer, @@ -1144,7 +1150,10 @@ mod tests { let layer_transport = alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); - let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + let http_hyper = + alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); + + let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); let provider = RootProvider::<_, Ethereum>::new(rpc_client); let num = provider.get_block_number().await.unwrap(); diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index fcabfa7049d..3f8c8e5ac11 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -13,6 +13,8 @@ use tower::Service; use tracing::{debug, debug_span, trace, Instrument}; use url::Url; +use crate::{Http, HttpConnect}; + /// A [hyper] client that can be used with tower layers. #[derive(Clone, Debug)] pub struct HyperLayerTransport { @@ -105,6 +107,68 @@ where } } +impl Http> +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From> + Buf + Send + 'static + Clone, +{ + /// Make a request to the server using the underlying service that may or may not contain + /// layers. + pub fn request_hyper(&mut self, req: RequestPacket) -> TransportFut<'static> { + self.client.request(req) + } +} + +impl TransportConnect for HttpConnect> +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From> + Buf + Send + 'static + Clone + Sync, +{ + type Transport = HyperLayerTransport; + + fn is_local(&self) -> bool { + guess_local_url(self.url.as_str()) + } + + fn get_transport<'a: 'b, 'b>( + &'a self, + ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { + Box::pin(async move { + match &self.transport { + Some(transport) => { + let underlying_svc_transport = transport.clone().service; + Ok(HyperLayerTransport::new(self.url.clone(), underlying_svc_transport)) + } + None => Err(TransportErrorKind::custom_str("Transport not initialized".into())), + } + }) + } +} + +impl Service for Http> +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From> + Buf + Send + 'static + Clone + Sync, +{ + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + self.request_hyper(req) + } +} + impl TransportConnect for HyperLayerTransport where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index c812eaec8d8..c78a6b4c2bd 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -35,28 +35,37 @@ pub use hyper_layer_transport::{ }; use alloy_transport::utils::guess_local_url; -use core::{marker::PhantomData, str::FromStr}; +use core::str::FromStr; use url::Url; /// Connection details for an HTTP transport. #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[doc(hidden)] -pub struct HttpConnect { +pub struct HttpConnect { /// The URL to connect to. url: Url, - _pd: PhantomData, + transport: Option, } impl HttpConnect { /// Create a new [`HttpConnect`] with the given URL. pub const fn new(url: Url) -> Self { - Self { url, _pd: PhantomData } + Self { url, transport: None } + } + + pub const fn with_transport(transport: T, url: Url) -> Self { + Self { url, transport: Some(transport) } } /// Get a reference to the URL. pub const fn url(&self) -> &Url { &self.url } + + /// Get a reference to the client. + pub const fn transport(&self) -> &Option { + &self.transport + } } impl FromStr for HttpConnect { From 814bd6040533178df01ad744a2b09b151da78b46 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 15:43:26 +0530 Subject: [PATCH 13/30] rm TransportConnect for HyperLayerTransport --- crates/provider/src/provider/trait.rs | 4 ---- .../src/hyper_layer_transport.rs | 20 ------------------- 2 files changed, 24 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 4bbf4fd4bcd..ad2a81db899 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1049,10 +1049,6 @@ mod tests { assert_eq!(0, num); } - #[cfg(feature = "hyper")] - #[tokio::test] - async fn test_http_hyper_client() {} - #[cfg(feature = "hyper")] #[tokio::test] async fn test_hyper_layer_transport_no_layers() { diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index 3f8c8e5ac11..64217fdf12c 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -169,26 +169,6 @@ where } } -impl TransportConnect for HyperLayerTransport -where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, - S::Future: Send, - S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Buf + Send + 'static + Clone + Sync, -{ - type Transport = Self; - - fn is_local(&self) -> bool { - guess_local_url(self.url.as_str()) - } - - fn get_transport<'a: 'b, 'b>( - &'a self, - ) -> alloy_transport::Pbf<'b, Self::Transport, alloy_transport::TransportError> { - Box::pin(async move { Ok(Self::new(self.url.clone(), self.service.clone())) }) - } -} - impl Service for HyperLayerTransport where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, From d9e14c0009ec7bf1edaae3e072f4e3c90802f046 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 16:24:36 +0530 Subject: [PATCH 14/30] make request generic --- crates/provider/src/provider/trait.rs | 13 ++++---- .../src/hyper_layer_transport.rs | 30 ++++++++----------- crates/transport-http/src/lib.rs | 4 +-- 3 files changed, 20 insertions(+), 27 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index ad2a81db899..97386398308 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1016,12 +1016,13 @@ mod tests { // For layer transport tests #[cfg(feature = "hyper")] use alloy_transport_http::{ - hyper::body::{Buf, Bytes as HyperBytes}, + hyper, + hyper::body::Bytes as HyperBytes, hyper_util::{ client::legacy::{Client, Error}, rt::TokioExecutor, }, - HyperRequest, HyperResponse, HyperResponseFut, + HyperResponse, HyperResponseFut, }; #[cfg(feature = "hyper")] use http_body_util::Full; @@ -1087,16 +1088,16 @@ mod tests { inner: S, } - impl Service> for LoggingService + impl Service> for LoggingService where - S: Service, Response = HyperResponse, Error = Error> + S: Service, Response = HyperResponse, Error = Error> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From + Buf + Send + 'static + Clone + Sync + std::fmt::Debug, + B: From> + Send + 'static + Clone + Sync + std::fmt::Debug, { type Response = HyperResponse; type Error = Error; @@ -1109,7 +1110,7 @@ mod tests { self.inner.poll_ready(cx) } - fn call(&mut self, req: HyperRequest) -> Self::Future { + fn call(&mut self, req: hyper::Request) -> Self::Future { println!("Logging Layer - HyperRequest {req:?}"); let fut = self.inner.call(req); diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index 64217fdf12c..c39cbd5e424 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -2,11 +2,8 @@ use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_transport::{ utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, }; -use http_body_util::{BodyExt, Full}; -use hyper::{ - body::{Buf, Incoming}, - header, Request, Response, -}; +use http_body_util::BodyExt; +use hyper::{body::Incoming, header, Request, Response}; use hyper_util::client::legacy::Error; use std::{future::Future, marker::PhantomData, pin::Pin, task}; use tower::Service; @@ -23,9 +20,6 @@ pub struct HyperLayerTransport { _pd: PhantomData, } -/// Alias for [`Request>`] -pub type HyperRequest = Request>; - /// Alias for [`Response`] pub type HyperResponse = Response; @@ -42,10 +36,10 @@ impl HyperLayerTransport { impl HyperLayerTransport where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Buf + Send + 'static + Clone, + B: From> + Send + 'static + Clone, { /// Make a request to the server using the given service. pub fn request(&mut self, req: RequestPacket) -> TransportFut<'static> { @@ -109,10 +103,10 @@ where impl Http> where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Buf + Send + 'static + Clone, + B: From> + Send + 'static + Clone, { /// Make a request to the server using the underlying service that may or may not contain /// layers. @@ -123,10 +117,10 @@ where impl TransportConnect for HttpConnect> where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Buf + Send + 'static + Clone + Sync, + B: From> + Send + 'static + Clone + Sync, { type Transport = HyperLayerTransport; @@ -151,10 +145,10 @@ where impl Service for Http> where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Buf + Send + 'static + Clone + Sync, + B: From> + Send + 'static + Clone + Sync, { type Response = ResponsePacket; type Error = TransportError; @@ -171,10 +165,10 @@ where impl Service for HyperLayerTransport where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Buf + Send + 'static + Clone + Sync, + B: From> + Send + 'static + Clone + Sync, { type Response = ResponsePacket; type Error = TransportError; diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index c78a6b4c2bd..4d154f65100 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -30,9 +30,7 @@ pub use hyper_util; mod hyper_layer_transport; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] #[doc(inline)] -pub use hyper_layer_transport::{ - HyperLayerTransport, HyperRequest, HyperResponse, HyperResponseFut, -}; +pub use hyper_layer_transport::{HyperLayerTransport, HyperResponse, HyperResponseFut}; use alloy_transport::utils::guess_local_url; use core::str::FromStr; From 0b201f2000fac5cd8d05e1916004702565af3913 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:14:29 +0530 Subject: [PATCH 15/30] unify hyper transports --- crates/provider/src/builder.rs | 4 +- crates/provider/src/lib.rs | 2 +- crates/provider/src/provider/trait.rs | 25 ++- crates/rpc-client/Cargo.toml | 4 +- crates/rpc-client/src/builder.rs | 7 +- .../src/hyper_layer_transport.rs | 93 ++++++----- crates/transport-http/src/hyper_transport.rs | 148 ------------------ crates/transport-http/src/lib.rs | 10 +- 8 files changed, 84 insertions(+), 209 deletions(-) delete mode 100644 crates/transport-http/src/hyper_transport.rs diff --git a/crates/provider/src/builder.rs b/crates/provider/src/builder.rs index 4eda6628d2f..700b8b7875e 100644 --- a/crates/provider/src/builder.rs +++ b/crates/provider/src/builder.rs @@ -351,13 +351,13 @@ impl ProviderBuilder { where L: ProviderLayer< crate::HyperProvider, - alloy_transport_http::Http, + alloy_transport_http::Http, N, >, F: TxFiller + ProviderLayer< L::Provider, - alloy_transport_http::Http, + alloy_transport_http::Http, N, >, N: Network, diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 416c98da727..5be942aae37 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -20,7 +20,7 @@ pub type ReqwestProvider = /// [`Http`]: alloy_transport_http::Http #[cfg(feature = "hyper")] pub type HyperProvider = - crate::RootProvider, N>; + crate::RootProvider, N>; #[macro_use] extern crate tracing; diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 97386398308..94b93c6a886 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1055,13 +1055,10 @@ mod tests { async fn test_hyper_layer_transport_no_layers() { init_tracing(); let anvil = Anvil::new().spawn(); - let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); - // let service = tower::ServiceBuilder::new().service(hyper_client); - // let layer_transport = - // alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + let layer_transport = alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url()); let http_hyper = - alloy_transport_http::Http::with_client(hyper_client, anvil.endpoint_url()); + alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); @@ -1145,7 +1142,7 @@ mod tests { .service(hyper_client); let layer_transport = - alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + alloy_transport_http::HyperLayerTransport::with_service(anvil.endpoint_url(), service); let http_hyper = alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); @@ -1155,6 +1152,15 @@ mod tests { let provider = RootProvider::<_, Ethereum>::new(rpc_client); let num = provider.get_block_number().await.unwrap(); assert_eq!(0, num); + + // Test Cloning + let cloned_t = provider.client().transport().clone(); + + let rpc_client = alloy_rpc_client::RpcClient::new(cloned_t, true); + + let provider = RootProvider::<_, Ethereum>::new(rpc_client); + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); } #[cfg(feature = "hyper")] @@ -1172,9 +1178,12 @@ mod tests { )) .service(hyper_client); let layer_transport = - alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url(), service); + alloy_transport_http::HyperLayerTransport::with_service(anvil.endpoint_url(), service); + + let http_hyper = + alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); - let rpc_client = alloy_rpc_client::RpcClient::new(layer_transport, true); + let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); let provider = RootProvider::<_, Ethereum>::new(rpc_client); let num = provider.get_block_number().await.unwrap(); diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index 74f7e1c7dbf..ef694848afb 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -38,8 +38,6 @@ alloy-transport-ws = { workspace = true, optional = true } reqwest = { workspace = true, optional = true } -hyper-util = { workspace = true, optional = true } - url = { workspace = true, optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -57,7 +55,7 @@ futures-util.workspace = true [features] default = ["reqwest"] reqwest = ["dep:url", "dep:reqwest", "alloy-transport-http/reqwest"] -hyper = ["dep:url", "dep:hyper-util", "alloy-transport-http/hyper"] +hyper = ["dep:url", "alloy-transport-http/hyper"] pubsub = ["dep:alloy-pubsub", "dep:alloy-primitives"] ws = ["pubsub", "dep:alloy-transport-ws", "dep:url"] ipc = ["pubsub", "dep:alloy-transport-ipc"] diff --git a/crates/rpc-client/src/builder.rs b/crates/rpc-client/src/builder.rs index 28be4963a76..99e214ac4f6 100644 --- a/crates/rpc-client/src/builder.rs +++ b/crates/rpc-client/src/builder.rs @@ -64,12 +64,11 @@ impl ClientBuilder { #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] pub fn hyper_http(self, url: url::Url) -> RpcClient where - L: Layer>, + L: Layer>, L::Service: Transport, { - let executor = hyper_util::rt::TokioExecutor::new(); - let client = hyper_util::client::legacy::Client::builder(executor).build_http(); - let transport = alloy_transport_http::Http::with_client(client, url); + let hyper = alloy_transport_http::HyperLayerTransport::new(url.clone()); + let transport = alloy_transport_http::Http::with_client(hyper, url); let is_local = transport.guess_local(); self.transport(transport, is_local) diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index c39cbd5e424..1f85a3d92d8 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -2,8 +2,11 @@ use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_transport::{ utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, }; -use http_body_util::BodyExt; -use hyper::{body::Incoming, header, Request, Response}; +use http_body_util::{BodyExt, Full}; +use hyper::{ + body::{Bytes, Incoming}, + header, Request, Response, +}; use hyper_util::client::legacy::Error; use std::{future::Future, marker::PhantomData, pin::Pin, task}; use tower::Service; @@ -12,9 +15,15 @@ use url::Url; use crate::{Http, HttpConnect}; +/// A [`hyper`] HTTP client. +pub type HyperClient = hyper_util::client::legacy::Client< + hyper_util::client::legacy::connect::HttpConnector, + http_body_util::Full<::hyper::body::Bytes>, +>; + /// A [hyper] client that can be used with tower layers. #[derive(Clone, Debug)] -pub struct HyperLayerTransport { +pub struct HyperLayerTransport, S = HyperClient> { url: Url, service: S, _pd: PhantomData, @@ -27,14 +36,26 @@ pub type HyperResponse = Response; pub type HyperResponseFut = Pin> + Send + 'static>>; -impl HyperLayerTransport { +impl HyperLayerTransport { + /// Create a new [HyperLayerTransport] with the given URL and default hyper client. + pub fn new(url: Url) -> Self { + let executor = hyper_util::rt::TokioExecutor::new(); + + let service = + hyper_util::client::legacy::Client::builder(executor).build_http::>(); + + Self { url, service, _pd: PhantomData } + } +} + +impl HyperLayerTransport { /// Create a new [HyperLayerTransport] with the given URL and service. - pub const fn new(url: Url, service: S) -> Self { + pub const fn with_service(url: Url, service: S) -> Self { Self { url, service, _pd: PhantomData } } } -impl HyperLayerTransport +impl HyperLayerTransport where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -101,7 +122,7 @@ where } } -impl Http> +impl Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -115,14 +136,14 @@ where } } -impl TransportConnect for HttpConnect> +impl TransportConnect for HttpConnect> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, B: From> + Send + 'static + Clone + Sync, { - type Transport = HyperLayerTransport; + type Transport = Http>; fn is_local(&self) -> bool { guess_local_url(self.url.as_str()) @@ -132,18 +153,18 @@ where &'a self, ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { Box::pin(async move { - match &self.transport { - Some(transport) => { - let underlying_svc_transport = transport.clone().service; - Ok(HyperLayerTransport::new(self.url.clone(), underlying_svc_transport)) - } - None => Err(TransportErrorKind::custom_str("Transport not initialized".into())), - } + self.transport.as_ref().map_or_else( + || Err(TransportErrorKind::custom_str("transport not initialized")), + |t| { + let transport = t.clone(); + Ok(Http::with_client(transport, self.url.clone())) + }, + ) }) } } -impl Service for Http> +impl Service for Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -163,22 +184,22 @@ where } } -impl Service for HyperLayerTransport -where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, - S::Future: Send, - S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Send + 'static + Clone + Sync, -{ - type Response = ResponsePacket; - type Error = TransportError; - type Future = TransportFut<'static>; - - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { - task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request(req) - } -} +// impl Service for HyperLayerTransport +// where +// S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, +// S::Future: Send, +// S::Error: std::error::Error + Send + Sync + 'static, +// B: From> + Send + 'static + Clone + Sync, +// { +// type Response = ResponsePacket; +// type Error = TransportError; +// type Future = TransportFut<'static>; + +// fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> +// { task::Poll::Ready(Ok(())) +// } + +// fn call(&mut self, req: RequestPacket) -> Self::Future { +// self.request(req) +// } +// } diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs deleted file mode 100644 index eaba9faae92..00000000000 --- a/crates/transport-http/src/hyper_transport.rs +++ /dev/null @@ -1,148 +0,0 @@ -use crate::{Http, HttpConnect}; -use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use alloy_transport::{ - utils::guess_local_url, TransportConnect, TransportError, TransportErrorKind, TransportFut, -}; -use http_body_util::{BodyExt, Full}; -use hyper::{ - body::{Buf, Bytes}, - header, -}; -use hyper_util::client::legacy::{connect::Connect, Client}; -use std::task; -use tower::Service; -use tracing::{debug, debug_span, trace, Instrument}; - -/// A [`hyper`] HTTP client. -pub type HyperClient = hyper_util::client::legacy::Client< - hyper_util::client::legacy::connect::HttpConnector, - http_body_util::Full<::hyper::body::Bytes>, ->; - -/// An [`Http`] transport using [`hyper`]. -pub type HyperTransport = Http; - -/// Connection details for a [`HyperTransport`]. -pub type HyperConnect = HttpConnect; - -impl TransportConnect for HyperConnect { - type Transport = HyperTransport; - - fn is_local(&self) -> bool { - guess_local_url(self.url.as_str()) - } - - fn get_transport<'a: 'b, 'b>( - &'a self, - ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { - let executor = hyper_util::rt::TokioExecutor::new(); - - let client = hyper_util::client::legacy::Client::builder(executor).build_http(); - - Box::pin(async move { Ok(Http::with_client(client, self.url.clone())) }) - } -} - -impl Http>> -where - C: Connect + Clone + Send + Sync + 'static, - B: From + Buf + Send + 'static, -{ - /// Make a request. - fn request_hyper(&self, req: RequestPacket) -> TransportFut<'static> { - let this = self.clone(); - let span = debug_span!("HyperTransport", url = %self.url); - Box::pin( - async move { - debug!(count = req.len(), "sending request packet to server"); - let ser = req.serialize().map_err(TransportError::ser_err)?; - // convert the Box into a hyper request - let body = Full::from(Bytes::from(>::from(>::from(ser)))); - let req = hyper::Request::builder() - .method(hyper::Method::POST) - .uri(this.url.as_str()) - .header( - header::CONTENT_TYPE, - header::HeaderValue::from_static("application/json"), - ) - .body(body) - .expect("request parts are invalid"); - - let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?; - let status = resp.status(); - - debug!(%status, "received response from server"); - - // Unpack data from the response body. We do this regardless of - // the status code, as we want to return the error in the body - // if there is one. - let body = resp - .into_body() - .collect() - .await - .map_err(TransportErrorKind::custom)? - .to_bytes(); - - debug!(bytes = body.len(), "retrieved response body. Use `trace` for full body"); - trace!(body = %String::from_utf8_lossy(&body), "response body"); - - if status != hyper::StatusCode::OK { - return Err(TransportErrorKind::http_error( - status.as_u16(), - String::from_utf8_lossy(&body).into_owned(), - )); - } - - // Deserialize a Box from the body. If deserialization fails, return - // the body as a string in the error. The conversion to String - // is lossy and may not cover all the bytes in the body. - serde_json::from_slice(&body).map_err(|err| { - TransportError::deser_err(err, String::from_utf8_lossy(body.as_ref())) - }) - } - .instrument(span), - ) - } -} - -impl Service for &Http>> -where - C: Connect + Clone + Send + Sync + 'static, - B: From + Buf + Send + 'static, -{ - type Response = ResponsePacket; - type Error = TransportError; - type Future = TransportFut<'static>; - - #[inline] - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { - // hyper always returns ok - task::Poll::Ready(Ok(())) - } - - #[inline] - fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request_hyper(req) - } -} - -impl Service for Http>> -where - C: Connect + Clone + Send + Sync + 'static, - B: From + Buf + Send + 'static, -{ - type Response = ResponsePacket; - type Error = TransportError; - type Future = TransportFut<'static>; - - #[inline] - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { - // hyper always returns ok - task::Poll::Ready(Ok(())) - } - - #[inline] - fn call(&mut self, req: RequestPacket) -> Self::Future { - self.request_hyper(req) - } -} diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 4d154f65100..8b734804657 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -15,12 +15,6 @@ mod reqwest_transport; #[doc(inline)] pub use reqwest_transport::*; -#[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] -mod hyper_transport; -#[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] -#[doc(inline)] -pub use hyper_transport::*; - #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] pub use hyper; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] @@ -30,7 +24,9 @@ pub use hyper_util; mod hyper_layer_transport; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] #[doc(inline)] -pub use hyper_layer_transport::{HyperLayerTransport, HyperResponse, HyperResponseFut}; +pub use hyper_layer_transport::{ + HyperClient, HyperLayerTransport, HyperResponse, HyperResponseFut, +}; use alloy_transport::utils::guess_local_url; use core::str::FromStr; From 8e65d0fb1b0dd29631e8bde1c38e470f782be4dd Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:17:36 +0530 Subject: [PATCH 16/30] nit --- crates/provider/Cargo.toml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 5cd4efcdd6f..12444152f42 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -32,7 +32,9 @@ alloy-rpc-types-anvil = { workspace = true, optional = true } alloy-rpc-types-eth = { workspace = true, features = ["serde"] } alloy-rpc-types-trace = { workspace = true, optional = true } alloy-rpc-types-txpool = { workspace = true, optional = true } -alloy-rpc-types-engine = { workspace = true, optional = true, features = ["serde"] } +alloy-rpc-types-engine = { workspace = true, optional = true, features = [ + "serde", +] } alloy-rpc-types = { workspace = true, optional = true } alloy-transport-http = { workspace = true, optional = true } alloy-transport-ipc = { workspace = true, optional = true } @@ -82,7 +84,7 @@ http-body-util.workspace = true http.workspace = true [features] -default = ["reqwest", "reqwest-default-tls", "hyper"] +default = ["reqwest", "reqwest-default-tls"] pubsub = ["alloy-rpc-client/pubsub", "dep:alloy-pubsub"] reqwest = [ "dep:reqwest", From 3310d0544573d047b56a7c124805544c419dcad1 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:22:49 +0530 Subject: [PATCH 17/30] nit --- crates/provider/Cargo.toml | 2 +- crates/provider/src/provider/trait.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 12444152f42..55cb0577fb0 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -84,7 +84,7 @@ http-body-util.workspace = true http.workspace = true [features] -default = ["reqwest", "reqwest-default-tls"] +default = ["reqwest", "reqwest-default-tls", "hyper"] pubsub = ["alloy-rpc-client/pubsub", "dep:alloy-pubsub"] reqwest = [ "dep:reqwest", diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 94b93c6a886..067b2be4f5b 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1112,7 +1112,7 @@ mod tests { let fut = self.inner.call(req); - Box::pin(async move { fut.await }) + Box::pin(fut) } } use http::header::{self, HeaderValue}; From 1f0c99b9c74000649210419d42c546caa41edf68 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:35:09 +0530 Subject: [PATCH 18/30] nit --- crates/alloy/Cargo.toml | 2 +- crates/provider/Cargo.toml | 2 +- crates/transport-http/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/alloy/Cargo.toml b/crates/alloy/Cargo.toml index cbea4be8d4d..20bb53c3146 100644 --- a/crates/alloy/Cargo.toml +++ b/crates/alloy/Cargo.toml @@ -238,7 +238,7 @@ signer-yubihsm = ["signer-local", "alloy-signer-local?/yubihsm"] # transports transports = ["dep:alloy-transport"] -transport-http = ["transports", "dep:alloy-transport-http"] +transport-http = ["transports", "dep:alloy-transport-http", "reqwest"] transport-ipc = ["transports", "pubsub", "dep:alloy-transport-ipc"] transport-ipc-mock = ["alloy-transport-ipc?/mock"] transport-ws = ["transports", "pubsub", "dep:alloy-transport-ws"] diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 55cb0577fb0..12444152f42 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -84,7 +84,7 @@ http-body-util.workspace = true http.workspace = true [features] -default = ["reqwest", "reqwest-default-tls", "hyper"] +default = ["reqwest", "reqwest-default-tls"] pubsub = ["alloy-rpc-client/pubsub", "dep:alloy-pubsub"] reqwest = [ "dep:reqwest", diff --git a/crates/transport-http/Cargo.toml b/crates/transport-http/Cargo.toml index dcdfe4fed21..35f757ad3c2 100644 --- a/crates/transport-http/Cargo.toml +++ b/crates/transport-http/Cargo.toml @@ -35,7 +35,7 @@ hyper = { workspace = true, default-features = false, optional = true } hyper-util = { workspace = true, features = ["full"], optional = true } [features] -default = ["reqwest", "reqwest-default-tls", "hyper"] +default = ["reqwest", "reqwest-default-tls"] reqwest = [ "dep:reqwest", "dep:alloy-json-rpc", From 1971f605d2d009fdb33f15617d122da20c51b09d Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:41:39 +0530 Subject: [PATCH 19/30] rm unintended reqwest default --- crates/alloy/Cargo.toml | 11 +++++++++-- crates/transport-http/src/lib.rs | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/alloy/Cargo.toml b/crates/alloy/Cargo.toml index 20bb53c3146..ef4a91b1b79 100644 --- a/crates/alloy/Cargo.toml +++ b/crates/alloy/Cargo.toml @@ -131,7 +131,14 @@ network = ["dep:alloy-network"] node-bindings = ["dep:alloy-node-bindings", "alloy-provider?/anvil-node"] # providers -providers = ["dep:alloy-provider", "rpc-client", "transports", "eips", "consensus", "network"] +providers = [ + "dep:alloy-provider", + "rpc-client", + "transports", + "eips", + "consensus", + "network", +] provider-http = ["providers", "transport-http"] provider-ws = ["providers", "alloy-provider?/ws", "transport-ws"] provider-ipc = ["providers", "alloy-provider?/ipc", "transport-ipc"] @@ -238,7 +245,7 @@ signer-yubihsm = ["signer-local", "alloy-signer-local?/yubihsm"] # transports transports = ["dep:alloy-transport"] -transport-http = ["transports", "dep:alloy-transport-http", "reqwest"] +transport-http = ["transports", "dep:alloy-transport-http"] transport-ipc = ["transports", "pubsub", "dep:alloy-transport-ipc"] transport-ipc-mock = ["alloy-transport-ipc?/mock"] transport-ws = ["transports", "pubsub", "dep:alloy-transport-ws"] diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 8b734804657..5e61c80b172 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -35,7 +35,7 @@ use url::Url; /// Connection details for an HTTP transport. #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[doc(hidden)] -pub struct HttpConnect { +pub struct HttpConnect { /// The URL to connect to. url: Url, transport: Option, From a0299062fb94296c75f5e9401731e3b663c7f7d1 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:48:37 +0530 Subject: [PATCH 20/30] rename HyperLayerTransport to HyperTransport --- crates/provider/src/builder.rs | 4 +- crates/provider/src/lib.rs | 2 +- crates/provider/src/provider/trait.rs | 6 +-- crates/rpc-client/src/builder.rs | 4 +- .../src/hyper_layer_transport.rs | 42 +++++-------------- crates/transport-http/src/lib.rs | 4 +- 6 files changed, 20 insertions(+), 42 deletions(-) diff --git a/crates/provider/src/builder.rs b/crates/provider/src/builder.rs index 0b1ef599525..9deb2ffed4c 100644 --- a/crates/provider/src/builder.rs +++ b/crates/provider/src/builder.rs @@ -351,13 +351,13 @@ impl ProviderBuilder { where L: ProviderLayer< crate::HyperProvider, - alloy_transport_http::Http, + alloy_transport_http::Http, N, >, F: TxFiller + ProviderLayer< L::Provider, - alloy_transport_http::Http, + alloy_transport_http::Http, N, >, N: Network, diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 5be942aae37..64165172176 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -20,7 +20,7 @@ pub type ReqwestProvider = /// [`Http`]: alloy_transport_http::Http #[cfg(feature = "hyper")] pub type HyperProvider = - crate::RootProvider, N>; + crate::RootProvider, N>; #[macro_use] extern crate tracing; diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 067b2be4f5b..e562da5fadf 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1055,7 +1055,7 @@ mod tests { async fn test_hyper_layer_transport_no_layers() { init_tracing(); let anvil = Anvil::new().spawn(); - let layer_transport = alloy_transport_http::HyperLayerTransport::new(anvil.endpoint_url()); + let layer_transport = alloy_transport_http::HyperTransport::new(anvil.endpoint_url()); let http_hyper = alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); @@ -1142,7 +1142,7 @@ mod tests { .service(hyper_client); let layer_transport = - alloy_transport_http::HyperLayerTransport::with_service(anvil.endpoint_url(), service); + alloy_transport_http::HyperTransport::with_service(anvil.endpoint_url(), service); let http_hyper = alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); @@ -1178,7 +1178,7 @@ mod tests { )) .service(hyper_client); let layer_transport = - alloy_transport_http::HyperLayerTransport::with_service(anvil.endpoint_url(), service); + alloy_transport_http::HyperTransport::with_service(anvil.endpoint_url(), service); let http_hyper = alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); diff --git a/crates/rpc-client/src/builder.rs b/crates/rpc-client/src/builder.rs index 99e214ac4f6..8d30ba57f17 100644 --- a/crates/rpc-client/src/builder.rs +++ b/crates/rpc-client/src/builder.rs @@ -64,10 +64,10 @@ impl ClientBuilder { #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] pub fn hyper_http(self, url: url::Url) -> RpcClient where - L: Layer>, + L: Layer>, L::Service: Transport, { - let hyper = alloy_transport_http::HyperLayerTransport::new(url.clone()); + let hyper = alloy_transport_http::HyperTransport::new(url.clone()); let transport = alloy_transport_http::Http::with_client(hyper, url); let is_local = transport.guess_local(); diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_layer_transport.rs index 1f85a3d92d8..87594462407 100644 --- a/crates/transport-http/src/hyper_layer_transport.rs +++ b/crates/transport-http/src/hyper_layer_transport.rs @@ -23,7 +23,7 @@ pub type HyperClient = hyper_util::client::legacy::Client< /// A [hyper] client that can be used with tower layers. #[derive(Clone, Debug)] -pub struct HyperLayerTransport, S = HyperClient> { +pub struct HyperTransport, S = HyperClient> { url: Url, service: S, _pd: PhantomData, @@ -36,8 +36,8 @@ pub type HyperResponse = Response; pub type HyperResponseFut = Pin> + Send + 'static>>; -impl HyperLayerTransport { - /// Create a new [HyperLayerTransport] with the given URL and default hyper client. +impl HyperTransport { + /// Create a new [HyperTransport] with the given URL and default hyper client. pub fn new(url: Url) -> Self { let executor = hyper_util::rt::TokioExecutor::new(); @@ -48,14 +48,14 @@ impl HyperLayerTransport { } } -impl HyperLayerTransport { - /// Create a new [HyperLayerTransport] with the given URL and service. +impl HyperTransport { + /// Create a new [HyperTransport] with the given URL and service. pub const fn with_service(url: Url, service: S) -> Self { Self { url, service, _pd: PhantomData } } } -impl HyperLayerTransport +impl HyperTransport where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -65,7 +65,7 @@ where /// Make a request to the server using the given service. pub fn request(&mut self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); - let span = debug_span!("HyperLayerTransport", url = %this.url); + let span = debug_span!("HyperTransport", url = %this.url); Box::pin( async move { debug!(count = req.len(), "sending request packet to server"); @@ -122,7 +122,7 @@ where } } -impl Http> +impl Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -136,14 +136,14 @@ where } } -impl TransportConnect for HttpConnect> +impl TransportConnect for HttpConnect> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, B: From> + Send + 'static + Clone + Sync, { - type Transport = Http>; + type Transport = Http>; fn is_local(&self) -> bool { guess_local_url(self.url.as_str()) @@ -164,7 +164,7 @@ where } } -impl Service for Http> +impl Service for Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -183,23 +183,3 @@ where self.request_hyper(req) } } - -// impl Service for HyperLayerTransport -// where -// S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, -// S::Future: Send, -// S::Error: std::error::Error + Send + Sync + 'static, -// B: From> + Send + 'static + Clone + Sync, -// { -// type Response = ResponsePacket; -// type Error = TransportError; -// type Future = TransportFut<'static>; - -// fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> -// { task::Poll::Ready(Ok(())) -// } - -// fn call(&mut self, req: RequestPacket) -> Self::Future { -// self.request(req) -// } -// } diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 5e61c80b172..2473db11f37 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -24,9 +24,7 @@ pub use hyper_util; mod hyper_layer_transport; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] #[doc(inline)] -pub use hyper_layer_transport::{ - HyperClient, HyperLayerTransport, HyperResponse, HyperResponseFut, -}; +pub use hyper_layer_transport::{HyperClient, HyperResponse, HyperResponseFut, HyperTransport}; use alloy_transport::utils::guess_local_url; use core::str::FromStr; From d18e0bd7fb7f019953a10224921005477235861b Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:58:03 +0530 Subject: [PATCH 21/30] rename file --- .../src/{hyper_layer_transport.rs => hyper_transport.rs} | 0 crates/transport-http/src/lib.rs | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename crates/transport-http/src/{hyper_layer_transport.rs => hyper_transport.rs} (100%) diff --git a/crates/transport-http/src/hyper_layer_transport.rs b/crates/transport-http/src/hyper_transport.rs similarity index 100% rename from crates/transport-http/src/hyper_layer_transport.rs rename to crates/transport-http/src/hyper_transport.rs diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index 2473db11f37..a0d755285a6 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -21,10 +21,10 @@ pub use hyper; pub use hyper_util; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] -mod hyper_layer_transport; +mod hyper_transport; #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] #[doc(inline)] -pub use hyper_layer_transport::{HyperClient, HyperResponse, HyperResponseFut, HyperTransport}; +pub use hyper_transport::{HyperClient, HyperResponse, HyperResponseFut, HyperTransport}; use alloy_transport::utils::guess_local_url; use core::str::FromStr; From c2f14327a57c4db34dae4ea2c6f839e94a1a1e90 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 17 Sep 2024 22:06:47 +0530 Subject: [PATCH 22/30] nit --- crates/provider/src/provider/trait.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index e562da5fadf..868cbc641bf 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1052,13 +1052,12 @@ mod tests { #[cfg(feature = "hyper")] #[tokio::test] - async fn test_hyper_layer_transport_no_layers() { + async fn test_default_hyper_transport() { init_tracing(); let anvil = Anvil::new().spawn(); - let layer_transport = alloy_transport_http::HyperTransport::new(anvil.endpoint_url()); + let hyper_t = alloy_transport_http::HyperTransport::new(anvil.endpoint_url()); - let http_hyper = - alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); + let http_hyper = alloy_transport_http::Http::with_client(hyper_t, anvil.endpoint_url()); let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); From 9f880db980b0c4c773038fb21e14e23a0edf7858 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:26:13 +0530 Subject: [PATCH 23/30] fix: rm transport from HttpConnect --- crates/provider/src/provider/trait.rs | 29 +------------------- crates/transport-http/src/hyper_transport.rs | 20 ++++---------- crates/transport-http/src/lib.rs | 15 +++------- 3 files changed, 10 insertions(+), 54 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 868cbc641bf..fc8c72f485c 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1152,7 +1152,7 @@ mod tests { let num = provider.get_block_number().await.unwrap(); assert_eq!(0, num); - // Test Cloning + // Test Cloning with service let cloned_t = provider.client().transport().clone(); let rpc_client = alloy_rpc_client::RpcClient::new(cloned_t, true); @@ -1162,33 +1162,6 @@ mod tests { assert_eq!(0, num); } - #[cfg(feature = "hyper")] - #[tokio::test] - async fn test_layer_transport_with_tower_http() { - use http::header::{self, HeaderValue}; - use tower_http::set_header::SetRequestHeaderLayer; - init_tracing(); - let anvil = Anvil::new().spawn(); - let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); - let service = tower::ServiceBuilder::new() - .layer(SetRequestHeaderLayer::if_not_present( - header::USER_AGENT, - HeaderValue::from_static("alloy app"), - )) - .service(hyper_client); - let layer_transport = - alloy_transport_http::HyperTransport::with_service(anvil.endpoint_url(), service); - - let http_hyper = - alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); - - let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); - - let provider = RootProvider::<_, Ethereum>::new(rpc_client); - let num = provider.get_block_number().await.unwrap(); - assert_eq!(0, num); - } - #[tokio::test] async fn test_builder_helper_fn_any_network() { init_tracing(); diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index 87594462407..e43bcc22533 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -136,14 +136,8 @@ where } } -impl TransportConnect for HttpConnect> -where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, - S::Future: Send, - S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Send + 'static + Clone + Sync, -{ - type Transport = Http>; +impl TransportConnect for HttpConnect { + type Transport = Http; fn is_local(&self) -> bool { guess_local_url(self.url.as_str()) @@ -153,13 +147,9 @@ where &'a self, ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { Box::pin(async move { - self.transport.as_ref().map_or_else( - || Err(TransportErrorKind::custom_str("transport not initialized")), - |t| { - let transport = t.clone(); - Ok(Http::with_client(transport, self.url.clone())) - }, - ) + let hyper_t = HyperTransport::new(self.url.clone()); + + Ok(Http::with_client(hyper_t, self.url.clone())) }) } } diff --git a/crates/transport-http/src/lib.rs b/crates/transport-http/src/lib.rs index a0d755285a6..d1881fefaae 100644 --- a/crates/transport-http/src/lib.rs +++ b/crates/transport-http/src/lib.rs @@ -28,6 +28,7 @@ pub use hyper_transport::{HyperClient, HyperResponse, HyperResponseFut, HyperTra use alloy_transport::utils::guess_local_url; use core::str::FromStr; +use std::marker::PhantomData; use url::Url; /// Connection details for an HTTP transport. @@ -36,28 +37,20 @@ use url::Url; pub struct HttpConnect { /// The URL to connect to. url: Url, - transport: Option, + + _pd: PhantomData, } impl HttpConnect { /// Create a new [`HttpConnect`] with the given URL. pub const fn new(url: Url) -> Self { - Self { url, transport: None } - } - - pub const fn with_transport(transport: T, url: Url) -> Self { - Self { url, transport: Some(transport) } + Self { url, _pd: PhantomData } } /// Get a reference to the URL. pub const fn url(&self) -> &Url { &self.url } - - /// Get a reference to the client. - pub const fn transport(&self) -> &Option { - &self.transport - } } impl FromStr for HttpConnect { From 939ac7e4e9d7bcc8031040ea562c7b1ef9e065fc Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:43:00 +0530 Subject: [PATCH 24/30] fix: rm url from HyperTransport, infer it from Http --- crates/provider/src/provider/trait.rs | 5 ++--- crates/rpc-client/src/builder.rs | 2 +- crates/transport-http/src/hyper_transport.rs | 21 ++++++++++---------- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index fc8c72f485c..001b31118de 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1055,7 +1055,7 @@ mod tests { async fn test_default_hyper_transport() { init_tracing(); let anvil = Anvil::new().spawn(); - let hyper_t = alloy_transport_http::HyperTransport::new(anvil.endpoint_url()); + let hyper_t = alloy_transport_http::HyperTransport::new(); let http_hyper = alloy_transport_http::Http::with_client(hyper_t, anvil.endpoint_url()); @@ -1140,8 +1140,7 @@ mod tests { .layer(LoggingLayer) .service(hyper_client); - let layer_transport = - alloy_transport_http::HyperTransport::with_service(anvil.endpoint_url(), service); + let layer_transport = alloy_transport_http::HyperTransport::with_service(service); let http_hyper = alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); diff --git a/crates/rpc-client/src/builder.rs b/crates/rpc-client/src/builder.rs index 8d30ba57f17..3a3748b941e 100644 --- a/crates/rpc-client/src/builder.rs +++ b/crates/rpc-client/src/builder.rs @@ -67,7 +67,7 @@ impl ClientBuilder { L: Layer>, L::Service: Transport, { - let hyper = alloy_transport_http::HyperTransport::new(url.clone()); + let hyper = alloy_transport_http::HyperTransport::new(); let transport = alloy_transport_http::Http::with_client(hyper, url); let is_local = transport.guess_local(); diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index e43bcc22533..956aea8984c 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -24,7 +24,6 @@ pub type HyperClient = hyper_util::client::legacy::Client< /// A [hyper] client that can be used with tower layers. #[derive(Clone, Debug)] pub struct HyperTransport, S = HyperClient> { - url: Url, service: S, _pd: PhantomData, } @@ -38,20 +37,20 @@ pub type HyperResponseFut = impl HyperTransport { /// Create a new [HyperTransport] with the given URL and default hyper client. - pub fn new(url: Url) -> Self { + pub fn new() -> Self { let executor = hyper_util::rt::TokioExecutor::new(); let service = hyper_util::client::legacy::Client::builder(executor).build_http::>(); - Self { url, service, _pd: PhantomData } + Self { service, _pd: PhantomData } } } impl HyperTransport { /// Create a new [HyperTransport] with the given URL and service. - pub const fn with_service(url: Url, service: S) -> Self { - Self { url, service, _pd: PhantomData } + pub const fn with_service(service: S) -> Self { + Self { service, _pd: PhantomData } } } @@ -63,9 +62,9 @@ where B: From> + Send + 'static + Clone, { /// Make a request to the server using the given service. - pub fn request(&mut self, req: RequestPacket) -> TransportFut<'static> { + fn request(&mut self, req: RequestPacket, url: Url) -> TransportFut<'static> { let this = self.clone(); - let span = debug_span!("HyperTransport", url = %this.url); + let span = debug_span!("HyperTransport", %url); Box::pin( async move { debug!(count = req.len(), "sending request packet to server"); @@ -75,7 +74,7 @@ where let req = hyper::Request::builder() .method(hyper::Method::POST) - .uri(this.url.as_str()) + .uri(url.as_str()) .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), @@ -131,8 +130,8 @@ where { /// Make a request to the server using the underlying service that may or may not contain /// layers. - pub fn request_hyper(&mut self, req: RequestPacket) -> TransportFut<'static> { - self.client.request(req) + fn request_hyper(&mut self, req: RequestPacket) -> TransportFut<'static> { + self.client.request(req, self.url.clone()) } } @@ -147,7 +146,7 @@ impl TransportConnect for HttpConnect { &'a self, ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { Box::pin(async move { - let hyper_t = HyperTransport::new(self.url.clone()); + let hyper_t = HyperTransport::new(); Ok(Http::with_client(hyper_t, self.url.clone())) }) From 563ecbb08966bd19ced73b04b3b4ee1896688f7d Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:46:29 +0530 Subject: [PATCH 25/30] clippy --- crates/transport-http/src/hyper_transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index 956aea8984c..f5a2657aa59 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -22,7 +22,7 @@ pub type HyperClient = hyper_util::client::legacy::Client< >; /// A [hyper] client that can be used with tower layers. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct HyperTransport, S = HyperClient> { service: S, _pd: PhantomData, From cdfcd105f00e1bf69c5c07b3ebf6c5d06ed149ac Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:47:33 +0530 Subject: [PATCH 26/30] fix --- crates/transport-http/src/hyper_transport.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index f5a2657aa59..f5fb71076e2 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -22,7 +22,7 @@ pub type HyperClient = hyper_util::client::legacy::Client< >; /// A [hyper] client that can be used with tower layers. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct HyperTransport, S = HyperClient> { service: S, _pd: PhantomData, @@ -47,6 +47,12 @@ impl HyperTransport { } } +impl Default for HyperTransport { + fn default() -> Self { + Self::new() + } +} + impl HyperTransport { /// Create a new [HyperTransport] with the given URL and service. pub const fn with_service(service: S) -> Self { From edc580eec25c50cf1c23558aacb9442ddae25a67 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 13:55:54 +0530 Subject: [PATCH 27/30] impl Http --- crates/transport-http/src/hyper_transport.rs | 25 ++++---------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index f5fb71076e2..e39633cb552 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -11,7 +11,6 @@ use hyper_util::client::legacy::Error; use std::{future::Future, marker::PhantomData, pin::Pin, task}; use tower::Service; use tracing::{debug, debug_span, trace, Instrument}; -use url::Url; use crate::{Http, HttpConnect}; @@ -60,7 +59,7 @@ impl HyperTransport { } } -impl HyperTransport +impl Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -68,9 +67,9 @@ where B: From> + Send + 'static + Clone, { /// Make a request to the server using the given service. - fn request(&mut self, req: RequestPacket, url: Url) -> TransportFut<'static> { + fn request_hyper(&mut self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); - let span = debug_span!("HyperTransport", %url); + let span = debug_span!("HyperTransport", url = %this.url); Box::pin( async move { debug!(count = req.len(), "sending request packet to server"); @@ -80,7 +79,7 @@ where let req = hyper::Request::builder() .method(hyper::Method::POST) - .uri(url.as_str()) + .uri(this.url.as_str()) .header( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"), @@ -88,7 +87,7 @@ where .body(body) .expect("request parts are invalid"); - let mut service = this.service.clone(); + let mut service = this.client.service.clone(); let resp = service.call(req).await.map_err(TransportErrorKind::custom)?; let status = resp.status(); @@ -127,20 +126,6 @@ where } } -impl Http> -where - S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, - S::Future: Send, - S::Error: std::error::Error + Send + Sync + 'static, - B: From> + Send + 'static + Clone, -{ - /// Make a request to the server using the underlying service that may or may not contain - /// layers. - fn request_hyper(&mut self, req: RequestPacket) -> TransportFut<'static> { - self.client.request(req, self.url.clone()) - } -} - impl TransportConnect for HttpConnect { type Transport = Http; From 8a38603ab39802e7ca274dfe54e597d71285942b Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 14:04:02 +0530 Subject: [PATCH 28/30] fix --- crates/transport-http/src/hyper_transport.rs | 22 +++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index e39633cb552..4d95a4815d3 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -67,7 +67,7 @@ where B: From> + Send + 'static + Clone, { /// Make a request to the server using the given service. - fn request_hyper(&mut self, req: RequestPacket) -> TransportFut<'static> { + fn request_hyper(&self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); let span = debug_span!("HyperTransport", url = %this.url); Box::pin( @@ -163,3 +163,23 @@ where self.request_hyper(req) } } + +impl Service for &Http> +where + S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + B: From> + Send + 'static + Clone + Sync, +{ + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + self.request_hyper(req) + } +} From 9af4caee5d19417ec6c64fbe2d4bec68201fa260 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 14:25:04 +0530 Subject: [PATCH 29/30] rename --- crates/provider/src/builder.rs | 13 +----- crates/provider/src/lib.rs | 2 +- crates/provider/src/provider/trait.rs | 8 ++-- crates/rpc-client/src/builder.rs | 5 +-- crates/transport-http/src/hyper_transport.rs | 42 ++++++++++++-------- 5 files changed, 34 insertions(+), 36 deletions(-) diff --git a/crates/provider/src/builder.rs b/crates/provider/src/builder.rs index 3660f240e1d..fedf12dd269 100644 --- a/crates/provider/src/builder.rs +++ b/crates/provider/src/builder.rs @@ -354,17 +354,8 @@ impl ProviderBuilder { #[cfg(feature = "hyper")] pub fn on_hyper_http(self, url: url::Url) -> F::Provider where - L: ProviderLayer< - crate::HyperProvider, - alloy_transport_http::Http, - N, - >, - F: TxFiller - + ProviderLayer< - L::Provider, - alloy_transport_http::Http, - N, - >, + L: ProviderLayer, alloy_transport_http::HyperTransport, N>, + F: TxFiller + ProviderLayer, N: Network, { let client = ClientBuilder::default().hyper_http(url); diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 64165172176..a59bdc8c0fb 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -20,7 +20,7 @@ pub type ReqwestProvider = /// [`Http`]: alloy_transport_http::Http #[cfg(feature = "hyper")] pub type HyperProvider = - crate::RootProvider, N>; + crate::RootProvider; #[macro_use] extern crate tracing; diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 001b31118de..c522c8a3799 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -1055,11 +1055,9 @@ mod tests { async fn test_default_hyper_transport() { init_tracing(); let anvil = Anvil::new().spawn(); - let hyper_t = alloy_transport_http::HyperTransport::new(); + let hyper_t = alloy_transport_http::HyperTransport::new_hyper(anvil.endpoint_url()); - let http_hyper = alloy_transport_http::Http::with_client(hyper_t, anvil.endpoint_url()); - - let rpc_client = alloy_rpc_client::RpcClient::new(http_hyper, true); + let rpc_client = alloy_rpc_client::RpcClient::new(hyper_t, true); let provider = RootProvider::<_, Ethereum>::new(rpc_client); let num = provider.get_block_number().await.unwrap(); @@ -1140,7 +1138,7 @@ mod tests { .layer(LoggingLayer) .service(hyper_client); - let layer_transport = alloy_transport_http::HyperTransport::with_service(service); + let layer_transport = alloy_transport_http::HyperClient::with_service(service); let http_hyper = alloy_transport_http::Http::with_client(layer_transport, anvil.endpoint_url()); diff --git a/crates/rpc-client/src/builder.rs b/crates/rpc-client/src/builder.rs index 3a3748b941e..759affd3608 100644 --- a/crates/rpc-client/src/builder.rs +++ b/crates/rpc-client/src/builder.rs @@ -64,11 +64,10 @@ impl ClientBuilder { #[cfg(all(not(target_arch = "wasm32"), feature = "hyper"))] pub fn hyper_http(self, url: url::Url) -> RpcClient where - L: Layer>, + L: Layer, L::Service: Transport, { - let hyper = alloy_transport_http::HyperTransport::new(); - let transport = alloy_transport_http::Http::with_client(hyper, url); + let transport = alloy_transport_http::HyperTransport::new_hyper(url); let is_local = transport.guess_local(); self.transport(transport, is_local) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index 4d95a4815d3..18e1db87793 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -14,15 +14,25 @@ use tracing::{debug, debug_span, trace, Instrument}; use crate::{Http, HttpConnect}; -/// A [`hyper`] HTTP client. -pub type HyperClient = hyper_util::client::legacy::Client< +type Hyper = hyper_util::client::legacy::Client< hyper_util::client::legacy::connect::HttpConnector, http_body_util::Full<::hyper::body::Bytes>, >; -/// A [hyper] client that can be used with tower layers. +/// A [`hyper`] based transport client. +pub type HyperTransport = Http; + +impl HyperTransport { + /// Create a new [`HyperTransport`] with the given URL and default hyper client. + pub fn new_hyper(url: url::Url) -> Self { + let client = HyperClient::new(); + Self::with_client(client, url) + } +} + +/// A [hyper] based client that can be used with tower layers. #[derive(Clone, Debug)] -pub struct HyperTransport, S = HyperClient> { +pub struct HyperClient, S = Hyper> { service: S, _pd: PhantomData, } @@ -34,8 +44,8 @@ pub type HyperResponse = Response; pub type HyperResponseFut = Pin> + Send + 'static>>; -impl HyperTransport { - /// Create a new [HyperTransport] with the given URL and default hyper client. +impl HyperClient { + /// Create a new [HyperClient] with the given URL and default hyper client. pub fn new() -> Self { let executor = hyper_util::rt::TokioExecutor::new(); @@ -46,20 +56,20 @@ impl HyperTransport { } } -impl Default for HyperTransport { +impl Default for HyperClient { fn default() -> Self { Self::new() } } -impl HyperTransport { - /// Create a new [HyperTransport] with the given URL and service. +impl HyperClient { + /// Create a new [HyperClient] with the given URL and service. pub const fn with_service(service: S) -> Self { Self { service, _pd: PhantomData } } } -impl Http> +impl Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -69,7 +79,7 @@ where /// Make a request to the server using the given service. fn request_hyper(&self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); - let span = debug_span!("HyperTransport", url = %this.url); + let span = debug_span!("HyperClient", url = %this.url); Box::pin( async move { debug!(count = req.len(), "sending request packet to server"); @@ -126,8 +136,8 @@ where } } -impl TransportConnect for HttpConnect { - type Transport = Http; +impl TransportConnect for HttpConnect { + type Transport = Http; fn is_local(&self) -> bool { guess_local_url(self.url.as_str()) @@ -137,14 +147,14 @@ impl TransportConnect for HttpConnect { &'a self, ) -> alloy_transport::Pbf<'b, Self::Transport, TransportError> { Box::pin(async move { - let hyper_t = HyperTransport::new(); + let hyper_t = HyperClient::new(); Ok(Http::with_client(hyper_t, self.url.clone())) }) } } -impl Service for Http> +impl Service for Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, @@ -164,7 +174,7 @@ where } } -impl Service for &Http> +impl Service for &Http> where S: Service, Response = HyperResponse> + Clone + Send + Sync + 'static, S::Future: Send, From e16447f0ac6bdfc7d9ceec3d36f1d97ef76388bb Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Wed, 18 Sep 2024 14:36:26 +0530 Subject: [PATCH 30/30] nit --- crates/transport-http/src/hyper_transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/transport-http/src/hyper_transport.rs b/crates/transport-http/src/hyper_transport.rs index 18e1db87793..389d0289c03 100644 --- a/crates/transport-http/src/hyper_transport.rs +++ b/crates/transport-http/src/hyper_transport.rs @@ -136,8 +136,8 @@ where } } -impl TransportConnect for HttpConnect { - type Transport = Http; +impl TransportConnect for HttpConnect { + type Transport = HyperTransport; fn is_local(&self) -> bool { guess_local_url(self.url.as_str())