Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add retries for network-related failures #22

Merged
merged 7 commits into from
Aug 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions src/routing/ic/transport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#![allow(clippy::declare_interior_mutable_const)]

use std::{cell::RefCell, pin::Pin, sync::Arc, time::Duration};

use discower_bowndary::transport::{TransportProvider, TransportProviderError};
use futures::Future;
use futures_util::StreamExt;
Expand All @@ -19,6 +17,7 @@ use reqwest::{
header::{HeaderMap, HeaderValue, CONTENT_TYPE},
Body, Method, Request, StatusCode,
};
use std::{cell::RefCell, error::Error, pin::Pin, sync::Arc, time::Duration};
use tokio::task_local;
use url::Url;

Expand All @@ -28,6 +27,11 @@ type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + S

const MAX_RESPONSE_SIZE: usize = 2 * 1_048_576;

// Maximum number of retries for both http- and network-related errors.
const MAX_TOTAL_RETRIES: usize = 5;
nikolay-komarevskiy marked this conversation as resolved.
Show resolved Hide resolved
const MAX_HTTP_RETRIES: usize = 5;
const MAX_NETWORK_RETRIES: usize = 5;

pub struct PassHeaders {
pub headers_in: HeaderMap<HeaderValue>,
pub headers_out: HeaderMap<HeaderValue>,
Expand Down Expand Up @@ -118,37 +122,73 @@ impl ReqwestTransport {
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<(StatusCode, Vec<u8>), AgentError> {
let url = self.route_provider.route()?.join(endpoint)?;
let mut http_request = Request::new(method, url);
http_request
.headers_mut()
.insert(CONTENT_TYPE, CONTENT_TYPE_CBOR);

// Add HTTP headers if requested
let _ = PASS_HEADERS.try_with(|x| {
let mut pass = x.borrow_mut();
for (k, v) in &pass.headers_out {
http_request.headers_mut().append(k, v.clone());
}
pass.headers_out.clear();
});
let create_request_with_generated_url = || -> Result<Request, AgentError> {
let url = self.route_provider.route()?.join(endpoint)?;
let mut http_request = Request::new(method.clone(), url);
http_request
.headers_mut()
.insert(CONTENT_TYPE, CONTENT_TYPE_CBOR);

// Add HTTP headers if requested
let _ = PASS_HEADERS.try_with(|x| {
let mut pass = x.borrow_mut();
for (k, v) in &pass.headers_out {
http_request.headers_mut().append(k, v.clone());
}
pass.headers_out.clear();
});

*http_request.body_mut() = body.map(Body::from);
*http_request.body_mut() = body.as_ref().cloned().map(Body::from);

Ok(http_request)
};

let mut delay = Duration::from_millis(100);
let mut retries = 5;
let mut total_retries = MAX_TOTAL_RETRIES;
nikolay-komarevskiy marked this conversation as resolved.
Show resolved Hide resolved
let mut network_retries = MAX_NETWORK_RETRIES;
let mut http_retries = MAX_HTTP_RETRIES;

let request_result = loop {
let result = self.request(http_request.try_clone().unwrap()).await?;
let result = {
// RouteProvider generates urls dynamically. Some urls can be unhealthy.
// TCP related errors (host unreachable, connection refused, connection timed out, connection reset) can be safely retried with a newly generated url.
loop {
let http_request = create_request_with_generated_url()?;

match self.request(http_request).await {
Ok(response) => break response,
Err(agent_error) => {
if (&agent_error as &dyn Error)
.downcast_ref::<hyper_util::client::legacy::Error>()
.is_some_and(|e| e.is_connect())
{
if network_retries <= 0 || total_retries <= 0 {
let msg = format!("max request retries reached: http_retries={http_retries}, network_retries={network_retries}");
return Err(AgentError::TransportError(msg.into()));
}
network_retries -= 1;
total_retries -= 1;
continue;
}
// All other errors are not retried.
return Err(agent_error);
}
}
}
};

if result.0 != StatusCode::TOO_MANY_REQUESTS {
break result;
}

retries -= 1;
if retries == 0 {
return Err(AgentError::TransportError("retries exhausted".into()));
if http_retries <= 0 || total_retries <= 0 {
let msg = format!("max request retries reached: http_retries={http_retries}, network_retries={network_retries}");
return Err(AgentError::TransportError(msg.into()));
}

http_retries -= 1;
total_retries -= 1;

tokio::time::sleep(delay).await;
delay *= 2;
};
Expand Down