Skip to content

Commit

Permalink
Merge pull request #22 from dfinity/komarevskiy/ic-gateway-add-retries
Browse files Browse the repository at this point in the history
chore: add retries for network-related failures
  • Loading branch information
nikolay-komarevskiy committed Aug 16, 2024
2 parents 8219493 + df52ae9 commit 32e00ef
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 27 deletions.
8 changes: 6 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,17 @@ pub struct Ic {
#[clap(env, long)]
pub ic_root_key: Option<PathBuf>,

/// Maximum mumber of request retries in the transport layer for both network- and http-related failures.
#[clap(env, long, default_value = "5")]
pub ic_max_request_retries: u32,

/// Disable response verification for the IC requests.
#[clap(env, long, default_value = "false")]
pub unsafe_disable_response_verification: bool,
pub ic_unsafe_disable_response_verification: bool,

/// Disable replica-signed queries in the agent.
#[clap(env, long, default_value = "false")]
pub unsafe_disable_replica_signed_queries: bool,
pub ic_unsafe_disable_replica_signed_queries: bool,
}

#[derive(Args)]
Expand Down
4 changes: 2 additions & 2 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
ENV.set(cli.misc.env.clone()).unwrap();
HOSTNAME.set(cli.misc.hostname.clone()).unwrap();

if cli.ic.unsafe_disable_replica_signed_queries {
if cli.ic.ic_unsafe_disable_replica_signed_queries {
warn!("Replica-signed queries are disabled");
}

if cli.ic.unsafe_disable_response_verification {
if cli.ic.ic_unsafe_disable_response_verification {
warn!("Response verification is disabled");
}

Expand Down
9 changes: 6 additions & 3 deletions src/routing/ic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,14 @@ pub fn setup(
http_client: Arc<dyn HttpClient>,
route_provider: Arc<dyn RouteProvider>,
) -> Result<HttpGatewayClient, Error> {
let transport =
transport::ReqwestTransport::create_with_client_route(route_provider, http_client);
let transport = transport::ReqwestTransport::create_with_client_route(
route_provider,
http_client,
cli.ic.ic_max_request_retries,
);
let agent = ic_agent::Agent::builder()
.with_transport(transport)
.with_verify_query_signatures(!cli.ic.unsafe_disable_replica_signed_queries)
.with_verify_query_signatures(!cli.ic.ic_unsafe_disable_replica_signed_queries)
.build()?;

if let Some(v) = &cli.ic.ic_root_key {
Expand Down
81 changes: 62 additions & 19 deletions src/routing/ic/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,22 @@ pub struct ReqwestTransport {
client: Arc<dyn HttpClient>,
max_response_body_size: Option<usize>,
use_call_v3_endpoint: bool,
max_request_retries: u32,
}

impl ReqwestTransport {
/// Creates a transport for the agent from a [`RouteProvider`] and an [`HttpClient`].
pub fn create_with_client_route(
route_provider: Arc<dyn RouteProvider>,
client: Arc<dyn HttpClient>,
max_request_retries: u32,
) -> Self {
Self {
route_provider,
client,
max_response_body_size: Some(MAX_RESPONSE_SIZE),
use_call_v3_endpoint: false,
max_request_retries,
}
}

Expand Down Expand Up @@ -114,37 +117,77 @@ impl ReqwestTransport {
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<(StatusCode, Vec<u8>), AgentError> {
let url = self.route_provider.route()?.join(endpoint)?;
let mut http_request = Request::new(method, url);
http_request
.headers_mut()
.insert(CONTENT_TYPE, CONTENT_TYPE_CBOR);

// Add HTTP headers if requested
let _ = PASS_HEADERS.try_with(|x| {
let mut pass = x.borrow_mut();
for (k, v) in &pass.headers_out {
http_request.headers_mut().append(k, v.clone());
}
pass.headers_out.clear();
});
let create_request_with_generated_url = || -> Result<Request, AgentError> {
let url = self.route_provider.route()?.join(endpoint)?;
let mut http_request = Request::new(method.clone(), url);
http_request
.headers_mut()
.insert(CONTENT_TYPE, CONTENT_TYPE_CBOR);

// Add HTTP headers if requested
let _ = PASS_HEADERS.try_with(|x| {
let mut pass = x.borrow_mut();
for (k, v) in &pass.headers_out {
http_request.headers_mut().append(k, v.clone());
}
pass.headers_out.clear();
});

*http_request.body_mut() = body.as_ref().cloned().map(Body::from);

*http_request.body_mut() = body.map(Body::from);
Ok(http_request)
};

let mut delay = Duration::from_millis(100);
let mut retries = 5;
let mut retries = self.max_request_retries;

let request_result = loop {
let result = self.request(http_request.try_clone().unwrap()).await?;
let result = {
// RouteProvider generates urls dynamically. Some urls can be unhealthy.
// TCP related errors (host unreachable, connection refused, connection timed out, connection reset) can be safely retried with a newly generated url.
loop {
let http_request = create_request_with_generated_url()?;

match self.request(http_request).await {
Ok(response) => break response,
Err(agent_error) => match agent_error {
AgentError::TransportError(ref err) => {
let is_connect_err = err
.downcast_ref::<reqwest::Error>()
.is_some_and(|e| e.is_connect());

// Retry only connection-related errors.
if is_connect_err {
if retries <= 0 {
return Err(AgentError::TransportError(
"retries exhausted".into(),
));
}
retries -= 1;
// Sleep before retrying. Delay time is not changed, as is the case for http retry.
tokio::time::sleep(delay).await;
continue;
}
// All other transport errors are not retried.
return Err(agent_error);
}
// All non-transport errors are not retried.
_ => return Err(agent_error),
},
}
}
};

if result.0 != StatusCode::TOO_MANY_REQUESTS {
break result;
}

retries -= 1;
if retries == 0 {
if retries <= 0 {
return Err(AgentError::TransportError("retries exhausted".into()));
}

retries -= 1;

tokio::time::sleep(delay).await;
delay *= 2;
};
Expand Down
2 changes: 1 addition & 1 deletion src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub async fn setup_router(
// Prepare the states
let state_handler = Arc::new(handler::HandlerState::new(
client,
!cli.ic.unsafe_disable_response_verification,
!cli.ic.ic_unsafe_disable_response_verification,
));
let state_api = Arc::new(proxy::ApiProxyState::new(
http_client.clone(),
Expand Down

0 comments on commit 32e00ef

Please sign in to comment.