Skip to content

Commit

Permalink
feat(body): rename Entity to Payload
Browse files Browse the repository at this point in the history
Closes #1464
  • Loading branch information
seanmonstar committed Apr 10, 2018
1 parent 313f82d commit dfdca25
Show file tree
Hide file tree
Showing 18 changed files with 105 additions and 160 deletions.
5 changes: 2 additions & 3 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
b.iter(move || {
client.get(url.clone())
.and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
res.into_body().for_each(|_chunk| {
Ok(())
})
})
Expand All @@ -55,7 +55,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
*req.method_mut() = Method::POST;
*req.uri_mut() = url.clone();
client.request(req).and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
res.into_body().for_each(|_chunk| {
Ok(())
})
}).wait().expect("client wait");
Expand All @@ -75,7 +75,6 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {

let service = const_service(service_fn(|req: Request<Body>| {
req.into_body()
.into_stream()
.concat2()
.map(|_| {
Response::new(Body::from(PHRASE))
Expand Down
2 changes: 1 addition & 1 deletion examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn main() {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());

res.into_body().into_stream().for_each(|chunk| {
res.into_body().for_each(|chunk| {
io::stdout().write_all(&chunk)
.map_err(|e| panic!("example expects stdout is open, error={}", e))
})
Expand Down
2 changes: 1 addition & 1 deletion examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Service for ParamExample {
Box::new(futures::future::ok(Response::new(INDEX.into())))
},
(&Method::POST, "/post") => {
Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
Box::new(req.into_body().concat2().map(|b| {
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
Expand Down
4 changes: 2 additions & 2 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Service for ResponseExamples {
let web_res_future = client.request(req);

Box::new(web_res_future.map(|web_res| {
let body = Body::wrap_stream(web_res.into_body().into_stream().map(|b| {
let body = Body::wrap_stream(web_res.into_body().map(|b| {
Chunk::from(format!("before: '{:?}'<br>after: '{:?}'",
std::str::from_utf8(LOWERCASE).unwrap(),
std::str::from_utf8(&b).unwrap()))
Expand All @@ -54,7 +54,7 @@ impl Service for ResponseExamples {
},
(&Method::POST, "/web_api") => {
// A web api to run against. Simple upcasing of the body.
let body = Body::wrap_stream(req.into_body().into_stream().map(|chunk| {
let body = Body::wrap_stream(req.into_body().map(|chunk| {
let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase())
.collect::<Vec<u8>>();
Chunk::from(upper)
Expand Down
85 changes: 16 additions & 69 deletions src/proto/body.rs → src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::Chunk;
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

/// This trait represents a streaming body of a `Request` or `Response`.
pub trait Entity {
pub trait Payload {
/// A buffer of bytes representing a single chunk of a body.
type Data: AsRef<[u8]>;

Expand Down Expand Up @@ -63,7 +63,7 @@ pub trait Entity {
}
}

impl<E: Entity> Entity for Box<E> {
impl<E: Payload> Payload for Box<E> {
type Data = E::Data;
type Error = E::Error;

Expand All @@ -84,43 +84,10 @@ impl<E: Entity> Entity for Box<E> {
}
}

/// A wrapper to consume an `Entity` as a futures `Stream`.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct EntityStream<E> {
is_data_eof: bool,
entity: E,
}

impl<E: Entity> Stream for EntityStream<E> {
type Item = E::Data;
type Error = E::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.is_data_eof {
return self.entity.poll_trailers()
.map(|async| {
async.map(|_opt| {
// drop the trailers and return that Stream is done
None
})
});
}

let opt = try_ready!(self.entity.poll_data());
if let Some(data) = opt {
return Ok(Async::Ready(Some(data)));
} else {
self.is_data_eof = true;
}
}
}
}

/// An `Entity` of `Chunk`s, used when receiving bodies.
/// A `Payload` of `Chunk`s, used when receiving bodies.
///
/// Also a good default `Entity` to use in many applications.
/// Also a good default `Payload` to use in many applications.
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
Expand Down Expand Up @@ -229,35 +196,6 @@ impl Body {
Body::new(Kind::Wrapped(Box::new(mapped)))
}

/// Convert this `Body` into a `Stream<Item=Chunk, Error=hyper::Error>`.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # use futures::{Future, Stream};
/// # use hyper::{Body, Request};
/// # fn request_concat(some_req: Request<Body>) {
/// let req: Request<Body> = some_req;
/// let body = req.into_body();
///
/// let stream = body.into_stream();
/// stream.concat2()
/// .map(|buf| {
/// println!("body length: {}", buf.len());
/// });
/// # }
/// # fn main() {}
/// ```
#[inline]
pub fn into_stream(self) -> EntityStream<Body> {
EntityStream {
is_data_eof: false,
entity: self,
}
}

/// Returns if this body was constructed via `Body::empty()`.
///
/// # Note
Expand Down Expand Up @@ -345,7 +283,7 @@ impl Default for Body {
}
}

impl Entity for Body {
impl Payload for Body {
type Data = Chunk;
type Error = ::Error;

Expand Down Expand Up @@ -373,6 +311,15 @@ impl Entity for Body {
}
}

impl Stream for Body {
type Item = Chunk;
type Error = ::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll_data()
}
}

impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Body")
Expand Down Expand Up @@ -489,10 +436,10 @@ fn test_body_stream_concat() {

let body = Body::from("hello world");

let total = body.into_stream()
let total = body
.concat2()
.wait()
.unwrap();
assert_eq!(total.as_ref(), b"hello world");

}

File renamed without changes.
26 changes: 13 additions & 13 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use futures::{Async, Future, Poll};
use futures::future::{self, Either};
use tokio_io::{AsyncRead, AsyncWrite};

use body::Payload;
use proto;
use proto::body::Entity;
use super::dispatch;
use {Body, Request, Response, StatusCode};

Expand Down Expand Up @@ -45,7 +45,7 @@ pub struct SendRequest<B> {
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
Expand Down Expand Up @@ -138,7 +138,7 @@ impl<B> SendRequest<B>

impl<B> SendRequest<B>
where
B: Entity + 'static,
B: Payload + 'static,
{
/// Sends a `Request` on the associated connection.
///
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<B> fmt::Debug for SendRequest<B> {
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
/// Return the inner IO object, and additional information.
pub fn into_parts(self) -> Parts<T> {
Expand All @@ -289,7 +289,7 @@ where
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
type Item = ();
type Error = ::Error;
Expand All @@ -302,7 +302,7 @@ where
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
B: Entity + 'static,
B: Payload + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
Expand Down Expand Up @@ -331,7 +331,7 @@ impl Builder {
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
Handshake {
inner: HandshakeInner {
Expand All @@ -345,7 +345,7 @@ impl Builder {
pub(super) fn handshake_no_upgrades<T, B>(&self, io: T) -> HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
HandshakeNoUpgrades {
inner: HandshakeInner {
Expand All @@ -362,7 +362,7 @@ impl Builder {
impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
Expand All @@ -387,7 +387,7 @@ impl<T, B> fmt::Debug for Handshake<T, B> {
impl<T, B> Future for HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
Expand All @@ -406,7 +406,7 @@ where
impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
R: proto::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
Expand Down Expand Up @@ -470,15 +470,15 @@ impl<B: Send> AssertSendSync for SendRequest<B> {}
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
B::Data: Send + 'static,
{}

#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity + 'static,
B: Payload + 'static,
B::Data: Send + Sync + 'static,
{}

Expand Down
Loading

0 comments on commit dfdca25

Please sign in to comment.