diff --git a/src/body/body.rs b/src/body/body.rs index 111867a5ed..9164320a0a 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -323,7 +323,12 @@ impl Body { ping.record_data(bytes.len()); Poll::Ready(Some(Ok(bytes))) } - Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + Some(Err(e)) => match e.reason() { + // These reasons should cause stop of body reading, but nor fail it. + // The same logic as for `AsyncRead for H2Upgraded` is applied here. + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None), + _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + }, None => Poll::Ready(None), }, diff --git a/src/proto/mod.rs b/src/proto/mod.rs index f938bf532b..3628576dc1 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -50,7 +50,7 @@ pub(crate) enum BodyLength { Unknown, } -/// Status of when a Disaptcher future completes. +/// Status of when a Dispatcher future completes. pub(crate) enum Dispatched { /// Dispatcher completely shutdown connection. Shutdown, diff --git a/tests/client.rs b/tests/client.rs index 2953313798..88fcd3a564 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -3154,6 +3154,61 @@ mod conn { .expect("client should be open"); } + #[tokio::test] + async fn http2_responds_before_consuming_request_body() { + // Test that a early-response from server works correctly (request body wasn't fully consumed). + // https://github.com/hyperium/hyper/issues/2872 + use hyper::service::service_fn; + + let _ = pretty_env_logger::try_init(); + + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + + // Spawn an HTTP2 server that responds before reading the whole request body. + // It's normal case to decline the request due to headers or size of the body. + tokio::spawn(async move { + let sock = listener.accept().await.unwrap().0; + hyper::server::conn::Http::new() + .http2_only(true) + .serve_connection( + sock, + service_fn(|_req| async move { + Ok::<_, hyper::Error>(http::Response::new(hyper::Body::from( + "No bread for you!", + ))) + }), + ) + .await + .expect("serve_connection"); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::Builder::new() + .http2_only(true) + .handshake::<_, Body>(io) + .await + .expect("http handshake"); + + tokio::spawn(async move { + conn.await.expect("client conn shouldn't error"); + }); + + // Use a channel to keep request stream open + let (_tx, body) = hyper::Body::channel(); + let req = Request::post("/a").body(body).unwrap(); + let resp = client.send_request(req).await.expect("send_request"); + assert!(resp.status().is_success()); + + let body = hyper::body::to_bytes(resp.into_body()) + .await + .expect("get response body with no error"); + + assert_eq!(body.as_ref(), b"No bread for you!"); + } + #[tokio::test] async fn h2_connect() { let _ = pretty_env_logger::try_init();