diff --git a/examples/s3/Cargo.toml b/examples/s3/Cargo.toml
index ec6cf9de1e3f..3c9f35b6c353 100644
--- a/examples/s3/Cargo.toml
+++ b/examples/s3/Cargo.toml
@@ -40,3 +40,6 @@ features = ["env-filter"]
 [dependencies.uuid]
 version = "0.8"
 features = ["serde", "v4"]
+
+[dependencies.http]
+version = "0.2"
diff --git a/examples/s3/src/bin/upload-file-multipart.rs b/examples/s3/src/bin/upload-file-multipart.rs
new file mode 100644
index 000000000000..67c2c335cdee
--- /dev/null
+++ b/examples/s3/src/bin/upload-file-multipart.rs
@@ -0,0 +1,100 @@
+use aws_sdk_s3::model::CompletedMultipartUpload;
+use aws_sdk_s3::model::CompletedPart;
+use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::{Client, Endpoint, Error};
+use tokio::io::AsyncSeekExt;
+
+/// Upload a file to S3 as a multipart upload, reading it in chunks.
+///
+/// ## Usage
+/// ```
+/// upload-file-multipart <profile> <url> <bucket> <key> <filename> <num parts>
+/// ```
+#[tokio::main]
+async fn main() -> Result<(), aws_sdk_s3::Error> {
+    const REGION: &str = "us-east-1";
+    let args = std::env::args().collect::<Vec<String>>();
+    let usage = format!(
+        "{} <profile> <url> <bucket> <key> <filename> <num parts>",
+        args[0]
+    );
+    let profile = args.get(1).expect(&usage);
+    let url = args.get(2).expect(&usage);
+    let bucket = args.get(3).expect(&usage);
+    let key = args.get(4).expect(&usage);
+    let file_name = args.get(5).expect(&usage);
+    let num_parts = args
+        .get(6)
+        .expect(&usage)
+        .parse::<usize>()
+        .expect("Error parsing num parts");
+    // Credentials are read from the .aws/credentials file.
+    let conf = aws_config::from_env()
+        .region(REGION)
+        .credentials_provider(
+            aws_config::profile::ProfileFileCredentialsProvider::builder()
+                .profile_name(profile)
+                .build(),
+        )
+        .load()
+        .await;
+    let uri = url.parse::<http::Uri>().expect("Invalid URL");
+    let ep = Endpoint::immutable(uri);
+    let s3_conf = aws_sdk_s3::config::Builder::from(&conf)
+        .endpoint_resolver(ep)
+        .build();
+    let client = Client::from_conf(s3_conf);
+    upload_multipart(&client, bucket, file_name, key, num_parts).await?;
+    Ok(())
+}
+
+/// Multipart upload.
+pub async fn upload_multipart(
+    client: &Client,
+    bucket: &str,
+    file_name: &str,
+    key: &str,
+    num_parts: usize,
+) -> Result<(), Error> {
+    let len: u64 = std::fs::metadata(file_name)
+        .map_err(|err| Error::Unhandled(Box::new(err)))?
+        .len();
+    let num_parts = num_parts as u64;
+    let file = tokio::fs::File::open(file_name)
+        .await
+        .map_err(|err| Error::Unhandled(Box::new(err)))?;
+    let chunk_size = len / num_parts;
+    let last_chunk_size = chunk_size + len % num_parts;
+    // Initiate the multipart upload and store the upload id.
+    let u = client
+        .create_multipart_upload()
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await?;
+    let uid = u.upload_id().ok_or_else(|| {
+        Error::NoSuchUpload(
+            aws_sdk_s3::error::NoSuchUpload::builder()
+                .message("No upload ID")
+                .build(),
+        )
+    })?;
+    // Iterate over the file chunks, moving the file pointer at each
+    // iteration and collecting each part number and associated etag.
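+    // NOTE: Amazon S3 requires every part except the last to be at least
+    // 5 MiB, so num_parts must be chosen so that chunk_size stays above
+    // that limit (and num_parts must be non-zero, or the divisions above
+    // will panic).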
+    let mut completed_parts: Vec<CompletedPart> = Vec::new();
+    for i in 0..num_parts {
+        let mut file = file
+            .try_clone()
+            .await
+            .map_err(|err| Error::Unhandled(Box::new(err)))?;
+        let size = if i != (num_parts - 1) {
+            chunk_size
+        } else {
+            last_chunk_size
+        };
+        // Seek to the start of this part; offsets are multiples of chunk_size.
+        file.seek(std::io::SeekFrom::Start(i * chunk_size))
+            .await
+            .map_err(|err| Error::Unhandled(Box::new(err)))?;
+        let body = ByteStream::from_file_chunk(file, size)
+            .await
+            .map_err(|err| Error::Unhandled(Box::new(err)))?;
+        let up = client
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .content_length(size as i64)
+            .upload_id(uid)
+            .part_number((i + 1) as i32)
+            .body(body)
+            .send()
+            .await?;
+        let cp = CompletedPart::builder()
+            .set_e_tag(up.e_tag)
+            .part_number((i + 1) as i32)
+            .build();
+        completed_parts.push(cp);
+    }
+    // Complete the multipart upload, sending the (etag, part number) list
+    // along with the request.
+    let b = CompletedMultipartUpload::builder()
+        .set_parts(Some(completed_parts))
+        .build();
+    let completed = client
+        .complete_multipart_upload()
+        .multipart_upload(b)
+        .upload_id(uid)
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await?;
+    // Print the etag, removing the surrounding quotes.
+    if let Some(etag) = completed.e_tag {
+        println!("{}", etag.replace("\"", ""));
+    } else {
+        eprintln!("Error receiving etag");
+    }
+    Ok(())
+}
diff --git a/examples/s3/src/bin/upload-file.rs b/examples/s3/src/bin/upload-file.rs
new file mode 100644
index 000000000000..c16e94bf839f
--- /dev/null
+++ b/examples/s3/src/bin/upload-file.rs
@@ -0,0 +1,66 @@
+use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::{Client, Endpoint, Error};
+use std::path::Path;
+
+/// Upload a file chunk to an S3 object.
+#[tokio::main]
+async fn main() -> Result<(), aws_sdk_s3::Error> {
+    let args = std::env::args().collect::<Vec<String>>();
+    let usage = format!(
+        "{} <profile> <url> <bucket> <key> <filename> <start offset> <chunk size>",
+        args[0]
+    );
+    let profile = args.get(1).expect(&usage);
+    let url = args.get(2).expect(&usage);
+    let bucket = args.get(3).expect(&usage);
+    let key = args.get(4).expect(&usage);
+    let file_name = args.get(5).expect(&usage);
+    let start_offset = args
+        .get(6)
+        .expect(&usage)
+        .parse::<u64>()
+        .expect("Error parsing offset");
+    let chunk_size = args
+        .get(7)
+        .expect(&usage)
+        .parse::<u64>()
+        .expect("Error parsing chunk size");
+    let md = std::fs::metadata(file_name).map_err(|err| Error::Unhandled(Box::new(err)))?;
+    // A chunk size of zero means "from start_offset to the end of the file".
+    let chunk_size = if chunk_size == 0 {
+        md.len().saturating_sub(start_offset)
+    } else {
+        chunk_size
+    };
+
+    // Credentials are read from the .aws/credentials file.
+    let conf = aws_config::from_env()
+        .region("us-east-1")
+        .credentials_provider(
+            aws_config::profile::ProfileFileCredentialsProvider::builder()
+                .profile_name(profile)
+                .build(),
+        )
+        .load()
+        .await;
+    let uri = url.parse::<http::Uri>().expect("Invalid URL");
+    let ep = Endpoint::immutable(uri);
+    let s3_conf = aws_sdk_s3::config::Builder::from(&conf)
+        .endpoint_resolver(ep)
+        .build();
+    let client = Client::from_conf(s3_conf);
+    upload_chunk(&client, bucket, file_name, key, start_offset, chunk_size).await?;
+    Ok(())
+}
+
+/// Upload a file chunk to bucket/key.
+pub async fn upload_chunk(
+    client: &Client,
+    bucket: &str,
+    file_name: &str,
+    key: &str,
+    start_offset: u64,
+    chunk_size: u64,
+) -> Result<(), Error> {
+    let body = ByteStream::from_path_chunk(Path::new(file_name), start_offset, chunk_size)
+        .await
+        .expect(&format!("Cannot read from {}", file_name));
+    client
+        .put_object()
+        .bucket(bucket)
+        .key(key)
+        .body(body)
+        .send()
+        .await?;
+    println!(
+        "Uploaded chunk of size {} from file {}",
+        chunk_size, file_name
+    );
+    Ok(())
+}
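As a usage sketch for the single-chunk uploader above: since each `put_object` call writes a whole object, every chunk needs its own key. The bucket, file, and key names below are illustrative, and `client` is assumed to be configured as in `main`:

```rust
// Hypothetical call site: upload bytes [1 MiB, 2 MiB) of "data.bin"
// as the standalone object "data.bin.part2".
async fn upload_second_mib(client: &aws_sdk_s3::Client) -> Result<(), aws_sdk_s3::Error> {
    const MIB: u64 = 1024 * 1024;
    upload_chunk(client, "my-bucket", "data.bin", "data.bin.part2", MIB, MIB).await
}
```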
diff --git a/sdk/aws-smithy-http/src/byte_stream.rs b/sdk/aws-smithy-http/src/byte_stream.rs
index d92937ba1f84..a5c98568f01c 100644
--- a/sdk/aws-smithy-http/src/byte_stream.rs
+++ b/sdk/aws-smithy-http/src/byte_stream.rs
@@ -110,6 +110,7 @@ use std::task::{Context, Poll};
 #[cfg(feature = "rt-tokio")]
 mod bytestream_util;
+use tokio::io::AsyncSeekExt; // to enable tokio::fs::File::seek
 
 /// Stream of binary data
 ///
@@ -293,6 +294,81 @@ impl ByteStream {
         ));
         Ok(ByteStream::new(body))
     }
+
+    /// Create a ByteStream from a file chunk. To keep the `file` argument
+    /// immutable, the current offset in the file is not checked: a call to
+    /// `stream_position()` would require a mutable file handle.
+    ///
+    /// Applications must call `file.seek()` before passing the file
+    /// handle to this function.
+    ///
+    /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried
+    /// in the case of upstream failures, use [`ByteStream::from_path`](ByteStream::from_path).
+    #[cfg(feature = "rt-tokio")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+    pub async fn from_file_chunk(file: tokio::fs::File, chunk_size: u64) -> Result<Self, Error> {
+        let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
+            bytestream_util::PathBody::from_file(file, chunk_size),
+        ));
+        Ok(ByteStream::new(body))
+    }
+
+    /// Create a ByteStream from a file path, specifying the starting offset and chunk length.
+    ///
+    /// # Examples
+    /// ```no_run
+    /// use std::path::Path;
+    /// use std::time::Instant;
+    /// use aws_sdk_s3::types::ByteStream;
+    /// use aws_sdk_s3::{Client, Error};
+    ///
+    /// pub async fn upload_chunk(
+    ///     client: &Client,
+    ///     bucket: &str,
+    ///     file_name: &str,
+    ///     key: &str,
+    ///     start_offset: u64,
+    ///     chunk_size: u64,
+    /// ) -> Result<(), Error> {
+    ///     let body = ByteStream::from_path_chunk(Path::new(file_name), start_offset, chunk_size)
+    ///         .await
+    ///         .expect(&format!("Cannot read from {}", file_name));
+    ///     let start = Instant::now();
+    ///     client
+    ///         .put_object()
+    ///         .bucket(bucket)
+    ///         .key(key)
+    ///         .body(body)
+    ///         .send()
+    ///         .await?;
+    ///     let elapsed = start.elapsed();
+    ///     println!(
+    ///         "Uploaded chunk of size {} from file {} in {:.2} s",
+    ///         chunk_size,
+    ///         file_name,
+    ///         elapsed.as_secs_f32()
+    ///     );
+    ///     Ok(())
+    /// }
+    /// ```
+    #[cfg(feature = "rt-tokio")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+    pub async fn from_path_chunk(
+        path: impl AsRef<std::path::Path>,
+        start_offset: u64,
+        chunk_size: u64,
+    ) -> Result<Self, Error> {
+        let path = path.as_ref();
+        let sz = tokio::fs::metadata(path)
+            .await
+            .map_err(|err| Error(err.into()))?
+            .len();
+        if start_offset >= sz {
+            return Err(Error(Box::new(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Offset exceeds file size",
+            ))));
+        }
+        if chunk_size == 0 || chunk_size > (sz - start_offset) {
+            return Err(Error(Box::new(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Chunk size out of range",
+            ))));
+        }
+        let mut file = tokio::fs::File::open(path)
+            .await
+            .map_err(|err| Error(err.into()))?;
+        file.seek(std::io::SeekFrom::Start(start_offset))
+            .await
+            .map_err(|err| Error(err.into()))?;
+        let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
+            bytestream_util::PathBody::from_file(file, chunk_size),
+        ));
+        Ok(ByteStream::new(body))
+    }
 }
 
 impl Default for ByteStream {
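Since `from_file_chunk` never seeks on its own, the caller positions the handle first. A minimal sketch of that contract, assuming the `aws_smithy_http::byte_stream` path of this crate (the file name and sizes are illustrative):

```rust
use aws_smithy_http::byte_stream::ByteStream;
use tokio::io::AsyncSeekExt;

// Stream bytes [1 MiB, 2 MiB) of "data.bin" without buffering the whole file.
async fn second_mib_stream() -> Result<ByteStream, Box<dyn std::error::Error>> {
    const MIB: u64 = 1024 * 1024;
    let mut file = tokio::fs::File::open("data.bin").await?;
    // from_file_chunk reads from the current position: seek before handing off.
    file.seek(std::io::SeekFrom::Start(MIB)).await?;
    Ok(ByteStream::from_file_chunk(file, MIB).await?)
}
```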