Skip to content

Commit

Permalink
feat(body): rename Recv to Incoming (#3022)
Browse files Browse the repository at this point in the history
The concrete "recv stream" body type is renamed to `Incoming`.

Closes #2971
  • Loading branch information
seanmonstar committed Oct 25, 2022
1 parent 0888623 commit 95a153b
Show file tree
Hide file tree
Showing 29 changed files with 164 additions and 157 deletions.
7 changes: 4 additions & 3 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::body::Body as _;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Recv, Request, Response, StatusCode};
use hyper::{body::Body, Method, Request, Response, StatusCode};
use tokio::net::TcpListener;

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
async fn echo(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(full(
Expand Down
4 changes: 2 additions & 2 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Recv, Request, Response};
use hyper::{Request, Response};
use tokio::net::TcpListener;

async fn hello(_: Request<Recv>) -> Result<Response<Full<Bytes>>, Infallible> {
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}

Expand Down
6 changes: 4 additions & 2 deletions examples/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hyper::client::conn::http1::Builder;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::{Method, Recv, Request, Response};
use hyper::{Method, Request, Response};

use tokio::net::{TcpListener, TcpStream};

Expand Down Expand Up @@ -43,7 +43,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

async fn proxy(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
async fn proxy(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
println!("req: {:?}", req);

if Method::CONNECT == req.method() {
Expand Down
6 changes: 3 additions & 3 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use futures_util::future::join;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Recv, Request, Response};
use hyper::{Request, Response};
use tokio::net::TcpListener;

static INDEX1: &[u8] = b"The 1st service!";
static INDEX2: &[u8] = b"The 2nd service!";

async fn index1(_: Request<Recv>) -> Result<Response<Full<Bytes>>, hyper::Error> {
async fn index1(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX1))))
}

async fn index2(_: Request<Recv>) -> Result<Response<Full<Bytes>>, hyper::Error> {
async fn index2(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX2))))
}

Expand Down
4 changes: 2 additions & 2 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Recv, Request, Response, StatusCode};
use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener;

use std::collections::HashMap;
Expand All @@ -19,7 +19,7 @@ static NOTNUMERIC: &[u8] = b"Number field is not numeric";

// Using service_fn, we can turn this function into a `Service`.
async fn param_example(
req: Request<Recv>,
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
Expand Down
4 changes: 2 additions & 2 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::net::TcpListener;
use bytes::Bytes;
use http_body_util::Full;
use hyper::service::service_fn;
use hyper::{Method, Recv, Request, Response, Result, StatusCode};
use hyper::{Method, Request, Response, Result, StatusCode};

static INDEX: &str = "examples/send_file_index.html";
static NOTFOUND: &[u8] = b"Not Found";
Expand Down Expand Up @@ -36,7 +36,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
}

