Skip to content

Commit

Permalink
Fix the S3 alternate runtime retries test in orchestrator mode (#2825)
Browse files Browse the repository at this point in the history
This PR fixes the S3 `alternative-async-runtime` retry tests. The retry
backoff time was being included in the operation attempt timeout, which
I think is undesirable as it makes retry config much harder to get right
since the backoff times are not predictable (they have randomness
incorporated into them). The overall operation timeout is the only one
that needs to incorporate backoff times.

In addition, this PR also:
- Updates READMEs for the `aws-smithy-runtime-api` and
`aws-smithy-runtime` crates
- Adds top-level crate docs to describe features to `aws-smithy-runtime`
- Copies `capture_test_logs` into `aws-smithy-runtime` so that it can be
used (just about) everywhere instead of just in `aws-config`
- Adds service/operation name to the tracing `invoke` span so it's
possible to tell which operation the events are for
- Makes the `Debug` impl for `Identity` useful
- Adds a ton of trace/debug spans and events to the orchestrator
- Fixes an issue in `aws-smithy-runtime` where a huge amount of the
orchestrator tests weren't being compiled due to a removed feature flag

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
jdisanti committed Jun 30, 2023
1 parent 386ded8 commit 847ed0b
Show file tree
Hide file tree
Showing 23 changed files with 318 additions and 118 deletions.
2 changes: 2 additions & 0 deletions aws/rust-runtime/aws-config/src/test_case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ pub(crate) struct Metadata {
name: String,
}

// TODO(enableNewSmithyRuntimeCleanup): Replace Tee, capture_test_logs, and Rx with
// the implementations added to aws_smithy_runtime::test_util::capture_test_logs
struct Tee<W> {
buf: Arc<Mutex<Vec<u8>>>,
quiet: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@
mod test {
use aws_sdk_dynamodb::config::{Credentials, Region, SharedAsyncSleep};
use aws_sdk_dynamodb::{config::retry::RetryConfig, error::ProvideErrorMetadata};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_async::test_util::instant_time_and_sleep;
use aws_smithy_async::time::SharedTimeSource;
use aws_smithy_async::time::SystemTimeSource;
use aws_smithy_client::test_connection::TestConnection;
use aws_smithy_http::body::SdkBody;
use aws_smithy_runtime::client::retries::RetryPartition;
use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
use aws_smithy_types::timeout::TimeoutConfigBuilder;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, SystemTime};

fn req() -> HttpRequest {
http::Request::builder()
Expand Down Expand Up @@ -111,15 +108,16 @@ mod test {
}

#[tokio::test]
async fn test_adaptive_retries_with_throttling_errors_times_out() {
tracing_subscriber::fmt::init();
async fn test_adaptive_retries_with_throttling_errors() {
let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);

let events = vec![
// First operation
(req(), err()),
(req(), throttling_err()),
(req(), throttling_err()),
(req(), ok()),
// Second operation
(req(), err()),
(req(), throttling_err()),
(req(), ok()),
];

Expand All @@ -130,44 +128,31 @@ mod test {
.retry_config(
RetryConfig::adaptive()
.with_max_attempts(4)
.with_initial_backoff(Duration::from_millis(50))
.with_use_static_exponential_base(true),
)
.timeout_config(
TimeoutConfigBuilder::new()
.operation_attempt_timeout(Duration::from_millis(100))
.build(),
)
.time_source(SharedTimeSource::new(SystemTimeSource::new()))
.sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
.http_connector(conn.clone())
.time_source(SharedTimeSource::new(time_source))
.sleep_impl(SharedAsyncSleep::new(sleep_impl.clone()))
.retry_partition(RetryPartition::new(
"test_adaptive_retries_with_throttling_errors_times_out",
"test_adaptive_retries_with_throttling_errors",
))
.http_connector(conn.clone())
.build();

let expected_table_names = vec!["Test".to_owned()];
let start = Instant::now();

// We create a new client each time to ensure that the cross-client retry state is working.
let client = aws_sdk_dynamodb::Client::from_conf(config.clone());
let res = client.list_tables().send().await.unwrap();
assert_eq!(sleep_impl.total_duration(), Duration::from_secs(40));
assert_eq!(res.table_names(), Some(expected_table_names.as_slice()));
// Three requests should have been made, two failing & one success
assert_eq!(conn.requests().len(), 2);
assert_eq!(conn.requests().len(), 3);

let client = aws_sdk_dynamodb::Client::from_conf(config);
let err = client.list_tables().send().await.unwrap_err();
assert_eq!(err.to_string(), "request has timed out".to_owned());
// two requests should have been made, both failing (plus previous requests)
assert_eq!(conn.requests().len(), 2 + 2);

let since = start.elapsed();
// At least 300 milliseconds must pass:
// - 50ms for the first retry on attempt 1
// - 50ms for the second retry on attempt 3
// - 100ms for the throttling delay triggered by attempt 4, which required a delay longer than the attempt timeout.
// - 100ms for the 5th attempt, which would have succeeded, but required a delay longer than the attempt timeout.
assert!(since.as_secs_f64() > 0.3);
let client = aws_sdk_dynamodb::Client::from_conf(config.clone());
let res = client.list_tables().send().await.unwrap();
assert!(Duration::from_secs(48) < sleep_impl.total_duration());
assert!(Duration::from_secs(49) > sleep_impl.total_duration());
assert_eq!(res.table_names(), Some(expected_table_names.as_slice()));
// Two requests should have been made, one failing & one success (plus previous requests)
assert_eq!(conn.requests().len(), 5);
}
}
1 change: 1 addition & 0 deletions aws/sdk/integration-tests/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async", features
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util", "rustls"] }
aws-smithy-http = { path = "../../build/aws-sdk/sdk/aws-smithy-http" }
aws-smithy-protocol-test = { path = "../../build/aws-sdk/sdk/aws-smithy-protocol-test" }
aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["test-util"] }
aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
bytes = "1"
Expand Down
17 changes: 16 additions & 1 deletion aws/sdk/integration-tests/s3/tests/alternative-async-runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use aws_smithy_types::timeout::TimeoutConfig;
use std::fmt::Debug;
use std::time::{Duration, Instant};

#[cfg(aws_sdk_orchestrator_mode)]
use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs;

#[derive(Debug)]
struct SmolSleep;

Expand All @@ -33,6 +36,9 @@ impl AsyncSleep for SmolSleep {

#[test]
fn test_smol_runtime_timeouts() {
#[cfg(aws_sdk_orchestrator_mode)]
let _guard = capture_test_logs();

if let Err(err) = smol::block_on(async { timeout_test(SharedAsyncSleep::new(SmolSleep)).await })
{
println!("{err}");
Expand All @@ -42,6 +48,9 @@ fn test_smol_runtime_timeouts() {

#[test]
fn test_smol_runtime_retry() {
#[cfg(aws_sdk_orchestrator_mode)]
let _guard = capture_test_logs();

if let Err(err) = smol::block_on(async { retry_test(SharedAsyncSleep::new(SmolSleep)).await }) {
println!("{err}");
panic!();
Expand All @@ -59,6 +68,9 @@ impl AsyncSleep for AsyncStdSleep {

#[test]
fn test_async_std_runtime_timeouts() {
#[cfg(aws_sdk_orchestrator_mode)]
let _guard = capture_test_logs();

if let Err(err) = async_std::task::block_on(async {
timeout_test(SharedAsyncSleep::new(AsyncStdSleep)).await
}) {
Expand All @@ -69,6 +81,9 @@ fn test_async_std_runtime_timeouts() {

#[test]
fn test_async_std_runtime_retry() {
#[cfg(aws_sdk_orchestrator_mode)]
let _guard = capture_test_logs();

if let Err(err) =
async_std::task::block_on(async { retry_test(SharedAsyncSleep::new(AsyncStdSleep)).await })
{
Expand Down Expand Up @@ -137,7 +152,7 @@ async fn retry_test(sleep_impl: SharedAsyncSleep) -> Result<(), Box<dyn std::err
.region(Region::new("us-east-2"))
.http_connector(conn.clone())
.credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
.retry_config(RetryConfig::standard())
.retry_config(RetryConfig::standard().with_max_attempts(3))
.timeout_config(
TimeoutConfig::builder()
.operation_attempt_timeout(Duration::from_secs_f64(0.1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.util.sdkId

class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenContext) : ConfigCustomization() {
private val runtimeConfig = codegenContext.runtimeConfig
Expand Down Expand Up @@ -369,10 +370,10 @@ class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenCon
if (runtimeMode.defaultToOrchestrator) {
rustTemplate(
"""
let retry_partition = layer.load::<#{RetryPartition}>().cloned().unwrap_or_else(|| #{RetryPartition}::new("${codegenContext.serviceShape.id.name}"));
let retry_partition = layer.load::<#{RetryPartition}>().cloned().unwrap_or_else(|| #{RetryPartition}::new("${codegenContext.serviceShape.sdkId()}"));
let retry_config = layer.load::<#{RetryConfig}>().cloned().unwrap_or_else(#{RetryConfig}::disabled);
if retry_config.has_retry() {
#{debug}!("creating retry strategy with partition '{}'", retry_partition);
#{debug}!("using retry strategy with partition '{}'", retry_partition);
}
if retry_config.mode() == #{RetryMode}::Adaptive {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.pre
import software.amazon.smithy.rust.codegen.core.smithy.customize.writeCustomizations
import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.ProtocolPayloadGenerator
import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol
import software.amazon.smithy.rust.codegen.core.util.dq
import software.amazon.smithy.rust.codegen.core.util.inputShape
import software.amazon.smithy.rust.codegen.core.util.outputShape
import software.amazon.smithy.rust.codegen.core.util.sdkId

open class OperationGenerator(
private val codegenContext: ClientCodegenContext,
Expand Down Expand Up @@ -142,7 +144,13 @@ open class OperationGenerator(
stop_point: #{StopPoint},
) -> #{Result}<#{InterceptorContext}, #{SdkError}<#{Error}, #{HttpResponse}>> {
let input = #{TypedBox}::new(input).erase();
#{invoke_with_stop_point}(input, runtime_plugins, stop_point).await
#{invoke_with_stop_point}(
${codegenContext.serviceShape.sdkId().dq()},
${operationName.dq()},
input,
runtime_plugins,
stop_point
).await
}
pub(crate) fn operation_runtime_plugins(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class OperationRuntimePluginGenerator(
fn config(&self) -> #{Option}<#{FrozenLayer}> {
let mut cfg = #{Layer}::new(${operationShape.id.name.dq()});
use #{ConfigBagAccessors} as _;
cfg.set_request_serializer(#{SharedRequestSerializer}::new(${operationStructName}RequestSerializer));
cfg.set_response_deserializer(#{DynResponseDeserializer}::new(${operationStructName}ResponseDeserializer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package software.amazon.smithy.rust.codegen.client.smithy.generators.protocol

import software.amazon.smithy.aws.traits.ServiceTrait
import software.amazon.smithy.model.shapes.BlobShape
import software.amazon.smithy.model.shapes.OperationShape
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule
Expand All @@ -31,9 +30,9 @@ import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpLocation
import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol
import software.amazon.smithy.rust.codegen.core.util.dq
import software.amazon.smithy.rust.codegen.core.util.findStreamingMember
import software.amazon.smithy.rust.codegen.core.util.getTrait
import software.amazon.smithy.rust.codegen.core.util.inputShape
import software.amazon.smithy.rust.codegen.core.util.letIf
import software.amazon.smithy.rust.codegen.core.util.sdkId

// TODO(enableNewSmithyRuntimeCleanup): Delete this class when cleaning up `enableNewSmithyRuntime`
/** Generates the `make_operation` function on input structs */
Expand All @@ -53,9 +52,7 @@ open class MakeOperationGenerator(
private val defaultClassifier = RuntimeType.smithyHttp(runtimeConfig)
.resolve("retry::DefaultResponseRetryClassifier")

private val sdkId =
codegenContext.serviceShape.getTrait<ServiceTrait>()?.sdkId?.lowercase()?.replace(" ", "")
?: codegenContext.serviceShape.id.getName(codegenContext.serviceShape)
private val sdkId = codegenContext.serviceShape.sdkId()

private val codegenScope = arrayOf(
*preludeScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package software.amazon.smithy.rust.codegen.core.util

import software.amazon.smithy.aws.traits.ServiceTrait
import software.amazon.smithy.codegen.core.CodegenException
import software.amazon.smithy.model.Model
import software.amazon.smithy.model.shapes.BooleanShape
Expand Down Expand Up @@ -146,3 +147,7 @@ fun String.shapeId() = ShapeId.from(this)

/** Returns the service name, or a default value if the service doesn't have a title trait */
fun ServiceShape.serviceNameOrDefault(default: String) = getTrait<TitleTrait>()?.value ?: default

/** Returns the SDK ID of the given service shape */
fun ServiceShape.sdkId(): String =
getTrait<ServiceTrait>()?.sdkId?.lowercase()?.replace(" ", "") ?: id.getName(this)
4 changes: 2 additions & 2 deletions rust-runtime/aws-smithy-runtime-api/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# aws-smithy-retries
# aws-smithy-runtime-api

**This crate is UNSTABLE! All internal and external interfaces are subject to change without notice.**

Smithy runtime types.
Lightweight crate with traits and types necessary to configure runtime logic in the `aws-smithy-runtime` crate.

<!-- anchor_start:footer -->
This crate is part of the [AWS SDK for Rust](https://awslabs.github.io/aws-sdk-rust/) and the [smithy-rs](https://github.com/awslabs/smithy-rs) code generator. In most cases, it should not be used directly.
Expand Down
22 changes: 19 additions & 3 deletions rust-runtime/aws-smithy-runtime-api/src/client/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::client::auth::AuthSchemeId;
use crate::client::orchestrator::Future;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreAppend, StoreReplace};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
Expand Down Expand Up @@ -96,21 +97,27 @@ impl IdentityResolvers {
}
}

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct Identity {
data: Arc<dyn Any + Send + Sync>,
#[allow(clippy::type_complexity)]
data_debug: Arc<dyn (Fn(&Arc<dyn Any + Send + Sync>) -> &dyn Debug) + Send + Sync>,
expiration: Option<SystemTime>,
}

impl Identity {
pub fn new(data: impl Any + Send + Sync, expiration: Option<SystemTime>) -> Self {
pub fn new<T>(data: T, expiration: Option<SystemTime>) -> Self
where
T: Any + Debug + Send + Sync,
{
Self {
data: Arc::new(data),
data_debug: Arc::new(|d| d.downcast_ref::<T>().expect("type-checked") as _),
expiration,
}
}

pub fn data<T: 'static>(&self) -> Option<&T> {
pub fn data<T: Any + Debug + Send + Sync + 'static>(&self) -> Option<&T> {
self.data.downcast_ref()
}

Expand All @@ -119,6 +126,15 @@ impl Identity {
}
}

impl fmt::Debug for Identity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Identity")
.field("data", (self.data_debug)(&self.data))
.field("expiration", &self.expiration)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
11 changes: 11 additions & 0 deletions rust-runtime/aws-smithy-runtime-api/src/client/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,11 @@ macro_rules! interceptor_impl_fn {
ctx: &mut InterceptorContext,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!(concat!(
"running `",
stringify!($interceptor),
"` interceptors"
));
let mut result: Result<(), BoxError> = Ok(());
let mut ctx = ctx.into();
for interceptor in self.interceptors() {
Expand Down Expand Up @@ -774,6 +779,7 @@ impl Interceptors {
ctx: &InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `client_read_before_execution` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: BeforeSerializationInterceptorContextRef<'_> = ctx.into();
for interceptor in self.client_interceptors() {
Expand All @@ -794,6 +800,7 @@ impl Interceptors {
ctx: &InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `operation_read_before_execution` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: BeforeSerializationInterceptorContextRef<'_> = ctx.into();
for interceptor in self.operation_interceptors() {
Expand Down Expand Up @@ -829,6 +836,7 @@ impl Interceptors {
ctx: &mut InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `modify_before_attempt_completion` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let mut ctx: FinalizerInterceptorContextMut<'_> = ctx.into();
for interceptor in self.interceptors() {
Expand All @@ -850,6 +858,7 @@ impl Interceptors {
ctx: &InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `read_after_attempt` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: FinalizerInterceptorContextRef<'_> = ctx.into();
for interceptor in self.interceptors() {
Expand All @@ -870,6 +879,7 @@ impl Interceptors {
ctx: &mut InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `modify_before_completion` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let mut ctx: FinalizerInterceptorContextMut<'_> = ctx.into();
for interceptor in self.interceptors() {
Expand All @@ -890,6 +900,7 @@ impl Interceptors {
ctx: &InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `read_after_execution` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: FinalizerInterceptorContextRef<'_> = ctx.into();
for interceptor in self.interceptors() {
Expand Down
Loading

0 comments on commit 847ed0b

Please sign in to comment.