From 3ddc5b8282ab9e11c4c89e89d0a3ca56bf3d58c1 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Mon, 9 Jun 2025 13:35:44 -0700 Subject: [PATCH 01/12] use the new send_data API --- packages/smithy-http/pyproject.toml | 2 +- .../smithy-http/src/smithy_http/aio/crt.py | 75 ++++++++++--------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/packages/smithy-http/pyproject.toml b/packages/smithy-http/pyproject.toml index bd9ed9e66..6a1966bcd 100644 --- a/packages/smithy-http/pyproject.toml +++ b/packages/smithy-http/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ [project.optional-dependencies] awscrt = [ - "awscrt>=0.23.10", + "awscrt>=0.27.2", ] aiohttp = [ "aiohttp>=3.11.12, <4.0", diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 9f0b5a418..68a8f29ec 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -117,7 +117,8 @@ def __init__(self) -> None: def set_stream(self, stream: "crt_http.HttpClientStream") -> None: if self._stream is not None: - raise SmithyHTTPException("Stream already set on AWSCRTHTTPResponse object") + raise SmithyHTTPException( + "Stream already set on AWSCRTHTTPResponse object") self._stream = stream concurrent_future: ConcurrentFuture[int] = stream.completion_future self._completion_future = asyncio.wrap_future(concurrent_future) @@ -236,19 +237,45 @@ async def send( :param request: The request including destination URI, fields, payload. :param request_config: Configuration specific to this request. """ - crt_request, crt_body = await self._marshal_request(request) + crt_request = await self._marshal_request(request) connection = await self._get_connection(request.destination) response_body = CRTResponseBody() response_factory = CRTResponseFactory(response_body) + # TODO: assert the connection is HTTP/2 crt_stream = connection.request( crt_request, response_factory.on_response, response_body.on_body, + True # allow manual stream write. ) response_factory.set_done_callback(crt_stream) response_body.set_stream(crt_stream) + + body = request.body + if isinstance(body, bytes | bytearray): + # If the body is already directly in memory, wrap in a BytesIO to hand + # off to CRT. + crt_body = BytesIO(body) + crt_stream.write_data(crt_body, True) + else: + # If the body is async, or potentially very large, start up a task to read + # it into the intermediate object that CRT needs. By using + # asyncio.create_task we'll start the coroutine without having to + # explicitly await it. + + if not isinstance(body, AsyncIterable): + body = AsyncBytesReader(body) + + # Start the read task in the background. + read_task = asyncio.create_task( + self._consume_body_async(body, crt_stream)) + + # Keep track of the read task so that it doesn't get garbage colllected, + # and stop tracking it once it's done. + self._async_reads.add(read_task) + read_task.add_done_callback(self._async_reads.discard) crt_stream.completion_future.add_done_callback( - partial(self._close_input_body, body=crt_body) + partial(self._close_input_body, stream=crt_stream) ) response = await response_factory.await_response() @@ -258,10 +285,10 @@ async def send( return response def _close_input_body( - self, future: ConcurrentFuture[int], *, body: "BufferableByteStream | BytesIO" + self, future: ConcurrentFuture[int], *, stream: "crt_http.HttpClientStream" ) -> None: if future.exception(timeout=0): - body.close() + stream.write_data(BytesIO(b''), True) async def _create_connection( self, url: core_interfaces.URI @@ -326,7 +353,8 @@ def _validate_connection(self, connection: "crt_http.HttpClientConnection") -> N if force_http_2 and connection.version is not crt_http.HttpVersion.Http2: connection.close() negotiated = crt_http.HttpVersion(connection.version).name - raise SmithyHTTPException(f"HTTP/2 could not be negotiated: {negotiated}") + raise SmithyHTTPException( + f"HTTP/2 could not be negotiated: {negotiated}") def _render_path(self, url: core_interfaces.URI) -> str: path = url.path if url.path is not None else "/" @@ -357,49 +385,25 @@ async def _marshal_request( path = self._render_path(request.destination) headers = crt_http.HttpHeaders(headers_list) - body = request.body - if isinstance(body, bytes | bytearray): - # If the body is already directly in memory, wrap in a BytesIO to hand - # off to CRT. - crt_body = BytesIO(body) - else: - # If the body is async, or potentially very large, start up a task to read - # it into the intermediate object that CRT needs. By using - # asyncio.create_task we'll start the coroutine without having to - # explicitly await it. - crt_body = BufferableByteStream() - - if not isinstance(body, AsyncIterable): - body = AsyncBytesReader(body) - - # Start the read task in the background. - read_task = asyncio.create_task(self._consume_body_async(body, crt_body)) - - # Keep track of the read task so that it doesn't get garbage colllected, - # and stop tracking it once it's done. - self._async_reads.add(read_task) - read_task.add_done_callback(self._async_reads.discard) - crt_request = crt_http.HttpRequest( method=request.method, path=path, headers=headers, - body_stream=crt_body, ) - return crt_request, crt_body + return crt_request async def _consume_body_async( - self, source: AsyncIterable[bytes], dest: "BufferableByteStream" + self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStream" ) -> None: try: async for chunk in source: - dest.write(chunk) + dest.write_data(BytesIO(chunk), False) except Exception: dest.close() raise finally: await close(source) - dest.end_stream() + dest.write_data(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": return AWSCRTHTTPClient( @@ -468,7 +472,8 @@ def write(self, buffer: "ReadableBuffer") -> int: ) if self._closed: - raise OSError("Stream is completed and doesn't support further writes.") + raise OSError( + "Stream is completed and doesn't support further writes.") if buffer: self._chunks.append(buffer) From 96edd6a228175e0e90ce504c3f82c94185ab7f3f Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Mon, 9 Jun 2025 13:55:45 -0700 Subject: [PATCH 02/12] couple fix --- packages/smithy-http/src/smithy_http/aio/crt.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 68a8f29ec..575404d83 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -256,6 +256,7 @@ async def send( # If the body is already directly in memory, wrap in a BytesIO to hand # off to CRT. crt_body = BytesIO(body) + # TODO handle error, and it returns a future for now. crt_stream.write_data(crt_body, True) else: # If the body is async, or potentially very large, start up a task to read @@ -274,22 +275,11 @@ async def send( # and stop tracking it once it's done. self._async_reads.add(read_task) read_task.add_done_callback(self._async_reads.discard) - crt_stream.completion_future.add_done_callback( - partial(self._close_input_body, stream=crt_stream) - ) response = await response_factory.await_response() - if response.status != 200 and response.status >= 300: - await close(crt_body) return response - def _close_input_body( - self, future: ConcurrentFuture[int], *, stream: "crt_http.HttpClientStream" - ) -> None: - if future.exception(timeout=0): - stream.write_data(BytesIO(b''), True) - async def _create_connection( self, url: core_interfaces.URI ) -> "crt_http.HttpClientConnection": @@ -399,7 +389,6 @@ async def _consume_body_async( async for chunk in source: dest.write_data(BytesIO(chunk), False) except Exception: - dest.close() raise finally: await close(source) From e148ae4f88d794e323edeb8840a281a759c8b95b Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 10 Jun 2025 09:51:49 -0700 Subject: [PATCH 03/12] async-crt --- .../smithy-http/src/smithy_http/aio/crt.py | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 575404d83..0b5a0b512 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -21,11 +21,13 @@ # pyright doesn't like optional imports. This is reasonable because if we use these # in type hints then they'd result in runtime errors. # TODO: add integ tests that import these without the dependendency installed - from awscrt import http as crt_http + from awscrt import http_asyncio as crt_http + from awscrt import http as crt_http_base from awscrt import io as crt_io try: - from awscrt import http as crt_http + from awscrt import http_asyncio as crt_http + from awscrt import http as crt_http_base from awscrt import io as crt_io HAS_CRT = True @@ -105,7 +107,7 @@ def __repr__(self) -> str: class CRTResponseBody: def __init__(self) -> None: - self._stream: crt_http.HttpClientStream | None = None + self._stream: crt_http.HttpClientStreamAsync | None = None self._completion_future: AsyncFuture[int] | None = None self._chunk_futures: deque[ConcurrentFuture[bytes]] = deque() @@ -115,13 +117,12 @@ def __init__(self) -> None: # an interface that better matches python's async. self._received_chunks: deque[bytes] = deque() - def set_stream(self, stream: "crt_http.HttpClientStream") -> None: + def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: if self._stream is not None: raise SmithyHTTPException( "Stream already set on AWSCRTHTTPResponse object") self._stream = stream - concurrent_future: ConcurrentFuture[int] = stream.completion_future - self._completion_future = asyncio.wrap_future(concurrent_future) + self._completion_future = stream._completion_future self._completion_future.add_done_callback(self._on_complete) self._stream.activate() @@ -185,8 +186,9 @@ def on_response( async def await_response(self) -> AWSCRTHTTPResponse: return await asyncio.wrap_future(self._response_future) - def set_done_callback(self, stream: "crt_http.HttpClientStream") -> None: - stream.completion_future.add_done_callback(self._cancel) + def set_done_callback(self, stream: "crt_http.HttpClientStreamAsync") -> None: + print(stream) + stream._completion_future.add_done_callback(self._cancel) def _cancel(self, completion_future: ConcurrentFuture[int | Exception]) -> None: if not self._response_future.done(): @@ -237,7 +239,7 @@ async def send( :param request: The request including destination URI, fields, payload. :param request_config: Configuration specific to this request. """ - crt_request = await self._marshal_request(request) + crt_request = self._marshal_request(request) connection = await self._get_connection(request.destination) response_body = CRTResponseBody() response_factory = CRTResponseFactory(response_body) @@ -257,7 +259,7 @@ async def send( # off to CRT. crt_body = BytesIO(body) # TODO handle error, and it returns a future for now. - crt_stream.write_data(crt_body, True) + await crt_stream.write_data(crt_body, True) else: # If the body is async, or potentially very large, start up a task to read # it into the intermediate object that CRT needs. By using @@ -285,7 +287,7 @@ async def _create_connection( ) -> "crt_http.HttpClientConnection": """Builds and validates connection to ``url``""" connect_future = self._build_new_connection(url) - connection = await asyncio.wrap_future(connect_future) + connection = await connect_future self._validate_connection(connection) return connection @@ -322,16 +324,13 @@ def _build_new_connection( if url.port is not None: port = url.port - connect_future: ConcurrentFuture[crt_http.HttpClientConnection] = ( - crt_http.HttpClientConnection.new( - bootstrap=self._client_bootstrap, - host_name=url.host, - port=port, - socket_options=self._socket_options, - tls_connection_options=tls_connection_options, - ) + return crt_http.HttpClientConnectionAsync.new( + bootstrap=self._client_bootstrap, + host_name=url.host, + port=port, + socket_options=self._socket_options, + tls_connection_options=tls_connection_options, ) - return connect_future def _validate_connection(self, connection: "crt_http.HttpClientConnection") -> None: """Validates an existing connection against the client config. @@ -351,7 +350,7 @@ def _render_path(self, url: core_interfaces.URI) -> str: query = f"?{url.query}" if url.query is not None else "" return f"{path}{query}" - async def _marshal_request( + def _marshal_request( self, request: http_aio_interfaces.HTTPRequest ) -> tuple["crt_http.HttpRequest", "BufferableByteStream | BytesIO"]: """Create :py:class:`awscrt.http.HttpRequest` from @@ -373,9 +372,9 @@ async def _marshal_request( headers_list.append((fld.name, val)) path = self._render_path(request.destination) - headers = crt_http.HttpHeaders(headers_list) + headers = crt_http_base.HttpHeaders(headers_list) - crt_request = crt_http.HttpRequest( + crt_request = crt_http_base.HttpRequest( method=request.method, path=path, headers=headers, @@ -383,16 +382,16 @@ async def _marshal_request( return crt_request async def _consume_body_async( - self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStream" + self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStreamAsync" ) -> None: try: async for chunk in source: - dest.write_data(BytesIO(chunk), False) + await dest.write_data(BytesIO(chunk), False) except Exception: raise finally: await close(source) - dest.write_data(BytesIO(b''), True) + await dest.write_data(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": return AWSCRTHTTPClient( From f82cbb23ece112648b27b7c9053d399c838a0220 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 10 Jun 2025 10:37:42 -0700 Subject: [PATCH 04/12] skip body --- .../smithy-http/src/smithy_http/aio/crt.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 0b5a0b512..6bf3a89f2 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -259,7 +259,7 @@ async def send( # off to CRT. crt_body = BytesIO(body) # TODO handle error, and it returns a future for now. - await crt_stream.write_data(crt_body, True) + # await crt_stream.write_data(crt_body, True) else: # If the body is async, or potentially very large, start up a task to read # it into the intermediate object that CRT needs. By using @@ -384,14 +384,16 @@ def _marshal_request( async def _consume_body_async( self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStreamAsync" ) -> None: - try: - async for chunk in source: - await dest.write_data(BytesIO(chunk), False) - except Exception: - raise - finally: - await close(source) - await dest.write_data(BytesIO(b''), True) + pass + # try: + # asyncio.sleep(1) # Yield control to the event loop + # async for chunk in source: + # await dest.write_data(BytesIO(chunk), False) + # except Exception: + # raise + # finally: + # await close(source) + # await dest.write_data(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": return AWSCRTHTTPClient( From 62f0a2207dff1d0c0690f72f6487ad7588ec74d1 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 10 Jun 2025 12:07:07 -0700 Subject: [PATCH 05/12] WIP --- .../smithy-http/src/smithy_http/aio/crt.py | 45 +++++-------------- 1 file changed, 12 insertions(+), 33 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 6bf3a89f2..ae9a78427 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -126,27 +126,8 @@ def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: self._completion_future.add_done_callback(self._on_complete) self._stream.activate() - def on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback - # TODO: update back pressure window once CRT supports it - if self._chunk_futures: - future = self._chunk_futures.popleft() - future.set_result(chunk) - else: - self._received_chunks.append(chunk) - async def next(self) -> bytes: - if self._completion_future is None: - raise SmithyHTTPException("Stream not set") - - # TODO: update backpressure window once CRT supports it - if self._received_chunks: - return self._received_chunks.popleft() - elif self._completion_future.done(): - return b"" - else: - future = ConcurrentFuture[bytes]() - self._chunk_futures.append(future) - return await asyncio.wrap_future(future) + await self._stream.next() def _on_complete( self, completion_future: AsyncFuture[int] @@ -247,8 +228,7 @@ async def send( crt_stream = connection.request( crt_request, response_factory.on_response, - response_body.on_body, - True # allow manual stream write. + manual_write=True # allow manual stream write. ) response_factory.set_done_callback(crt_stream) response_body.set_stream(crt_stream) @@ -259,7 +239,7 @@ async def send( # off to CRT. crt_body = BytesIO(body) # TODO handle error, and it returns a future for now. - # await crt_stream.write_data(crt_body, True) + await crt_stream.write_data(crt_body, True) else: # If the body is async, or potentially very large, start up a task to read # it into the intermediate object that CRT needs. By using @@ -324,7 +304,7 @@ def _build_new_connection( if url.port is not None: port = url.port - return crt_http.HttpClientConnectionAsync.new( + return crt_http.Http2ClientConnectionAsync.new( bootstrap=self._client_bootstrap, host_name=url.host, port=port, @@ -385,15 +365,14 @@ async def _consume_body_async( self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStreamAsync" ) -> None: pass - # try: - # asyncio.sleep(1) # Yield control to the event loop - # async for chunk in source: - # await dest.write_data(BytesIO(chunk), False) - # except Exception: - # raise - # finally: - # await close(source) - # await dest.write_data(BytesIO(b''), True) + try: + async for chunk in source: + await dest.write_data(BytesIO(chunk), False) + except Exception: + raise + finally: + await close(source) + await dest.write_data(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": return AWSCRTHTTPClient( From 1ed821cc2c4ae848c0e6e81bbc5fdc4331456eac Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 10 Jun 2025 14:36:38 -0700 Subject: [PATCH 06/12] WIP --- .../smithy-http/src/smithy_http/aio/crt.py | 60 +++++-------------- 1 file changed, 16 insertions(+), 44 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index ae9a78427..c9a45376e 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -109,42 +109,29 @@ class CRTResponseBody: def __init__(self) -> None: self._stream: crt_http.HttpClientStreamAsync | None = None self._completion_future: AsyncFuture[int] | None = None - self._chunk_futures: deque[ConcurrentFuture[bytes]] = deque() - - # deque is thread safe and the crt is only going to be writing - # with one thread anyway, so we *shouldn't* need to gate this - # behind a lock. In an ideal world, the CRT would expose - # an interface that better matches python's async. - self._received_chunks: deque[bytes] = deque() def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: if self._stream is not None: raise SmithyHTTPException( "Stream already set on AWSCRTHTTPResponse object") self._stream = stream - self._completion_future = stream._completion_future - self._completion_future.add_done_callback(self._on_complete) self._stream.activate() async def next(self) -> bytes: - await self._stream.next() - - def _on_complete( - self, completion_future: AsyncFuture[int] - ) -> None: # pragma: crt-callback - for future in self._chunk_futures: - future.set_result(b"") - self._chunk_futures.clear() + print("CRTResponseBody.next called") + return await self._stream.next() class CRTResponseFactory: - def __init__(self, body: CRTResponseBody) -> None: + def __init__(self, body: CRTResponseBody, stream: "crt_http.HttpClientStreamAsync") -> None: self._body = body - self._response_future = ConcurrentFuture[AWSCRTHTTPResponse]() + self._stream = stream - def on_response( - self, status_code: int, headers: list[tuple[str, str]], **kwargs: Any - ) -> None: # pragma: crt-callback + async def await_response(self) -> AWSCRTHTTPResponse: + status_code = await self._stream.response_status_code() + headers = await self._stream.response_headers() + print(f"Response headers: {headers}") + print(f"status: {status_code}") fields = Fields() for header_name, header_val in headers: try: @@ -155,26 +142,12 @@ def on_response( values=[header_val], kind=FieldPosition.HEADER, ) - - self._response_future.set_result( - AWSCRTHTTPResponse( - status=status_code, - fields=fields, - body=self._body, - ) + return AWSCRTHTTPResponse( + status=status_code, + fields=fields, + body=self._body, ) - async def await_response(self) -> AWSCRTHTTPResponse: - return await asyncio.wrap_future(self._response_future) - - def set_done_callback(self, stream: "crt_http.HttpClientStreamAsync") -> None: - print(stream) - stream._completion_future.add_done_callback(self._cancel) - - def _cancel(self, completion_future: ConcurrentFuture[int | Exception]) -> None: - if not self._response_future.done(): - self._response_future.cancel() - ConnectionPoolKey = tuple[str, str, int | None] ConnectionPoolDict = dict[ConnectionPoolKey, "crt_http.HttpClientConnection"] @@ -223,14 +196,12 @@ async def send( crt_request = self._marshal_request(request) connection = await self._get_connection(request.destination) response_body = CRTResponseBody() - response_factory = CRTResponseFactory(response_body) # TODO: assert the connection is HTTP/2 crt_stream = connection.request( crt_request, - response_factory.on_response, manual_write=True # allow manual stream write. ) - response_factory.set_done_callback(crt_stream) + response_factory = CRTResponseFactory(response_body, crt_stream) response_body.set_stream(crt_stream) body = request.body @@ -364,7 +335,7 @@ def _marshal_request( async def _consume_body_async( self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStreamAsync" ) -> None: - pass + print("###################### AWSCRTHTTPClient._consume_body_async called") try: async for chunk in source: await dest.write_data(BytesIO(chunk), False) @@ -372,6 +343,7 @@ async def _consume_body_async( raise finally: await close(source) + print("###################### AWSCRTHTTPClient._consume_body_async done") await dest.write_data(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": From 6198e19bd299b7fb53564424e51c9a61a37abd5b Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 10 Jun 2025 14:54:26 -0700 Subject: [PATCH 07/12] printfs --- packages/smithy-http/src/smithy_http/aio/crt.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index c9a45376e..d9df7fbe6 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -335,9 +335,10 @@ def _marshal_request( async def _consume_body_async( self, source: AsyncIterable[bytes], dest: "crt_http.HttpClientStreamAsync" ) -> None: - print("###################### AWSCRTHTTPClient._consume_body_async called") try: async for chunk in source: + print( + "###################### AWSCRTHTTPClient._consume_body_async chunk") await dest.write_data(BytesIO(chunk), False) except Exception: raise From 1109e95d79c5e6062e211dfb32019bb313b8a6ac Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 10 Jun 2025 15:08:38 -0700 Subject: [PATCH 08/12] this is soooooo hard to debug --- packages/smithy-http/src/smithy_http/aio/crt.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index d9df7fbe6..3ee340bfa 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -118,7 +118,6 @@ def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: self._stream.activate() async def next(self) -> bytes: - print("CRTResponseBody.next called") return await self._stream.next() @@ -210,7 +209,7 @@ async def send( # off to CRT. crt_body = BytesIO(body) # TODO handle error, and it returns a future for now. - await crt_stream.write_data(crt_body, True) + await crt_stream.write_data_async(crt_body, True) else: # If the body is async, or potentially very large, start up a task to read # it into the intermediate object that CRT needs. By using @@ -337,15 +336,14 @@ async def _consume_body_async( ) -> None: try: async for chunk in source: - print( - "###################### AWSCRTHTTPClient._consume_body_async chunk") - await dest.write_data(BytesIO(chunk), False) + # "###################### AWSCRTHTTPClient._consume_body_async chunk") + await dest.write_data_async(BytesIO(chunk), False) except Exception: raise finally: await close(source) - print("###################### AWSCRTHTTPClient._consume_body_async done") - await dest.write_data(BytesIO(b''), True) + # print("###################### AWSCRTHTTPClient._consume_body_async done") + await dest.write_data_async(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": return AWSCRTHTTPClient( From cfb90bdae8802c56a0edbd0f297e6c20024d60d0 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Wed, 11 Jun 2025 10:31:01 -0700 Subject: [PATCH 09/12] unneeded --- packages/smithy-http/src/smithy_http/aio/crt.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 3ee340bfa..0157cd4a9 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -3,12 +3,10 @@ # pyright: reportMissingTypeStubs=false,reportUnknownMemberType=false # flake8: noqa: F811 import asyncio -from asyncio import Future as AsyncFuture from collections import deque from collections.abc import AsyncGenerator, AsyncIterable from concurrent.futures import Future as ConcurrentFuture from copy import deepcopy -from functools import partial from io import BufferedIOBase, BytesIO from typing import TYPE_CHECKING, Any @@ -108,7 +106,6 @@ def __repr__(self) -> str: class CRTResponseBody: def __init__(self) -> None: self._stream: crt_http.HttpClientStreamAsync | None = None - self._completion_future: AsyncFuture[int] | None = None def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: if self._stream is not None: From 9ddefbf6c8ce6967d6c55f29d0056e11aa1fb7ee Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 12 Jun 2025 09:41:24 -0700 Subject: [PATCH 10/12] clean up --- .../smithy-http/src/smithy_http/aio/crt.py | 126 ++---------------- 1 file changed, 14 insertions(+), 112 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 0157cd4a9..bb69eaa10 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -3,18 +3,12 @@ # pyright: reportMissingTypeStubs=false,reportUnknownMemberType=false # flake8: noqa: F811 import asyncio -from collections import deque from collections.abc import AsyncGenerator, AsyncIterable -from concurrent.futures import Future as ConcurrentFuture from copy import deepcopy -from io import BufferedIOBase, BytesIO +from io import BytesIO from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - # Both of these are types that essentially are "castable to bytes/memoryview" - # Unfortunately they're not exposed anywhere so we have to import them from - # _typeshed. - from _typeshed import ReadableBuffer, WriteableBuffer # pyright doesn't like optional imports. This is reasonable because if we use these # in type hints then they'd result in runtime errors. @@ -115,7 +109,7 @@ def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: self._stream.activate() async def next(self) -> bytes: - return await self._stream.next() + return await self._stream.get_next_response_chunk() class CRTResponseFactory: @@ -124,10 +118,8 @@ def __init__(self, body: CRTResponseBody, stream: "crt_http.HttpClientStreamAsyn self._stream = stream async def await_response(self) -> AWSCRTHTTPResponse: - status_code = await self._stream.response_status_code() - headers = await self._stream.response_headers() - print(f"Response headers: {headers}") - print(f"status: {status_code}") + status_code = await self._stream.get_response_status_code() + headers = await self._stream.get_response_headers() fields = Fields() for header_name, header_val in headers: try: @@ -192,7 +184,6 @@ async def send( crt_request = self._marshal_request(request) connection = await self._get_connection(request.destination) response_body = CRTResponseBody() - # TODO: assert the connection is HTTP/2 crt_stream = connection.request( crt_request, manual_write=True # allow manual stream write. @@ -205,7 +196,6 @@ async def send( # If the body is already directly in memory, wrap in a BytesIO to hand # off to CRT. crt_body = BytesIO(body) - # TODO handle error, and it returns a future for now. await crt_stream.write_data_async(crt_body, True) else: # If the body is async, or potentially very large, start up a task to read @@ -217,6 +207,7 @@ async def send( body = AsyncBytesReader(body) # Start the read task in the background. + # TODO: consider some better way to convert the AsyncBytesReader to use `write_data_async` read_task = asyncio.create_task( self._consume_body_async(body, crt_stream)) @@ -231,16 +222,15 @@ async def send( async def _create_connection( self, url: core_interfaces.URI - ) -> "crt_http.HttpClientConnection": + ) -> "crt_http.Http2ClientConnectionAsync": """Builds and validates connection to ``url``""" - connect_future = self._build_new_connection(url) - connection = await connect_future + connection = await self._build_new_connection(url) self._validate_connection(connection) return connection async def _get_connection( self, url: core_interfaces.URI - ) -> "crt_http.HttpClientConnection": + ) -> "crt_http.Http2ClientConnectionAsync": # TODO: Use CRT connection pooling instead of this basic kind connection_key = (url.scheme, url.host, url.port) connection = self._connections.get(connection_key) @@ -252,9 +242,9 @@ async def _get_connection( self._connections[connection_key] = connection return connection - def _build_new_connection( + async def _build_new_connection( self, url: core_interfaces.URI - ) -> ConcurrentFuture["crt_http.HttpClientConnection"]: + ) -> "crt_http.Http2ClientConnectionAsync": if url.scheme == "http": port = self._HTTP_PORT tls_connection_options = None @@ -270,8 +260,8 @@ def _build_new_connection( ) if url.port is not None: port = url.port - - return crt_http.Http2ClientConnectionAsync.new( + # TODO: support HTTP/1,1 connections + return await crt_http.Http2ClientConnectionAsync.new( bootstrap=self._client_bootstrap, host_name=url.host, port=port, @@ -299,7 +289,7 @@ def _render_path(self, url: core_interfaces.URI) -> str: def _marshal_request( self, request: http_aio_interfaces.HTTPRequest - ) -> tuple["crt_http.HttpRequest", "BufferableByteStream | BytesIO"]: + ) -> "crt_http_base.HttpRequest": """Create :py:class:`awscrt.http.HttpRequest` from :py:class:`smithy_http.aio.HTTPRequest`""" headers_list = [] @@ -333,103 +323,15 @@ async def _consume_body_async( ) -> None: try: async for chunk in source: - # "###################### AWSCRTHTTPClient._consume_body_async chunk") await dest.write_data_async(BytesIO(chunk), False) except Exception: raise finally: + await dest.write_data_async(BytesIO(b''), True) await close(source) - # print("###################### AWSCRTHTTPClient._consume_body_async done") - await dest.write_data_async(BytesIO(b''), True) def __deepcopy__(self, memo: Any) -> "AWSCRTHTTPClient": return AWSCRTHTTPClient( eventloop=self._eventloop, client_config=deepcopy(self._config), ) - - -# This is adapted from the transcribe streaming sdk -class BufferableByteStream(BufferedIOBase): - """A non-blocking bytes buffer.""" - - def __init__(self) -> None: - # We're always manipulating the front and back of the buffer, so a deque - # will be much more efficient than a list. - self._chunks: deque[bytes] = deque() - self._closed = False - self._done = False - - def read(self, size: int | None = -1) -> bytes: - if self._closed: - return b"" - - if len(self._chunks) == 0: - if self._done: - self.close() - return b"" - else: - # When the CRT recieves this, it'll try again - raise BlockingIOError("read") - - # We could compile all the chunks here instead of just returning - # the one, BUT the CRT will keep calling read until empty bytes - # are returned. So it's actually better to just return one chunk - # since combining them would have some potentially bad memory - # usage issues. - result = self._chunks.popleft() - if size is not None and size > 0: - remainder = result[size:] - result = result[:size] - if remainder: - self._chunks.appendleft(remainder) - - if self._done and len(self._chunks) == 0: - self.close() - - return result - - def read1(self, size: int = -1) -> bytes: - return self.read(size) - - def readinto(self, buffer: "WriteableBuffer") -> int: - if not isinstance(buffer, memoryview): - buffer = memoryview(buffer).cast("B") - - data = self.read(len(buffer)) # type: ignore - n = len(data) - buffer[:n] = data - return n - - def write(self, buffer: "ReadableBuffer") -> int: - if not isinstance(buffer, bytes): - raise ValueError( - f"Unexpected value written to BufferableByteStream. " - f"Only bytes are support but {type(buffer)} was provided." - ) - - if self._closed: - raise OSError( - "Stream is completed and doesn't support further writes.") - - if buffer: - self._chunks.append(buffer) - return len(buffer) - - @property - def closed(self) -> bool: - return self._closed - - def close(self) -> None: - self._closed = True - self._done = True - - # Clear out the remaining chunks so that they don't sit around in memory. - self._chunks.clear() - - def end_stream(self) -> None: - """End the stream, letting any remaining chunks be read before it is closed.""" - if len(self._chunks) == 0: - self.close() - else: - self._done = True From 708cc92ea3557cd38de7296b162f1f86016dc394 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 12 Jun 2025 09:52:53 -0700 Subject: [PATCH 11/12] more clean up --- .../smithy-http/src/smithy_http/aio/crt.py | 73 +++++++------------ 1 file changed, 25 insertions(+), 48 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index bb69eaa10..58b2520fd 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -57,11 +57,11 @@ def _initialize_default_loop(self) -> "crt_io.ClientBootstrap": class AWSCRTHTTPResponse(http_aio_interfaces.HTTPResponse): - def __init__(self, *, status: int, fields: Fields, body: "CRTResponseBody") -> None: + def __init__(self, *, status: int, fields: Fields, stream: "crt_http.HttpClientStreamAsync") -> None: _assert_crt() self._status = status self._fields = fields - self._body = body + self._stream = stream @property def status(self) -> int: @@ -83,7 +83,7 @@ def reason(self) -> str | None: async def chunks(self) -> AsyncGenerator[bytes, None]: while True: - chunk = await self._body.next() + chunk = await self._stream.get_next_response_chunk() if chunk: yield chunk else: @@ -97,46 +97,6 @@ def __repr__(self) -> str: ) -class CRTResponseBody: - def __init__(self) -> None: - self._stream: crt_http.HttpClientStreamAsync | None = None - - def set_stream(self, stream: "crt_http.HttpClientStreamAsync") -> None: - if self._stream is not None: - raise SmithyHTTPException( - "Stream already set on AWSCRTHTTPResponse object") - self._stream = stream - self._stream.activate() - - async def next(self) -> bytes: - return await self._stream.get_next_response_chunk() - - -class CRTResponseFactory: - def __init__(self, body: CRTResponseBody, stream: "crt_http.HttpClientStreamAsync") -> None: - self._body = body - self._stream = stream - - async def await_response(self) -> AWSCRTHTTPResponse: - status_code = await self._stream.get_response_status_code() - headers = await self._stream.get_response_headers() - fields = Fields() - for header_name, header_val in headers: - try: - fields[header_name].add(header_val) - except KeyError: - fields[header_name] = Field( - name=header_name, - values=[header_val], - kind=FieldPosition.HEADER, - ) - return AWSCRTHTTPResponse( - status=status_code, - fields=fields, - body=self._body, - ) - - ConnectionPoolKey = tuple[str, str, int | None] ConnectionPoolDict = dict[ConnectionPoolKey, "crt_http.HttpClientConnection"] @@ -183,13 +143,11 @@ async def send( """ crt_request = self._marshal_request(request) connection = await self._get_connection(request.destination) - response_body = CRTResponseBody() + crt_stream = connection.request( crt_request, manual_write=True # allow manual stream write. ) - response_factory = CRTResponseFactory(response_body, crt_stream) - response_body.set_stream(crt_stream) body = request.body if isinstance(body, bytes | bytearray): @@ -216,9 +174,28 @@ async def send( self._async_reads.add(read_task) read_task.add_done_callback(self._async_reads.discard) - response = await response_factory.await_response() + return await self.await_response(crt_stream) - return response + async def await_response( + self, stream: "crt_http.HttpClientStreamAsync" + ) -> AWSCRTHTTPResponse: + status_code = await stream.get_response_status_code() + headers = await stream.get_response_headers() + fields = Fields() + for header_name, header_val in headers: + try: + fields[header_name].add(header_val) + except KeyError: + fields[header_name] = Field( + name=header_name, + values=[header_val], + kind=FieldPosition.HEADER, + ) + return AWSCRTHTTPResponse( + status=status_code, + fields=fields, + stream=stream, + ) async def _create_connection( self, url: core_interfaces.URI From ec2e0766cf54c077f0d90a822c8570a54852dec0 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 12 Jun 2025 10:12:15 -0700 Subject: [PATCH 12/12] private --- packages/smithy-http/src/smithy_http/aio/crt.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/smithy-http/src/smithy_http/aio/crt.py b/packages/smithy-http/src/smithy_http/aio/crt.py index 58b2520fd..734565d01 100644 --- a/packages/smithy-http/src/smithy_http/aio/crt.py +++ b/packages/smithy-http/src/smithy_http/aio/crt.py @@ -165,7 +165,6 @@ async def send( body = AsyncBytesReader(body) # Start the read task in the background. - # TODO: consider some better way to convert the AsyncBytesReader to use `write_data_async` read_task = asyncio.create_task( self._consume_body_async(body, crt_stream)) @@ -174,9 +173,9 @@ async def send( self._async_reads.add(read_task) read_task.add_done_callback(self._async_reads.discard) - return await self.await_response(crt_stream) + return await self._await_response(crt_stream) - async def await_response( + async def _await_response( self, stream: "crt_http.HttpClientStreamAsync" ) -> AWSCRTHTTPResponse: status_code = await stream.get_response_status_code()