Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve retrying of dying pooled connections #133

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ features = ["full"]
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
hyper = "1.3.0"
hyper = "1.4.0"
futures-util = { version = "0.3.16", default-features = false }
http = "1.0"
http-body = "1.0.0"
Expand All @@ -32,7 +32,7 @@ tower-service ={ version = "0.3", optional = true }
tower = { version = "0.4.1", optional = true, default-features = false, features = ["make", "util"] }

[dev-dependencies]
hyper = { version = "1.3.0", features = ["full"] }
hyper = { version = "1.4.0", features = ["full"] }
bytes = "1"
http-body-util = "0.1.0"
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
Expand Down
90 changes: 49 additions & 41 deletions src/client/legacy/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::time::Duration;

use futures_util::future::{self, Either, FutureExt, TryFutureExt};
use http::uri::Scheme;
use hyper::client::conn::TrySendError as ConnTrySendError;
use hyper::header::{HeaderValue, HOST};
use hyper::rt::Timer;
use hyper::{body::Body, Method, Request, Response, Uri, Version};
Expand Down Expand Up @@ -86,6 +87,11 @@ macro_rules! e {
// We might change this... :shrug:
type PoolKey = (http::uri::Scheme, http::uri::Authority);

enum TrySendError<B> {
Retryable { error: Error, req: Request<B> },
Nope(Error),
}

/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
Expand Down Expand Up @@ -223,47 +229,46 @@ where
ResponseFuture::new(self.clone().send_request(req, pool_key))
}

/*
async fn retryably_send_request(
async fn send_request(
self,
mut req: Request<B>,
pool_key: PoolKey,
) -> Result<Response<hyper::body::Incoming>, Error> {
let uri = req.uri().clone();

loop {
req = match self.send_request(req, pool_key.clone()).await {
req = match self.try_send_request(req, pool_key.clone()).await {
Ok(resp) => return Ok(resp),
Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Canceled {
connection_reused,
mut req,
reason,
}) => {
if !self.config.retry_canceled_requests || !connection_reused {
Err(TrySendError::Nope(err)) => return Err(err),
Err(TrySendError::Retryable { mut req, error }) => {
if !self.config.retry_canceled_requests {
// if client disabled, don't retry
// a fresh connection means we definitely can't retry
return Err(reason);
return Err(error);
}

trace!(
"unstarted request canceled, trying again (reason={:?})",
reason
error
);
*req.uri_mut() = uri.clone();
req
}
}
}
}
*/

async fn send_request(
self,
async fn try_send_request(
&self,
mut req: Request<B>,
pool_key: PoolKey,
) -> Result<Response<hyper::body::Incoming>, Error> {
let mut pooled = self.connection_for(pool_key).await?;
) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
let mut pooled = self
.connection_for(pool_key)
.await
// `connection_for` already retries checkout errors, so if
// it returns an error, there's not much else to retry
.map_err(TrySendError::Nope)?;

req.extensions_mut()
.get_mut::<CaptureConnectionExtension>()
Expand All @@ -272,7 +277,7 @@ where
if pooled.is_http1() {
if req.version() == Version::HTTP_2 {
warn!("Connection is HTTP/1, but request requires HTTP/2");
return Err(e!(UserUnsupportedVersion));
return Err(TrySendError::Nope(e!(UserUnsupportedVersion)));
}

if self.config.set_host {
Expand Down Expand Up @@ -301,18 +306,26 @@ where
authority_form(req.uri_mut());
}

let fut = pooled.send_request(req);
let mut res = match pooled.try_send_request(req).await {
Ok(res) => res,
Err(mut err) => {
return if let Some(req) = err.take_message() {
Err(TrySendError::Retryable {
error: e!(Canceled, err.into_error()),
req,
})
} else {
Err(TrySendError::Nope(e!(SendRequest, err.into_error())))
}
}
};
//.send_request_retryable(req)
//.map_err(ClientError::map_with_reused(pooled.is_reused()));

// If the Connector included 'extra' info, add to Response...
let extra_info = pooled.conn_info.extra.clone();
let fut = fut.map_ok(move |mut res| {
if let Some(extra) = extra_info {
extra.set(res.extensions_mut());
}
res
});
if let Some(extra) = &pooled.conn_info.extra {
extra.set(res.extensions_mut());
}

// As of futures@0.1.21, there is a race condition in the mpsc
// channel, such that sending when the receiver is closing can
Expand All @@ -322,11 +335,9 @@ where
// To counteract this, we must check if our senders 'want' channel
// has been closed after having tried to send. If so, error out...
if pooled.is_closed() {
return fut.await;
return Ok(res);
}

let res = fut.await?;

// If pooled is HTTP/2, we can toss this reference immediately.
//
// when pooled is dropped, it will try to insert back into the
Expand Down Expand Up @@ -749,37 +760,34 @@ impl<B> PoolClient<B> {
}

impl<B: Body + 'static> PoolClient<B> {
fn send_request(
fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<hyper::body::Incoming>, Error>>
) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
where
B: Send,
{
#[cfg(all(feature = "http1", feature = "http2"))]
return match self.tx {
#[cfg(feature = "http1")]
PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request(req)),
PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
#[cfg(feature = "http2")]
PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request(req)),
}
.map_err(Error::tx);
PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
};

#[cfg(feature = "http1")]
#[cfg(not(feature = "http2"))]
return match self.tx {
#[cfg(feature = "http1")]
PoolTx::Http1(ref mut tx) => tx.send_request(req),
}
.map_err(Error::tx);
PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
};

#[cfg(not(feature = "http1"))]
#[cfg(feature = "http2")]
return match self.tx {
#[cfg(feature = "http2")]
PoolTx::Http2(ref mut tx) => tx.send_request(req),
}
.map_err(Error::tx);
PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
};
}
/*
//TODO: can we re-introduce this somehow? Or must people use tower::retry?
Expand Down