diff --git a/Cargo.toml b/Cargo.toml index 2f7faa8..dbe7ffb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ features = ["full"] rustdoc-args = ["--cfg", "docsrs"] [dependencies] -hyper = "1.3.0" +hyper = "1.4.0" futures-util = { version = "0.3.16", default-features = false } http = "1.0" http-body = "1.0.0" @@ -32,7 +32,7 @@ tower-service ={ version = "0.3", optional = true } tower = { version = "0.4.1", optional = true, default-features = false, features = ["make", "util"] } [dev-dependencies] -hyper = { version = "1.3.0", features = ["full"] } +hyper = { version = "1.4.0", features = ["full"] } bytes = "1" http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util", "signal"] } diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index 53cd714..ee3bb4f 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -13,6 +13,7 @@ use std::time::Duration; use futures_util::future::{self, Either, FutureExt, TryFutureExt}; use http::uri::Scheme; +use hyper::client::conn::TrySendError as ConnTrySendError; use hyper::header::{HeaderValue, HOST}; use hyper::rt::Timer; use hyper::{body::Body, Method, Request, Response, Uri, Version}; @@ -86,6 +87,11 @@ macro_rules! e { // We might change this... :shrug: type PoolKey = (http::uri::Scheme, http::uri::Authority); +enum TrySendError { + Retryable { error: Error, req: Request }, + Nope(Error), +} + /// A `Future` that will resolve to an HTTP Response. /// /// This is returned by `Client::request` (and `Client::get`). @@ -223,8 +229,7 @@ where ResponseFuture::new(self.clone().send_request(req, pool_key)) } - /* - async fn retryably_send_request( + async fn send_request( self, mut req: Request, pool_key: PoolKey, @@ -232,23 +237,19 @@ where let uri = req.uri().clone(); loop { - req = match self.send_request(req, pool_key.clone()).await { + req = match self.try_send_request(req, pool_key.clone()).await { Ok(resp) => return Ok(resp), - Err(ClientError::Normal(err)) => return Err(err), - Err(ClientError::Canceled { - connection_reused, - mut req, - reason, - }) => { - if !self.config.retry_canceled_requests || !connection_reused { + Err(TrySendError::Nope(err)) => return Err(err), + Err(TrySendError::Retryable { mut req, error }) => { + if !self.config.retry_canceled_requests { // if client disabled, don't retry // a fresh connection means we definitely can't retry - return Err(reason); + return Err(error); } trace!( "unstarted request canceled, trying again (reason={:?})", - reason + error ); *req.uri_mut() = uri.clone(); req @@ -256,14 +257,18 @@ where } } } - */ - async fn send_request( - self, + async fn try_send_request( + &self, mut req: Request, pool_key: PoolKey, - ) -> Result, Error> { - let mut pooled = self.connection_for(pool_key).await?; + ) -> Result, TrySendError> { + let mut pooled = self + .connection_for(pool_key) + .await + // `connection_for` already retries checkout errors, so if + // it returns an error, there's not much else to retry + .map_err(TrySendError::Nope)?; req.extensions_mut() .get_mut::() @@ -272,7 +277,7 @@ where if pooled.is_http1() { if req.version() == Version::HTTP_2 { warn!("Connection is HTTP/1, but request requires HTTP/2"); - return Err(e!(UserUnsupportedVersion)); + return Err(TrySendError::Nope(e!(UserUnsupportedVersion))); } if self.config.set_host { @@ -301,18 +306,26 @@ where authority_form(req.uri_mut()); } - let fut = pooled.send_request(req); + let mut res = match pooled.try_send_request(req).await { + Ok(res) => res, + Err(mut err) => { + return if let Some(req) = err.take_message() { + Err(TrySendError::Retryable { + error: e!(Canceled, err.into_error()), + req, + }) + } else { + Err(TrySendError::Nope(e!(SendRequest, err.into_error()))) + } + } + }; //.send_request_retryable(req) //.map_err(ClientError::map_with_reused(pooled.is_reused())); // If the Connector included 'extra' info, add to Response... - let extra_info = pooled.conn_info.extra.clone(); - let fut = fut.map_ok(move |mut res| { - if let Some(extra) = extra_info { - extra.set(res.extensions_mut()); - } - res - }); + if let Some(extra) = &pooled.conn_info.extra { + extra.set(res.extensions_mut()); + } // As of futures@0.1.21, there is a race condition in the mpsc // channel, such that sending when the receiver is closing can @@ -322,11 +335,9 @@ where // To counteract this, we must check if our senders 'want' channel // has been closed after having tried to send. If so, error out... if pooled.is_closed() { - return fut.await; + return Ok(res); } - let res = fut.await?; - // If pooled is HTTP/2, we can toss this reference immediately. // // when pooled is dropped, it will try to insert back into the @@ -749,37 +760,34 @@ impl PoolClient { } impl PoolClient { - fn send_request( + fn try_send_request( &mut self, req: Request, - ) -> impl Future, Error>> + ) -> impl Future, ConnTrySendError>>> where B: Send, { #[cfg(all(feature = "http1", feature = "http2"))] return match self.tx { #[cfg(feature = "http1")] - PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request(req)), + PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)), #[cfg(feature = "http2")] - PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request(req)), - } - .map_err(Error::tx); + PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)), + }; #[cfg(feature = "http1")] #[cfg(not(feature = "http2"))] return match self.tx { #[cfg(feature = "http1")] - PoolTx::Http1(ref mut tx) => tx.send_request(req), - } - .map_err(Error::tx); + PoolTx::Http1(ref mut tx) => tx.try_send_request(req), + }; #[cfg(not(feature = "http1"))] #[cfg(feature = "http2")] return match self.tx { #[cfg(feature = "http2")] - PoolTx::Http2(ref mut tx) => tx.send_request(req), - } - .map_err(Error::tx); + PoolTx::Http2(ref mut tx) => tx.try_send_request(req), + }; } /* //TODO: can we re-introduce this somehow? Or must people use tower::retry?