diff --git a/src/http.rs b/src/http.rs index 08ba686..dbdc73c 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,9 +1,9 @@ use crate::executor; use core::fmt; -use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use std::{cmp, debug_assert, io}; use futures_core::Stream; use hyper::body::HttpBody; @@ -109,6 +109,14 @@ impl Body { { Body(hyper::Body::wrap_stream(ReaderStream::new(reader))) } + + /// Create a [`BodyReader`] that implements [`std::io::Read`]. + pub fn reader(&mut self) -> BodyReader<'_> { + BodyReader { + body: self, + prev_bytes: Bytes::new(), + } + } } impl From for Body @@ -134,6 +142,48 @@ impl Iterator for Body { } } +/// Wraps [`Body`] and implements [`std::io::Read`] +pub struct BodyReader<'b> { + body: &'b mut Body, + prev_bytes: Bytes, +} + +impl<'b> std::io::Read for BodyReader<'b> { + fn read(&mut self, mut buf: &mut [u8]) -> io::Result { + let mut written = 0; + loop { + if buf.is_empty() { + return Ok(written); + } + + if !self.prev_bytes.is_empty() { + let chunk_size = cmp::min(buf.len(), self.prev_bytes.len()); + let prev_bytes_rest = self.prev_bytes.split_to(chunk_size); + buf[..chunk_size].copy_from_slice(&self.prev_bytes[..chunk_size]); + self.prev_bytes = prev_bytes_rest; + buf = &mut buf[chunk_size..]; + written += chunk_size; + continue; + } + + if written != 0 { + // pulling from an interator can block, and we have something to return + // already, so return it + return Ok(written); + } + + debug_assert!(self.prev_bytes.is_empty()); + debug_assert!(written == 0); + + self.prev_bytes = if let Some(next) = self.body.next() { + next? + } else { + return Ok(written); + } + } + } +} + impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f)