From 569106ae10a34e735186c24655a8a0e953a65ea0 Mon Sep 17 00:00:00 2001 From: tottoto Date: Mon, 1 Apr 2024 01:19:26 +0900 Subject: [PATCH] refactor converting body to data stream Signed-off-by: tottoto --- kube-client/src/client/body.rs | 18 +++++++----------- kube-client/src/client/mod.rs | 26 +++++++++----------------- 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/kube-client/src/client/body.rs b/kube-client/src/client/body.rs index 1d4186992..bc81121d5 100644 --- a/kube-client/src/client/body.rs +++ b/kube-client/src/client/body.rs @@ -44,6 +44,13 @@ impl Body { { Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync())) } + + pub(crate) fn into_stream(self) -> BodyDataStream + where + Self: Sized, + { + BodyDataStream::new(self) + } } impl From for Body { @@ -136,14 +143,3 @@ where } } } - -pub trait IntoBodyDataStream: HttpBody { - fn into_stream(self) -> BodyDataStream - where - Self: Sized, - { - BodyDataStream::new(self) - } -} - -impl IntoBodyDataStream for T where T: HttpBody {} diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index d17da9679..b284816c8 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -31,8 +31,6 @@ use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result}; mod auth; mod body; mod builder; -// Add `into_stream()` to `http::Body` -use body::IntoBodyDataStream as _; #[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))] #[cfg(feature = "unstable-client")] mod client_ext; @@ -271,10 +269,7 @@ impl Client { let res = handle_api_errors(res).await?; // Map the error, since we want to convert this into an `AsyncBufReader` using // `into_async_read` which specifies `std::io::Error` as the stream's error type. - let body = BodyExt::map_err(res.into_body(), |e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - }) - .into_stream(); + let body = res.into_body().into_stream().map_err(std::io::Error::other); Ok(body.into_async_read()) } @@ -314,17 +309,14 @@ impl Client { tracing::trace!("headers: {:?}", res.headers()); let frames = FramedRead::new( - StreamReader::new( - BodyExt::map_err(res.into_body(), |e| { - // Unexpected EOF from chunked decoder. - // Tends to happen when watching for 300+s. This will be ignored. - if e.to_string().contains("unexpected EOF during chunk") { - return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e); - } - std::io::Error::new(std::io::ErrorKind::Other, e) - }) - .into_stream(), - ), + StreamReader::new(res.into_body().into_stream().map_err(|e| { + // Unexpected EOF from chunked decoder. + // Tends to happen when watching for 300+s. This will be ignored. + if e.to_string().contains("unexpected EOF during chunk") { + return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e); + } + std::io::Error::other(e) + })), LinesCodec::new(), );