async fn response_examples(req: Request<Recv>) -> Result<Response<Full<Bytes>>> {
async fn response_examples(req: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => simple_file_send(INDEX).await,
(&Method::GET, "/no_file.html") => {
Expand Down
6 changes: 3 additions & 3 deletions examples/service_struct_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::Service;
use hyper::{Recv, Request, Response};
use hyper::{body::Incoming as IncomingBody, Request, Response};
use tokio::net::TcpListener;

use std::future::Future;
Expand Down Expand Up @@ -36,12 +36,12 @@ struct Svc {
counter: Counter,
}

impl Service<Request<Recv>> for Svc {
impl Service<Request<IncomingBody>> for Svc {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&mut self, req: Request<Recv>) -> Self::Future {
fn call(&mut self, req: Request<IncomingBody>) -> Self::Future {
fn mk_response(s: String) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/upgrades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hyper::header::{HeaderValue, UPGRADE};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::{Recv, Request, Response, StatusCode};
use hyper::{Request, Response, StatusCode};

// A simple type alias so as to DRY.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand All @@ -38,7 +38,7 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
}

/// Our server HTTP handler to initiate HTTP upgrades.
async fn server_upgrade(mut req: Request<Recv>) -> Result<Response<Empty<Bytes>>> {
async fn server_upgrade(mut req: Request<hyper::body::Incoming>) -> Result<Response<Empty<Bytes>>> {
let mut res = Response::new(Empty::new());

// Send a 400 to any request that doesn't have
Expand Down
6 changes: 3 additions & 3 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bytes::{Buf, Bytes};
use http_body_util::{BodyExt, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{header, Method, Recv, Request, Response, StatusCode};
use hyper::{body::Incoming as IncomingBody, header, Method, Request, Response, StatusCode};
use tokio::net::{TcpListener, TcpStream};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn client_request_response() -> Result<Response<BoxBody>> {
Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Recv>) -> Result<Response<BoxBody>> {
async fn api_post_response(req: Request<IncomingBody>) -> Result<Response<BoxBody>> {
// Aggregate the body...
let whole_body = req.collect().await?.aggregate();
// Decode as JSON...
Expand Down Expand Up @@ -77,7 +77,7 @@ async fn api_get_response() -> Result<Response<BoxBody>> {
Ok(res)
}

async fn response_examples(req: Request<Recv>) -> Result<Response<BoxBody>> {
async fn response_examples(req: Request<IncomingBody>) -> Result<Response<BoxBody>> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => Ok(Response::new(full(INDEX))),
(&Method::GET, "/test.html") => client_request_response().await,
Expand Down
57 changes: 29 additions & 28 deletions src/body/body.rs → src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type TrailersSender = oneshot::Sender<HeaderMap>;

/// A stream of `Bytes`, used when receiving bodies from the network.
#[must_use = "streams do nothing unless polled"]
pub struct Recv {
pub struct Incoming {
kind: Kind,
}

Expand Down Expand Up @@ -65,17 +65,17 @@ pub(crate) struct Sender {
const WANT_PENDING: usize = 1;
const WANT_READY: usize = 2;

impl Recv {
impl Incoming {
/// Create a `Body` stream with an associated sender half.
///
/// Useful when wanting to stream chunks from another thread.
#[inline]
#[allow(unused)]
pub(crate) fn channel() -> (Sender, Recv) {
pub(crate) fn channel() -> (Sender, Incoming) {
Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
}

pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Recv) {
pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
let (data_tx, data_rx) = mpsc::channel(0);
let (trailers_tx, trailers_rx) = oneshot::channel();

Expand All @@ -90,7 +90,7 @@ impl Recv {
data_tx,
trailers_tx: Some(trailers_tx),
};
let rx = Recv::new(Kind::Chan {
let rx = Incoming::new(Kind::Chan {
content_length,
want_tx,
data_rx,
Expand All @@ -100,18 +100,18 @@ impl Recv {
(tx, rx)
}

fn new(kind: Kind) -> Recv {
Recv { kind }
fn new(kind: Kind) -> Incoming {
Incoming { kind }
}

#[allow(dead_code)]
pub(crate) fn empty() -> Recv {
Recv::new(Kind::Empty)
pub(crate) fn empty() -> Incoming {
Incoming::new(Kind::Empty)
}

#[cfg(feature = "ffi")]
pub(crate) fn ffi() -> Recv {
Recv::new(Kind::Ffi(crate::ffi::UserBody::new()))
pub(crate) fn ffi() -> Incoming {
Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
}

#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Expand All @@ -125,7 +125,7 @@ impl Recv {
if !content_length.is_exact() && recv.is_end_stream() {
content_length = DecodedLength::ZERO;
}
let body = Recv::new(Kind::H2 {
let body = Incoming::new(Kind::H2 {
data_done: false,
ping,
content_length,
Expand All @@ -151,7 +151,7 @@ impl Recv {
}
}

impl Body for Recv {
impl Body for Incoming {
type Data = Bytes;
type Error = crate::Error;

Expand Down Expand Up @@ -259,7 +259,7 @@ impl Body for Recv {
}
}

impl fmt::Debug for Recv {
impl fmt::Debug for Incoming {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[derive(Debug)]
struct Streaming;
Expand Down Expand Up @@ -375,15 +375,15 @@ mod tests {
use std::mem;
use std::task::Poll;

use super::{Body, DecodedLength, Recv, Sender, SizeHint};
use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
use http_body_util::BodyExt;

#[test]
fn test_size_of() {
// These are mostly to help catch *accidentally* increasing
// the size by too much.

let body_size = mem::size_of::<Recv>();
let body_size = mem::size_of::<Incoming>();
let body_expected_size = mem::size_of::<u64>() * 5;
assert!(
body_size <= body_expected_size,
Expand All @@ -392,7 +392,7 @@ mod tests {
body_expected_size,
);

//assert_eq!(body_size, mem::size_of::<Option<Recv>>(), "Option<Recv>");
//assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");

assert_eq!(
mem::size_of::<Sender>(),
Expand All @@ -409,18 +409,18 @@ mod tests {

#[test]
fn size_hint() {
fn eq(body: Recv, b: SizeHint, note: &str) {
fn eq(body: Incoming, b: SizeHint, note: &str) {
let a = body.size_hint();
assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
}

eq(Recv::empty(), SizeHint::with_exact(0), "empty");
eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

eq(Recv::channel().1, SizeHint::new(), "channel");
eq(Incoming::channel().1, SizeHint::new(), "channel");

eq(
Recv::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
SizeHint::with_exact(4),
"channel with length",
);
Expand All @@ -429,7 +429,7 @@ mod tests {
#[cfg(not(miri))]
#[tokio::test]
async fn channel_abort() {
let (tx, mut rx) = Recv::channel();
let (tx, mut rx) = Incoming::channel();

tx.abort();

Expand All @@ -440,7 +440,7 @@ mod tests {
#[cfg(all(not(miri), feature = "http1"))]
#[tokio::test]
async fn channel_abort_when_buffer_is_full() {
let (mut tx, mut rx) = Recv::channel();
let (mut tx, mut rx) = Incoming::channel();

tx.try_send_data("chunk 1".into()).expect("send 1");
// buffer is full, but can still send abort
Expand All @@ -462,7 +462,7 @@ mod tests {
#[cfg(feature = "http1")]
#[test]
fn channel_buffers_one() {
let (mut tx, _rx) = Recv::channel();
let (mut tx, _rx) = Incoming::channel();

tx.try_send_data("chunk 1".into()).expect("send 1");

Expand All @@ -474,14 +474,14 @@ mod tests {
#[cfg(not(miri))]
#[tokio::test]
async fn channel_empty() {
let (_, mut rx) = Recv::channel();
let (_, mut rx) = Incoming::channel();

assert!(rx.frame().await.is_none());
}

#[test]
fn channel_ready() {
let (mut tx, _rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

let mut tx_ready = tokio_test::task::spawn(tx.ready());

Expand All @@ -490,7 +490,8 @@ mod tests {

#[test]
fn channel_wanter() {
let (mut tx, mut rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
let (mut tx, mut rx) =
Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

let mut tx_ready = tokio_test::task::spawn(tx.ready());
let mut rx_data = tokio_test::task::spawn(rx.frame());
Expand All @@ -511,7 +512,7 @@ mod tests {

#[test]
fn channel_notices_closure() {
let (mut tx, rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

let mut tx_ready = tokio_test::task::spawn(tx.ready());

Expand Down
Loading

0 comments on commit 95a153b

Please sign in to comment.