Skip to content

fix: extra byte read from chunk transfer #1294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions runtime/auth/aws-signing-common/api/aws-signing-common.api
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public final class aws/smithy/kotlin/runtime/auth/awssigning/AwsChunkedSource :
public fun <init> (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/auth/awssigning/AwsSigner;Laws/smithy/kotlin/runtime/auth/awssigning/AwsSigningConfig;[BLaws/smithy/kotlin/runtime/http/DeferredHeaders;)V
public synthetic fun <init> (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/auth/awssigning/AwsSigner;Laws/smithy/kotlin/runtime/auth/awssigning/AwsSigningConfig;[BLaws/smithy/kotlin/runtime/http/DeferredHeaders;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close ()V
public final fun getContentBytesTransferred ()J
public fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)J
public final fun setContentBytesTransferred (J)V
}

public final class aws/smithy/kotlin/runtime/auth/awssigning/AwsSignatureType : java/lang/Enum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,27 @@ public class AwsChunkedSource(
trailingHeaders,
)

/**
* Tracks the content bytes transferred, excluding chunk metadata.
* This public property can be accessed to monitor file transfer progress.
*/
public var contentBytesTransferred: Long = 0L

override fun read(sink: SdkBuffer, limit: Long): Long {
require(limit >= 0L) { "Invalid limit ($limit) must be >= 0L" }
// COROUTINE SAFETY: runBlocking is allowed here because SdkSource is a synchronous blocking interface

// reset metadata bytes counter
chunkReader.chunkMetadataBytes = 0
val isChunkValid = runBlocking {
chunkReader.ensureValidChunk()
}
if (!isChunkValid) return -1L
return chunkReader.chunk.read(sink, limit)

val totalBytesTransferred = chunkReader.chunk.read(sink, limit)
contentBytesTransferred = totalBytesTransferred - chunkReader.chunkMetadataBytes

return totalBytesTransferred
Comment on lines -50 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: I feel pretty strongly that AwsChunkedSource.read should return the number of bytes which were read from the delegate source rather than the number of bytes written to the destination sink. Can we just return totalBytesTransferred - chunkReader.chunkMetadataBytes instead of storing it in a property? What do we need the total bytes transferred for (at least, as the output from read)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to return totalBytes for some of our tests:

val totalBytesExpected = encodedUnsignedChunkLength(CHUNK_SIZE_BYTES) + encodedUnsignedChunkLength(0) + "\r\n".length

If we not return the totalBytesTransferred, those tests will fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure we can find a way around that, maybe by exposing the total bytes as some kind of test utility?

}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ internal class AwsChunkedReader(
*/
internal var hasLastChunkBeenSent: Boolean = false

/**
* Tracks the number of bytes used for chunk metadata.
* This includes chunk headers, terminators (CRLF), and trailers.
* Used to calculate the content bytes transferred by subtracting metadata bytes from the total bytes read.
*/
internal var chunkMetadataBytes: Long = 0L

/**
* Ensures that the internal [chunk] is valid for reading. If it's not valid, try to load the next chunk. Note that
* this function will suspend until the whole chunk has been loaded.
Expand All @@ -65,7 +72,9 @@ internal class AwsChunkedReader(
*/
internal suspend fun ensureValidChunk(): Boolean {
// check if the current chunk is still valid
if (chunk.size > 0L) return true
if (chunk.size > 0L) {
return true
}

// if not, try to fetch a new chunk
val nextChunk = when {
Expand All @@ -80,9 +89,10 @@ internal class AwsChunkedReader(
next
}
}

val preTerminatorChunkSize = nextChunk?.size ?: 0L
nextChunk?.writeUtf8("\r\n") // terminating CRLF to signal end of chunk

val chunkSizeWithTerminator = nextChunk?.size ?: 0L
chunkMetadataBytes += chunkSizeWithTerminator - preTerminatorChunkSize
// transfer all segments to the working chunk
nextChunk?.let { chunk.writeAll(it) }

Expand All @@ -96,12 +106,14 @@ internal class AwsChunkedReader(
private suspend fun getFinalChunk(): SdkBuffer {
// empty chunk
val lastChunk = checkNotNull(if (signingConfig.isUnsigned) getUnsignedChunk(SdkBuffer()) else getSignedChunk(SdkBuffer()))

val preTrailerChunkSize = lastChunk.size
// + any trailers
if (!trailingHeaders.isEmpty()) {
val trailingHeaderChunk = getTrailingHeadersChunk(trailingHeaders.toHeaders())
lastChunk.writeAll(trailingHeaderChunk)
}
val trailersSize = lastChunk.size - preTrailerChunkSize
chunkMetadataBytes += trailersSize
return lastChunk
}

Expand Down Expand Up @@ -155,7 +167,7 @@ internal class AwsChunkedReader(
write(chunkSignature)
writeUtf8("\r\n")
}

chunkMetadataBytes += signedChunk.size
// append the body
signedChunk.write(chunkBody)

Expand All @@ -174,7 +186,7 @@ internal class AwsChunkedReader(
*/
private suspend fun getUnsignedChunk(data: SdkBuffer? = null): SdkBuffer? {
val bodyBuffer = data ?: stream.readChunk() ?: return null

val bodyBytes = bodyBuffer.size
val unsignedChunk = SdkBuffer()

// headers
Expand All @@ -183,7 +195,7 @@ internal class AwsChunkedReader(
writeUtf8("\r\n")
writeAll(bodyBuffer) // append the body
}

chunkMetadataBytes += unsignedChunk.size - bodyBytes
return unsignedChunk
}

Expand Down
Loading