Skip to content

Commit

Permalink
refactor converting body to data stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Mar 31, 2024
1 parent 02c99f2 commit a019950
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 28 deletions.
18 changes: 7 additions & 11 deletions kube-client/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ impl Body {
{
Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync()))
}

pub(crate) fn into_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
}
}

impl From<Bytes> for Body {
Expand Down Expand Up @@ -136,14 +143,3 @@ where
}
}
}

pub trait IntoBodyDataStream: HttpBody {
fn into_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
}
}

impl<T> IntoBodyDataStream for T where T: HttpBody {}
26 changes: 9 additions & 17 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result};
mod auth;
mod body;
mod builder;
// Add `into_stream()` to `http::Body`
use body::IntoBodyDataStream as _;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[cfg(feature = "unstable-client")]
mod client_ext;
Expand Down Expand Up @@ -271,10 +269,7 @@ impl Client {
let res = handle_api_errors(res).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = BodyExt::map_err(res.into_body(), |e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})
.into_stream();
let body = res.into_body().into_stream().map_err(std::io::Error::other);
Ok(body.into_async_read())
}

Expand Down Expand Up @@ -314,17 +309,14 @@ impl Client {
tracing::trace!("headers: {:?}", res.headers());

let frames = FramedRead::new(
StreamReader::new(
BodyExt::map_err(res.into_body(), |e| {
// Unexpected EOF from chunked decoder.
// Tends to happen when watching for 300+s. This will be ignored.
if e.to_string().contains("unexpected EOF during chunk") {
return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
}
std::io::Error::new(std::io::ErrorKind::Other, e)
})
.into_stream(),
),
StreamReader::new(res.into_body().into_stream().map_err(|e| {
// Unexpected EOF from chunked decoder.
// Tends to happen when watching for 300+s. This will be ignored.
if e.to_string().contains("unexpected EOF during chunk") {
return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);

Check warning on line 316 in kube-client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/mod.rs#L315-L316

Added lines #L315 - L316 were not covered by tests
}
std::io::Error::other(e)

Check warning on line 318 in kube-client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/mod.rs#L318

Added line #L318 was not covered by tests
})),
LinesCodec::new(),
);

Expand Down

0 comments on commit a019950

Please sign in to comment.