From 0766d3f78d116ea243222cea134cfe7f418e6a3c Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 17 Oct 2022 13:23:47 -0400 Subject: [PATCH] feat(server): remove `server::conn::{Http, Connection}` types (#3013) The connection types have been split into version-specific types, in the `server::conn::{http1, http2}` modules. Closes #3012 BREAKING CHANGE: Either choose a version-specific `Connection` type, or look for the auto-version type in `hyper-util`. --- benches/pipeline.rs | 4 +- benches/server.rs | 4 +- examples/echo.rs | 7 +- examples/gateway.rs | 7 +- examples/hello.rs | 4 +- examples/http_proxy.rs | 4 +- examples/multi_server.rs | 6 +- examples/params.rs | 4 +- examples/send_file.rs | 4 +- examples/service_struct_impl.rs | 4 +- examples/single_threaded.rs | 7 +- examples/state.rs | 7 +- examples/upgrades.rs | 4 +- examples/web_api.rs | 7 +- src/common/exec.rs | 2 +- src/common/io/rewind.rs | 4 +- src/server/conn/http1.rs | 71 +-- src/server/conn/http2.rs | 24 - src/server/conn/mod.rs | 1054 +------------------------------ tests/client.rs | 11 +- tests/server.rs | 144 ++--- tests/support/mod.rs | 36 +- 22 files changed, 159 insertions(+), 1260 deletions(-) diff --git a/benches/pipeline.rs b/benches/pipeline.rs index 29625edfa0..a60100fa51 100644 --- a/benches/pipeline.rs +++ b/benches/pipeline.rs @@ -14,7 +14,7 @@ use http_body_util::Full; use tokio::net::TcpListener; use tokio::sync::oneshot; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::Response; @@ -41,7 +41,7 @@ fn hello_world_16(b: &mut test::Bencher) { loop { let (stream, _addr) = listener.accept().await.expect("accept"); - Http::new() + http1::Builder::new() .pipeline_flush(true) .serve_connection( stream, diff --git a/benches/server.rs b/benches/server.rs index 1b7f050fac..b387918800 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -13,7 +13,7 @@ use futures_util::{stream, StreamExt}; use http_body_util::{BodyExt, Full, StreamBody}; use tokio::sync::oneshot; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::Response; @@ -38,7 +38,7 @@ macro_rules! bench_server { loop { let (stream, _) = listener.accept().await.expect("accept"); - Http::new() + http1::Builder::new() .serve_connection( stream, service_fn(|_| async { diff --git a/examples/echo.rs b/examples/echo.rs index b2b601724b..ba5096e7a6 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::body::Body as _; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Recv, Request, Response, StatusCode}; use tokio::net::TcpListener; @@ -87,7 +87,10 @@ async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await?; tokio::task::spawn(async move { - if let Err(err) = Http::new().serve_connection(stream, service_fn(echo)).await { + if let Err(err) = http1::Builder::new() + .serve_connection(stream, service_fn(echo)) + .await + { println!("Error serving connection: {:?}", err); } }); diff --git a/examples/gateway.rs b/examples/gateway.rs index 22de700eea..2c8b8e17d5 100644 --- a/examples/gateway.rs +++ b/examples/gateway.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use hyper::{server::conn::Http, service::service_fn}; +use hyper::{server::conn::http1, service::service_fn}; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; @@ -56,7 +56,10 @@ async fn main() -> Result<(), Box> { }); tokio::task::spawn(async move { - if let Err(err) = Http::new().serve_connection(stream, service).await { + if let Err(err) = http1::Builder::new() + .serve_connection(stream, service) + .await + { println!("Failed to servce connection: {:?}", err); } }); diff --git a/examples/hello.rs b/examples/hello.rs index 4f23172b40..28a145dee4 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use bytes::Bytes; use http_body_util::Full; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Recv, Request, Response}; use tokio::net::TcpListener; @@ -26,7 +26,7 @@ pub async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await?; tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .serve_connection(stream, service_fn(hello)) .await { diff --git a/examples/http_proxy.rs b/examples/http_proxy.rs index 6bb0a8d24e..792e1e470d 100644 --- a/examples/http_proxy.rs +++ b/examples/http_proxy.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::client::conn::http1::Builder; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::upgrade::Upgraded; use hyper::{Method, Recv, Request, Response}; @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await?; tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .http1_preserve_header_case(true) .http1_title_case_headers(true) .serve_connection(stream, service_fn(proxy)) diff --git a/examples/multi_server.rs b/examples/multi_server.rs index 302525b29f..00232133b6 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -6,7 +6,7 @@ use std::net::SocketAddr; use bytes::Bytes; use futures_util::future::join; use http_body_util::Full; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Recv, Request, Response}; use tokio::net::TcpListener; @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await.unwrap(); tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .serve_connection(stream, service_fn(index1)) .await { @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await.unwrap(); tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .serve_connection(stream, service_fn(index2)) .await { diff --git a/examples/params.rs b/examples/params.rs index da5a182bb2..44bf877bdf 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Recv, Request, Response, StatusCode}; use tokio::net::TcpListener; @@ -126,7 +126,7 @@ async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await?; tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .serve_connection(stream, service_fn(param_example)) .await { diff --git a/examples/send_file.rs b/examples/send_file.rs index 1249a0d7f2..5d6700f2b3 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use tokio::net::TcpListener; use bytes::Bytes; @@ -26,7 +26,7 @@ async fn main() -> std::result::Result<(), Box> { let (stream, _) = listener.accept().await?; tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .serve_connection(stream, service_fn(response_examples)) .await { diff --git a/examples/service_struct_impl.rs b/examples/service_struct_impl.rs index ad73c6855c..5a9e6ab61b 100644 --- a/examples/service_struct_impl.rs +++ b/examples/service_struct_impl.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use http_body_util::Full; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::Service; use hyper::{Recv, Request, Response}; use tokio::net::TcpListener; @@ -22,7 +22,7 @@ async fn main() -> Result<(), Box> { let (stream, _) = listener.accept().await?; tokio::task::spawn(async move { - if let Err(err) = Http::new() + if let Err(err) = http1::Builder::new() .serve_connection(stream, Svc { counter: 81818 }) .await { diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index 14989c91a2..2d991d0148 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use hyper::server::conn::Http; +use hyper::server::conn::http2; use std::cell::Cell; use std::net::SocketAddr; use std::rc::Rc; @@ -84,8 +84,7 @@ async fn run() -> Result<(), Box> { }); tokio::task::spawn_local(async move { - if let Err(err) = Http::new() - .with_executor(LocalExec) + if let Err(err) = http2::Builder::new(LocalExec) .serve_connection(stream, service) .await { @@ -95,6 +94,8 @@ async fn run() -> Result<(), Box> { } } +// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor. +// // Since the Server needs to spawn some background tasks, we needed // to configure an Executor that can spawn !Send futures... #[derive(Clone, Copy, Debug)] diff --git a/examples/state.rs b/examples/state.rs index ed3974c1e4..7d060efe1d 100644 --- a/examples/state.rs +++ b/examples/state.rs @@ -8,7 +8,7 @@ use std::sync::{ use bytes::Bytes; use http_body_util::Full; -use hyper::{server::conn::Http, service::service_fn}; +use hyper::{server::conn::http1, service::service_fn}; use hyper::{Error, Response}; use tokio::net::TcpListener; @@ -46,7 +46,10 @@ async fn main() -> Result<(), Box> { } }); - if let Err(err) = Http::new().serve_connection(stream, service).await { + if let Err(err) = http1::Builder::new() + .serve_connection(stream, service) + .await + { println!("Error serving connection: {:?}", err); } } diff --git a/examples/upgrades.rs b/examples/upgrades.rs index e00d86f2d9..e5494e7bbb 100644 --- a/examples/upgrades.rs +++ b/examples/upgrades.rs @@ -11,7 +11,7 @@ use tokio::sync::watch; use bytes::Bytes; use http_body_util::Empty; use hyper::header::{HeaderValue, UPGRADE}; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::upgrade::Upgraded; use hyper::{Recv, Request, Response, StatusCode}; @@ -149,7 +149,7 @@ async fn main() { let mut rx = rx.clone(); tokio::task::spawn(async move { - let conn = Http::new().serve_connection(stream, service_fn(server_upgrade)); + let conn = http1::Builder::new().serve_connection(stream, service_fn(server_upgrade)); // Don't forget to enable upgrades on the connection. let mut conn = conn.with_upgrades(); diff --git a/examples/web_api.rs b/examples/web_api.rs index 9c4f5c12e1..b01e355665 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use bytes::{Buf, Bytes}; use http_body_util::{BodyExt, Full}; -use hyper::server::conn::Http; +use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{header, Method, Recv, Request, Response, StatusCode}; use tokio::net::{TcpListener, TcpStream}; @@ -113,7 +113,10 @@ async fn main() -> Result<()> { tokio::task::spawn(async move { let service = service_fn(move |req| response_examples(req)); - if let Err(err) = Http::new().serve_connection(stream, service).await { + if let Err(err) = http1::Builder::new() + .serve_connection(stream, service) + .await + { println!("Failed to serve connection: {:?}", err); } }); diff --git a/src/common/exec.rs b/src/common/exec.rs index 4c5eb62a2f..b7e3e9d7f7 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -20,7 +20,7 @@ pub(crate) type BoxSendFuture = Pin + Send>>; // TODO: with the `runtime`feature, `Exec::Default` used `tokio::spawn`. With the // removal of the opt-in default runtime, this should be refactored. #[derive(Clone)] -pub enum Exec { +pub(crate) enum Exec { Default, Executor(Arc + Send + Sync>), } diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index 8da4885f1e..5642d897d1 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -14,7 +14,7 @@ pub(crate) struct Rewind { } impl Rewind { - #[cfg(any(all(feature = "http2", feature = "server"), test))] + #[cfg(test)] pub(crate) fn new(io: T) -> Self { Rewind { pre: None, @@ -29,7 +29,7 @@ impl Rewind { } } - #[cfg(any(all(feature = "http1", feature = "http2", feature = "server"), test))] + #[cfg(test)] pub(crate) fn rewind(&mut self, bs: Bytes) { debug_assert!(self.pre.is_none()); self.pre = Some(bs); diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 006028f0ca..48e0872e4a 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -2,7 +2,6 @@ use std::error::Error as StdError; use std::fmt; -use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -10,7 +9,6 @@ use bytes::Bytes; use tokio::io::{AsyncRead, AsyncWrite}; use crate::body::{Body, Recv}; -use crate::common::exec::{ConnStreamExec, Exec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::{common::time::Time, rt::Timer}; use crate::proto; @@ -25,21 +23,18 @@ pin_project_lite::pin_project! { /// /// Polling this future will drive HTTP forward. #[must_use = "futures do nothing unless polled"] - pub struct Connection + pub struct Connection where S: HttpService, { conn: Option>, - // can we remove this? - _exec: PhantomData, } } /// A configuration builder for HTTP/1 server connections. #[derive(Clone, Debug)] -pub struct Builder { - pub(crate) _exec: E, +pub struct Builder { pub(crate) timer: Time, h1_half_close: bool, h1_keep_alive: bool, @@ -75,7 +70,7 @@ pub struct Parts { // ===== impl Connection ===== -impl fmt::Debug for Connection +impl fmt::Debug for Connection where S: HttpService, { @@ -84,14 +79,13 @@ where } } -impl Connection +impl Connection where S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, B::Error: Into>, - E: ConnStreamExec, { /// Start a graceful shutdown process for this connection. /// @@ -187,7 +181,7 @@ where /// Enable this connection to support higher-level HTTP upgrades. /// /// See [the `upgrade` module](crate::upgrade) for more. - pub fn with_upgrades(self) -> upgrades::UpgradeableConnection + pub fn with_upgrades(self) -> upgrades::UpgradeableConnection where I: Send, { @@ -196,14 +190,13 @@ where } -impl Future for Connection +impl Future for Connection where S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, B::Error: Into>, - E: ConnStreamExec, { type Output = crate::Result<()>; @@ -229,13 +222,10 @@ where // ===== impl Builder ===== -impl Builder { +impl Builder { /// Create a new connection builder. - /// - /// This starts with the default options, and an executor. - pub fn new(exec: E) -> Self { + pub fn new() -> Self { Self { - _exec: exec, timer: Time::Empty, h1_half_close: false, h1_keep_alive: true, @@ -351,24 +341,6 @@ impl Builder { self } - /// Set the executor used to spawn background tasks. - /// - /// Default uses implicit default (like `tokio::spawn`). - pub fn with_executor(self, exec: E2) -> Builder { - Builder { - _exec: exec, - timer: self.timer, - h1_half_close: self.h1_half_close, - h1_keep_alive: self.h1_keep_alive, - h1_title_case_headers: self.h1_title_case_headers, - h1_preserve_header_case: self.h1_preserve_header_case, - h1_header_read_timeout: self.h1_header_read_timeout, - h1_writev: self.h1_writev, - max_buf_size: self.max_buf_size, - pipeline_flush: self.pipeline_flush, - } - } - /// Set the timer used in background tasks. pub fn timer(&mut self, timer: M) -> &mut Self where @@ -388,7 +360,7 @@ impl Builder { /// ``` /// # use hyper::{Recv, Request, Response}; /// # use hyper::service::Service; - /// # use hyper::server::conn::Http; + /// # use hyper::server::conn::http1::Builder; /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # async fn run(some_io: I, some_service: S) /// # where @@ -397,7 +369,7 @@ impl Builder { /// # S::Error: Into>, /// # S::Future: Send, /// # { - /// let http = Http::new(); + /// let http = Builder::new(); /// let conn = http.serve_connection(some_io, some_service); /// /// if let Err(e) = conn.await { @@ -406,14 +378,13 @@ impl Builder { /// # } /// # fn main() {} /// ``` - pub fn serve_connection(&self, io: I, service: S) -> Connection + pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: HttpService, + S: HttpService, S::Error: Into>, - Bd: Body + 'static, - Bd::Error: Into>, + S::ResBody: 'static, + ::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, - E: ConnStreamExec, { let mut conn = proto::Conn::new(io); conn.set_timer(self.timer.clone()); @@ -447,7 +418,6 @@ impl Builder { let proto = proto::h1::Dispatcher::new(sd, conn); Connection { conn: Some(proto), - _exec: PhantomData, } } } @@ -459,25 +429,23 @@ mod upgrades { // A future binding a connection with a Service with Upgrade support. // - // This type is unnameable outside the crate, and so basically just an - // `impl Future`, without requiring Rust 1.26. + // This type is unnameable outside the crate. #[must_use = "futures do nothing unless polled"] #[allow(missing_debug_implementations)] - pub struct UpgradeableConnection + pub struct UpgradeableConnection where S: HttpService, { - pub(super) inner: Connection, + pub(super) inner: Connection, } - impl UpgradeableConnection + impl UpgradeableConnection where S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, B::Error: Into>, - E: ConnStreamExec, { /// Start a graceful shutdown process for this connection. /// @@ -488,14 +456,13 @@ mod upgrades { } } - impl Future for UpgradeableConnection + impl Future for UpgradeableConnection where S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Error: Into>, - E: ConnStreamExec, { type Output = crate::Result<()>; diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index e6a5f17e67..e39aaf80e9 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -284,30 +284,6 @@ impl Builder { /// /// This returns a Future that must be polled in order for HTTP to be /// driven on the connection. - /// - /// # Example - /// - /// ``` - /// # use hyper::{Recv, Request, Response}; - /// # use hyper::service::Service; - /// # use hyper::server::conn::Http; - /// # use tokio::io::{AsyncRead, AsyncWrite}; - /// # async fn run(some_io: I, some_service: S) - /// # where - /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - /// # S: Service, Response=hyper::Response> + Send + 'static, - /// # S::Error: Into>, - /// # S::Future: Send, - /// # { - /// let http = Http::new(); - /// let conn = http.serve_connection(some_io, some_service); - /// - /// if let Err(e) = conn.await { - /// eprintln!("server connection error: {}", e); - /// } - /// # } - /// # fn main() {} - /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where S: HttpService, diff --git a/src/server/conn/mod.rs b/src/server/conn/mod.rs index d3ed55d4f3..41f9d0363f 100644 --- a/src/server/conn/mod.rs +++ b/src/server/conn/mod.rs @@ -1,18 +1,23 @@ -//! Lower-level Server connection API. +//! Server connection API. //! //! The types in this module are to provide a lower-level API based around a //! single connection. Accepting a connection and binding it with a service //! are not handled at this level. This module provides the building blocks to //! customize those things externally. //! +//! This module is split by HTTP version. Both work similarly, but do have +//! specific options on each builder. +//! //! ## Example -//! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream +//! +//! A simple example that prepares an HTTP/1 connection over a Tokio TCP stream. +//! //! ```no_run //! # #[cfg(feature = "http1")] //! # mod rt { //! use http::{Request, Response, StatusCode}; //! use http_body_util::Full; -//! use hyper::{server::conn::Http, service::service_fn, body::Bytes}; +//! use hyper::{server::conn::http1, service::service_fn, body::Bytes}; //! use std::{net::SocketAddr, convert::Infallible}; //! use tokio::net::TcpListener; //! @@ -24,8 +29,7 @@ //! loop { //! let (tcp_stream, _) = tcp_listener.accept().await?; //! tokio::task::spawn(async move { -//! if let Err(http_err) = Http::new() -//! .http1_only(true) +//! if let Err(http_err) = http1::Builder::new() //! .http1_keep_alive(true) //! .serve_connection(tcp_stream, service_fn(hello)) //! .await { @@ -41,1048 +45,8 @@ //! # } //! ``` -#[cfg(all( - any(feature = "http1", feature = "http2"), - not(all(feature = "http1", feature = "http2")) -))] -use std::marker::PhantomData; -use std::sync::Arc; -#[cfg(any(feature = "http1", feature = "http2"))] -use std::time::Duration; - -#[cfg(feature = "http2")] -use crate::common::io::Rewind; -#[cfg(all(feature = "http1", feature = "http2"))] -use crate::error::{Kind, Parse}; -#[cfg(feature = "http1")] -use crate::upgrade::Upgraded; -use crate::{common::time::Time, rt::Timer}; - #[cfg(feature = "http1")] pub mod http1; #[cfg(feature = "http2")] pub mod http2; -cfg_feature! { - #![any(feature = "http1", feature = "http2")] - - use std::error::Error as StdError; - use std::fmt; - - use bytes::Bytes; - use pin_project_lite::pin_project; - use tokio::io::{AsyncRead, AsyncWrite}; - use tracing::trace; - - use crate::body::{Recv, Body}; - use crate::common::{task, Future, Pin, Poll, Unpin}; - #[cfg(not(all(feature = "http1", feature = "http2")))] - use crate::common::Never; - use crate::common::exec::{ConnStreamExec, Exec}; - use crate::proto; - use crate::service::HttpService; - - pub(super) use self::upgrades::UpgradeableConnection; -} - -/// A lower-level configuration of the HTTP protocol. -/// -/// This structure is used to configure options for an HTTP server connection. -#[derive(Clone, Debug)] -#[cfg(any(feature = "http1", feature = "http2"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -pub struct Http { - pub(crate) exec: E, - pub(crate) timer: Time, - h1_half_close: bool, - h1_keep_alive: bool, - h1_title_case_headers: bool, - h1_preserve_header_case: bool, - #[cfg(feature = "http1")] - h1_header_read_timeout: Option, - h1_writev: Option, - #[cfg(feature = "http2")] - h2_builder: proto::h2::server::Config, - mode: ConnectionMode, - max_buf_size: Option, - pipeline_flush: bool, -} - -/// The internal mode of HTTP protocol which indicates the behavior when a parse error occurs. -#[cfg(any(feature = "http1", feature = "http2"))] -#[derive(Clone, Debug, PartialEq)] -enum ConnectionMode { - /// Always use HTTP/1 and do not upgrade when a parse error occurs. - #[cfg(feature = "http1")] - H1Only, - /// Always use HTTP/2. - #[cfg(feature = "http2")] - H2Only, - /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs. - #[cfg(all(feature = "http1", feature = "http2"))] - Fallback, -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A future binding a connection with a Service. - /// - /// Polling this future will drive HTTP forward. - #[must_use = "futures do nothing unless polled"] - #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] - pub struct Connection - where - S: HttpService, - { - pub(super) conn: Option>, - fallback: Fallback, - } -} - -#[cfg(feature = "http1")] -type Http1Dispatcher = - proto::h1::Dispatcher, B, T, proto::ServerTransaction>; - -#[cfg(all(not(feature = "http1"), feature = "http2"))] -type Http1Dispatcher = (Never, PhantomData<(T, Box>, Box>)>); - -#[cfg(feature = "http2")] -type Http2Server = proto::h2::Server, S, B, E>; - -#[cfg(all(not(feature = "http2"), feature = "http1"))] -type Http2Server = ( - Never, - PhantomData<(T, Box>, Box>, Box>)>, -); - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - #[project = ProtoServerProj] - pub(super) enum ProtoServer - where - S: HttpService, - B: Body, - { - H1 { - #[pin] - h1: Http1Dispatcher, - }, - H2 { - #[pin] - h2: Http2Server, - }, - } -} - -#[cfg(all(feature = "http1", feature = "http2"))] -#[derive(Clone, Debug)] -enum Fallback { - ToHttp2(proto::h2::server::Config, E, Time), - Http1Only, -} - -#[cfg(all( - any(feature = "http1", feature = "http2"), - not(all(feature = "http1", feature = "http2")) -))] -type Fallback = PhantomData; - -#[cfg(all(feature = "http1", feature = "http2"))] -impl Fallback { - fn to_h2(&self) -> bool { - match *self { - Fallback::ToHttp2(..) => true, - Fallback::Http1Only => false, - } - } -} - -#[cfg(all(feature = "http1", feature = "http2"))] -impl Unpin for Fallback {} - -/// Deconstructed parts of a `Connection`. -/// -/// This allows taking apart a `Connection` at a later time, in order to -/// reclaim the IO object, and additional related pieces. -#[derive(Debug)] -#[cfg(any(feature = "http1", feature = "http2"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] -pub struct Parts { - /// The original IO object used in the handshake. - pub io: T, - /// A buffer of bytes that have been read but not processed as HTTP. - /// - /// If the client sent additional bytes after its last request, and - /// this connection "ended" with an upgrade, the read buffer will contain - /// those bytes. - /// - /// You will want to check for any existing bytes if you plan to continue - /// communicating on the IO object. - pub read_buf: Bytes, - /// The `Service` used to serve this connection. - pub service: S, - _inner: (), -} - -// ===== impl Http ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Http { - /// Creates a new instance of the HTTP protocol, ready to spawn a server or - /// start accepting connections. - pub fn new() -> Http { - Http { - exec: Exec::Default, - timer: Time::Empty, - h1_half_close: false, - h1_keep_alive: true, - h1_title_case_headers: false, - h1_preserve_header_case: false, - #[cfg(feature = "http1")] - h1_header_read_timeout: None, - h1_writev: None, - #[cfg(feature = "http2")] - h2_builder: Default::default(), - mode: ConnectionMode::default(), - max_buf_size: None, - pipeline_flush: false, - } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Http { - /// Sets whether HTTP1 is required. - /// - /// Default is false - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_only(&mut self, val: bool) -> &mut Self { - if val { - self.mode = ConnectionMode::H1Only; - } else { - #[cfg(feature = "http2")] - { - self.mode = ConnectionMode::Fallback; - } - } - self - } - - /// Set whether HTTP/1 connections should support half-closures. - /// - /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `true` will - /// prevent closing the connection immediately if `read` - /// detects an EOF in the middle of a request. - /// - /// Default is `false`. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_half_close(&mut self, val: bool) -> &mut Self { - self.h1_half_close = val; - self - } - - /// Enables or disables HTTP/1 keep-alive. - /// - /// Default is true. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self { - self.h1_keep_alive = val; - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self { - self.h1_title_case_headers = enabled; - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Request`. It will also look for and use - /// such an extension in any provided `Response`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self { - self.h1_preserve_header_case = enabled; - self - } - - /// Set a timeout for reading client request headers. If a client does not - /// transmit the entire header within this time, the connection is closed. - /// - /// Default is None. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { - self.h1_header_read_timeout = Some(read_timeout); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - #[inline] - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_writev(&mut self, val: bool) -> &mut Self { - self.h1_writev = Some(val); - self - } - - /// Sets whether HTTP2 is required. - /// - /// Default is false - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_only(&mut self, val: bool) -> &mut Self { - if val { - self.mode = ConnectionMode::H2Only; - } else { - #[cfg(feature = "http1")] - { - self.mode = ConnectionMode::Fallback; - } - } - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_stream_window_size = sz; - } - self - } - - /// Sets the max connection-level flow control for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_conn_window_size = sz; - } - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { - use proto::h2::SPEC_WINDOW_SIZE; - - self.h2_builder.adaptive_window = enabled; - if enabled { - self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; - self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; - } - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.max_frame_size = sz; - } - self - } - - /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 - /// connections. - /// - /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - self.h2_builder.max_concurrent_streams = max.into(); - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { - self.h2_builder.keep_alive_interval = interval.into(); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.h2_builder.keep_alive_timeout = timeout; - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently ~400KB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { - assert!(max <= std::u32::MAX as usize); - self.h2_builder.max_send_buffer_size = max; - self - } - - /// Enables the [extended CONNECT protocol]. - /// - /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - #[cfg(feature = "http2")] - pub fn http2_enable_connect_protocol(&mut self) -> &mut Self { - self.h2_builder.enable_connect_protocol = true; - self - } - - /// Sets the max size of received header frames. - /// - /// Default is currently ~16MB, but may change. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self { - self.h2_builder.max_header_list_size = max; - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn max_buf_size(&mut self, max: usize) -> &mut Self { - assert!( - max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, - "the max_buf_size cannot be smaller than the minimum that h1 specifies." - ); - self.max_buf_size = Some(max); - self - } - - /// Aggregates flushes to better support pipelined responses. - /// - /// Experimental, may have bugs. - /// - /// Default is false. - pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { - self.pipeline_flush = enabled; - self - } - - /// Set the executor used to spawn background tasks. - /// - /// Default uses implicit default (like `tokio::spawn`). - pub fn with_executor(self, exec: E2) -> Http { - Http { - exec, - timer: self.timer, - h1_half_close: self.h1_half_close, - h1_keep_alive: self.h1_keep_alive, - h1_title_case_headers: self.h1_title_case_headers, - h1_preserve_header_case: self.h1_preserve_header_case, - #[cfg(feature = "http1")] - h1_header_read_timeout: self.h1_header_read_timeout, - h1_writev: self.h1_writev, - #[cfg(feature = "http2")] - h2_builder: self.h2_builder, - mode: self.mode, - max_buf_size: self.max_buf_size, - pipeline_flush: self.pipeline_flush, - } - } - - /// Set the timer used in background tasks. - pub fn with_timer(self, timer: M) -> Http - where - M: Timer + Send + Sync + 'static, - { - Http { - exec: self.exec, - timer: Time::Timer(Arc::new(timer)), - h1_half_close: self.h1_half_close, - h1_keep_alive: self.h1_keep_alive, - h1_title_case_headers: self.h1_title_case_headers, - h1_preserve_header_case: self.h1_preserve_header_case, - #[cfg(feature = "http1")] - h1_header_read_timeout: self.h1_header_read_timeout, - h1_writev: self.h1_writev, - #[cfg(feature = "http2")] - h2_builder: self.h2_builder, - mode: self.mode, - max_buf_size: self.max_buf_size, - pipeline_flush: self.pipeline_flush, - } - } - - /// Bind a connection together with a [`Service`](crate::service::Service). - /// - /// This returns a Future that must be polled in order for HTTP to be - /// driven on the connection. - /// - /// # Example - /// - /// ``` - /// # use hyper::{Recv, Request, Response}; - /// # use hyper::service::Service; - /// # use hyper::server::conn::Http; - /// # use tokio::io::{AsyncRead, AsyncWrite}; - /// # async fn run(some_io: I, some_service: S) - /// # where - /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - /// # S: Service, Response=hyper::Response> + Send + 'static, - /// # S::Error: Into>, - /// # S::Future: Send, - /// # { - /// let http = Http::new(); - /// let conn = http.serve_connection(some_io, some_service); - /// - /// if let Err(e) = conn.await { - /// eprintln!("server connection error: {}", e); - /// } - /// # } - /// # fn main() {} - /// ``` - pub fn serve_connection(&self, io: I, service: S) -> Connection - where - S: HttpService, - S::Error: Into>, - Bd: Body + 'static, - Bd::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin, - E: ConnStreamExec, - { - #[cfg(feature = "http1")] - macro_rules! h1 { - () => {{ - let mut conn = proto::Conn::new(io); - { - conn.set_timer(self.timer.clone()); - } - if !self.h1_keep_alive { - conn.disable_keep_alive(); - } - if self.h1_half_close { - conn.set_allow_half_close(); - } - if self.h1_title_case_headers { - conn.set_title_case_headers(); - } - if self.h1_preserve_header_case { - conn.set_preserve_header_case(); - } - #[cfg(feature = "http1")] - if let Some(header_read_timeout) = self.h1_header_read_timeout { - conn.set_http1_header_read_timeout(header_read_timeout); - } - if let Some(writev) = self.h1_writev { - if writev { - conn.set_write_strategy_queue(); - } else { - conn.set_write_strategy_flatten(); - } - } - conn.set_flush_pipeline(self.pipeline_flush); - if let Some(max) = self.max_buf_size { - conn.set_max_buf_size(max); - } - let sd = proto::h1::dispatch::Server::new(service); - ProtoServer::H1 { - h1: proto::h1::Dispatcher::new(sd, conn), - } - }}; - } - - let proto = match self.mode { - #[cfg(feature = "http1")] - #[cfg(not(feature = "http2"))] - ConnectionMode::H1Only => h1!(), - #[cfg(feature = "http2")] - #[cfg(feature = "http1")] - ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(), - #[cfg(feature = "http2")] - ConnectionMode::H2Only => { - let rewind_io = Rewind::new(io); - let h2 = proto::h2::Server::new( - rewind_io, - service, - &self.h2_builder, - self.exec.clone(), - self.timer.clone(), - ); - ProtoServer::H2 { h2 } - } - }; - - Connection { - conn: Some(proto), - #[cfg(all(feature = "http1", feature = "http2"))] - fallback: if self.mode == ConnectionMode::Fallback { - Fallback::ToHttp2( - self.h2_builder.clone(), - self.exec.clone(), - self.timer.clone(), - ) - } else { - Fallback::Http1Only - }, - #[cfg(not(all(feature = "http1", feature = "http2")))] - fallback: PhantomData, - } - } -} - -// ===== impl Connection ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Connection -where - S: HttpService, - S::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin, - B: Body + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - /// Start a graceful shutdown process for this connection. - /// - /// This `Connection` should continue to be polled until shutdown - /// can finish. - /// - /// # Note - /// - /// This should only be called while the `Connection` future is still - /// pending. If called after `Connection::poll` has resolved, this does - /// nothing. - pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - match self.conn { - #[cfg(feature = "http1")] - Some(ProtoServer::H1 { ref mut h1, .. }) => { - h1.disable_keep_alive(); - } - #[cfg(feature = "http2")] - Some(ProtoServer::H2 { ref mut h2 }) => { - h2.graceful_shutdown(); - } - None => (), - - #[cfg(not(feature = "http1"))] - Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {}, - #[cfg(not(feature = "http2"))] - Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {}, - } - } - - /// Return the inner IO object, and additional information. - /// - /// If the IO object has been "rewound" the io will not contain those bytes rewound. - /// This should only be called after `poll_without_shutdown` signals - /// that the connection is "done". Otherwise, it may not have finished - /// flushing all necessary HTTP bytes. - /// - /// # Panics - /// This method will panic if this connection is using an h2 protocol. - pub fn into_parts(self) -> Parts { - self.try_into_parts() - .unwrap_or_else(|| panic!("h2 cannot into_inner")) - } - - /// Return the inner IO object, and additional information, if available. - /// - /// This method will return a `None` if this connection is using an h2 protocol. - pub fn try_into_parts(self) -> Option> { - match self.conn.unwrap() { - #[cfg(feature = "http1")] - ProtoServer::H1 { h1, .. } => { - let (io, read_buf, dispatch) = h1.into_inner(); - Some(Parts { - io, - read_buf, - service: dispatch.into_service(), - _inner: (), - }) - } - ProtoServer::H2 { .. } => None, - - #[cfg(not(feature = "http1"))] - ProtoServer::H1 { h1, .. } => match h1.0 {}, - } - } - - /// Poll the connection for completion, but without calling `shutdown` - /// on the underlying IO. - /// - /// This is useful to allow running a connection while doing an HTTP - /// upgrade. Once the upgrade is completed, the connection would be "done", - /// but it is not desired to actually shutdown the IO object. Instead you - /// would take it back using `into_parts`. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> - where - S: Unpin, - S::Future: Unpin, - B: Unpin, - { - loop { - match *self.conn.as_mut().unwrap() { - #[cfg(feature = "http1")] - ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) { - Ok(()) => return Poll::Ready(Ok(())), - Err(e) => { - #[cfg(feature = "http2")] - match *e.kind() { - Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => { - self.upgrade_h2(); - continue; - } - _ => (), - } - - return Poll::Ready(Err(e)); - } - }, - #[cfg(feature = "http2")] - ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()), - - #[cfg(not(feature = "http1"))] - ProtoServer::H1 { ref mut h1, .. } => match h1.0 {}, - #[cfg(not(feature = "http2"))] - ProtoServer::H2 { ref mut h2 } => match h2.0 {}, - }; - } - } - - /// Prevent shutdown of the underlying IO object at the end of service the request, - /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. - /// - /// # Error - /// - /// This errors if the underlying connection protocol is not HTTP/1. - pub fn without_shutdown(self) -> impl Future>> - where - S: Unpin, - S::Future: Unpin, - B: Unpin, - { - let mut conn = Some(self); - futures_util::future::poll_fn(move |cx| { - ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; - Poll::Ready( - conn.take() - .unwrap() - .try_into_parts() - .ok_or_else(crate::Error::new_without_shutdown_not_h1), - ) - }) - } - - #[cfg(all(feature = "http1", feature = "http2"))] - fn upgrade_h2(&mut self) { - trace!("Trying to upgrade connection to h2"); - let conn = self.conn.take(); - - let (io, read_buf, dispatch) = match conn.unwrap() { - ProtoServer::H1 { h1, .. } => h1.into_inner(), - ProtoServer::H2 { .. } => { - panic!("h2 cannot into_inner"); - } - }; - let mut rewind_io = Rewind::new(io); - rewind_io.rewind(read_buf); - let (builder, exec, timer) = match self.fallback { - Fallback::ToHttp2(ref builder, ref exec, ref timer) => (builder, exec, timer), - Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"), - }; - let h2 = proto::h2::Server::new( - rewind_io, - dispatch.into_service(), - builder, - exec.clone(), - timer.clone(), - ); - - debug_assert!(self.conn.is_none()); - self.conn = Some(ProtoServer::H2 { h2 }); - } - - /// Enable this connection to support higher-level HTTP upgrades. - /// - /// See [the `upgrade` module](crate::upgrade) for more. - pub fn with_upgrades(self) -> UpgradeableConnection - where - I: Send, - { - UpgradeableConnection { inner: self } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Future for Connection -where - S: HttpService, - S::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin + 'static, - B: Body + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - type Output = crate::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - loop { - match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) { - Ok(done) => { - match done { - proto::Dispatched::Shutdown => {} - #[cfg(feature = "http1")] - proto::Dispatched::Upgrade(pending) => { - // With no `Send` bound on `I`, we can't try to do - // upgrades here. In case a user was trying to use - // `Body::on_upgrade` with this API, send a special - // error letting them know about that. - pending.manual(); - } - }; - return Poll::Ready(Ok(())); - } - Err(e) => { - #[cfg(feature = "http1")] - #[cfg(feature = "http2")] - match *e.kind() { - Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => { - self.upgrade_h2(); - continue; - } - _ => (), - } - - return Poll::Ready(Err(e)); - } - } - } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl fmt::Debug for Connection -where - S: HttpService, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Connection").finish() - } -} - -// ===== impl ConnectionMode ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Default for ConnectionMode { - #[cfg(all(feature = "http1", feature = "http2"))] - fn default() -> ConnectionMode { - ConnectionMode::Fallback - } - - #[cfg(all(feature = "http1", not(feature = "http2")))] - fn default() -> ConnectionMode { - ConnectionMode::H1Only - } - - #[cfg(all(not(feature = "http1"), feature = "http2"))] - fn default() -> ConnectionMode { - ConnectionMode::H2Only - } -} - -// ===== impl ProtoServer ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Future for ProtoServer -where - T: AsyncRead + AsyncWrite + Unpin, - S: HttpService, - S::Error: Into>, - B: Body + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - type Output = crate::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match self.project() { - #[cfg(feature = "http1")] - ProtoServerProj::H1 { h1, .. } => h1.poll(cx), - #[cfg(feature = "http2")] - ProtoServerProj::H2 { h2 } => h2.poll(cx), - - #[cfg(not(feature = "http1"))] - ProtoServerProj::H1 { h1, .. } => match h1.0 {}, - #[cfg(not(feature = "http2"))] - ProtoServerProj::H2 { h2 } => match h2.0 {}, - } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -mod upgrades { - use super::*; - - // A future binding a connection with a Service with Upgrade support. - // - // This type is unnameable outside the crate, and so basically just an - // `impl Future`, without requiring Rust 1.26. - #[must_use = "futures do nothing unless polled"] - #[allow(missing_debug_implementations)] - pub struct UpgradeableConnection - where - S: HttpService, - { - pub(super) inner: Connection, - } - - impl UpgradeableConnection - where - S: HttpService, - S::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin, - B: Body + 'static, - B::Error: Into>, - E: ConnStreamExec, - { - /// Start a graceful shutdown process for this connection. - /// - /// This `Connection` should continue to be polled until shutdown - /// can finish. - pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - Pin::new(&mut self.inner).graceful_shutdown() - } - } - - impl Future for UpgradeableConnection - where - S: HttpService, - S::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Body + 'static, - B::Error: Into>, - E: ConnStreamExec, - { - type Output = crate::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - loop { - match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) { - Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())), - #[cfg(feature = "http1")] - Ok(proto::Dispatched::Upgrade(pending)) => { - match self.inner.conn.take() { - Some(ProtoServer::H1 { h1, .. }) => { - let (io, buf, _) = h1.into_inner(); - pending.fulfill(Upgraded::new(io, buf)); - return Poll::Ready(Ok(())); - } - _ => { - drop(pending); - unreachable!("Upgrade expects h1") - } - }; - } - Err(e) => { - #[cfg(feature = "http1")] - #[cfg(feature = "http2")] - match *e.kind() { - Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => { - self.inner.upgrade_h2(); - continue; - } - _ => (), - } - - return Poll::Ready(Err(e)); - } - } - } - } - } -} diff --git a/tests/client.rs b/tests/client.rs index f9ee93e3cc..63cbe8d270 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1880,7 +1880,7 @@ mod conn { let addr = listener.local_addr().unwrap(); let (shdn_tx, mut shdn_rx) = tokio::sync::watch::channel(false); tokio::task::spawn(async move { - use hyper::server::conn::Http; + use hyper::server::conn::http2; use hyper::service::service_fn; loop { @@ -1892,7 +1892,8 @@ mod conn { let mut shdn_rx = shdn_rx.clone(); tokio::task::spawn(async move { - let mut conn = Http::new().with_executor(TokioExecutor).http2_only(true).serve_connection(stream, service); + let mut conn = http2::Builder::new(TokioExecutor) + .serve_connection(stream, service); tokio::select! { res = &mut conn => { @@ -2093,10 +2094,8 @@ mod conn { // Spawn an HTTP2 server that reads the whole body and responds tokio::spawn(async move { let sock = listener.accept().await.unwrap().0; - hyper::server::conn::Http::new() - .with_executor(TokioExecutor) - .with_timer(TokioTimer) - .http2_only(true) + hyper::server::conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) .serve_connection( sock, service_fn(|req| async move { diff --git a/tests/server.rs b/tests/server.rs index da13f2b1cf..f2b65042aa 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -16,7 +16,7 @@ use std::time::Duration; use bytes::Bytes; use futures_channel::oneshot; -use futures_util::future::{self, Either, FutureExt, TryFutureExt}; +use futures_util::future::{self, Either, FutureExt}; use h2::client::SendRequest; use h2::{RecvStream, SendStream}; use http::header::{HeaderName, HeaderValue}; @@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream}; use hyper::body::Body; -use hyper::server::conn::Http; +use hyper::server::conn::{http1, http2}; use hyper::service::{service_fn, Service}; use hyper::{Method, Recv, Request, Response, StatusCode, Uri, Version}; @@ -321,7 +321,7 @@ mod response_body_lengths { #[tokio::test] async fn http2_auto_response_with_known_length() { - let server = serve(); + let server = serve_opts().http2().serve(); let addr_str = format!("http://{}", server.addr()); server.reply().body("Hello, World!"); @@ -337,7 +337,7 @@ mod response_body_lengths { #[tokio::test] async fn http2_auto_response_with_conflicting_lengths() { - let server = serve(); + let server = serve_opts().http2().serve(); let addr_str = format!("http://{}", server.addr()); server .reply() @@ -356,7 +356,7 @@ mod response_body_lengths { #[tokio::test] async fn http2_implicit_empty_size_hint() { - let server = serve(); + let server = serve_opts().http2().serve(); let addr_str = format!("http://{}", server.addr()); server.reply(); @@ -954,7 +954,7 @@ async fn expect_continue_waits_for_body_poll() { let (socket, _) = listener.accept().await.expect("accept"); - Http::new() + http1::Builder::new() .serve_connection( socket, service_fn(|req| { @@ -1129,7 +1129,7 @@ async fn disable_keep_alive_mid_request() { }); let (socket, _) = listener.accept().await.unwrap(); - let srv = Http::new().serve_connection(socket, HelloWorld); + let srv = http1::Builder::new().serve_connection(socket, HelloWorld); future::try_select(srv, rx1) .then(|r| match r { Ok(Either::Left(_)) => panic!("expected rx first"), @@ -1182,7 +1182,7 @@ async fn disable_keep_alive_post_request() { stream: socket, _debug: dropped2, }; - let server = Http::new().serve_connection(transport, HelloWorld); + let server = http1::Builder::new().serve_connection(transport, HelloWorld); let fut = future::try_select(server, rx1).then(|r| match r { Ok(Either::Left(_)) => panic!("expected rx first"), Ok(Either::Right(((), mut conn))) => { @@ -1210,7 +1210,7 @@ async fn empty_parse_eof_does_not_return_error() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection(socket, HelloWorld) .await .expect("empty parse eof is ok"); @@ -1227,7 +1227,7 @@ async fn nonempty_parse_eof_returns_error() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection(socket, HelloWorld) .await .expect_err("partial parse eof is error"); @@ -1252,7 +1252,7 @@ async fn http1_allow_half_close() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .http1_half_close(true) .serve_connection( socket, @@ -1281,7 +1281,7 @@ async fn disconnect_after_reading_request_before_responding() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .http1_half_close(false) .serve_connection( socket, @@ -1313,7 +1313,7 @@ async fn returning_1xx_response_is_error() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection( socket, service_fn(|_| async move { @@ -1378,8 +1378,8 @@ async fn header_read_timeout_slow_writes() { }); let (socket, _) = listener.accept().await.unwrap(); - let conn = Http::new() - .with_timer(TokioTimer) + let conn = http1::Builder::new() + .timer(TokioTimer) .http1_header_read_timeout(Duration::from_secs(5)) .serve_connection( socket, @@ -1454,8 +1454,8 @@ async fn header_read_timeout_slow_writes_multiple_requests() { }); let (socket, _) = listener.accept().await.unwrap(); - let conn = Http::new() - .with_timer(TokioTimer) + let conn = http1::Builder::new() + .timer(TokioTimer) .http1_header_read_timeout(Duration::from_secs(5)) .serve_connection( socket, @@ -1502,7 +1502,7 @@ async fn upgrades() { }); let (socket, _) = listener.accept().await.unwrap(); - let conn = Http::new().serve_connection( + let conn = http1::Builder::new().serve_connection( socket, service_fn(|_| { let res = Response::builder() @@ -1557,7 +1557,7 @@ async fn http_connect() { }); let (socket, _) = listener.accept().await.unwrap(); - let conn = Http::new().serve_connection( + let conn = http1::Builder::new().serve_connection( socket, service_fn(|_| { let res = Response::builder() @@ -1629,7 +1629,7 @@ async fn upgrades_new() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection(socket, svc) .with_upgrades() .await @@ -1666,7 +1666,7 @@ async fn upgrades_ignored() { loop { let (socket, _) = listener.accept().await.unwrap(); tokio::task::spawn(async move { - Http::new() + http1::Builder::new() .serve_connection(socket, svc) .with_upgrades() .await @@ -1737,7 +1737,7 @@ async fn http_connect_new() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection(socket, svc) .with_upgrades() .await @@ -1819,11 +1819,9 @@ async fn h2_connect() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() - .with_executor(TokioExecutor) - .http2_only(true) + http2::Builder::new(TokioExecutor) .serve_connection(socket, svc) - .with_upgrades() + //.with_upgrades() .await .unwrap(); } @@ -1932,11 +1930,9 @@ async fn h2_connect_multiplex() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() - .with_executor(TokioExecutor) - .http2_only(true) + http2::Builder::new(TokioExecutor) .serve_connection(socket, svc) - .with_upgrades() + //.with_upgrades() .await .unwrap(); } @@ -2008,11 +2004,9 @@ async fn h2_connect_large_body() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() - .with_executor(TokioExecutor) - .http2_only(true) + http2::Builder::new(TokioExecutor) .serve_connection(socket, svc) - .with_upgrades() + //.with_upgrades() .await .unwrap(); } @@ -2081,11 +2075,9 @@ async fn h2_connect_empty_frames() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() - .with_executor(TokioExecutor) - .http2_only(true) + http2::Builder::new(TokioExecutor) .serve_connection(socket, svc) - .with_upgrades() + //.with_upgrades() .await .unwrap(); } @@ -2106,7 +2098,7 @@ async fn parse_errors_send_4xx_response() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection(socket, HelloWorld) .await .expect_err("HTTP parse error"); @@ -2129,7 +2121,7 @@ async fn illegal_request_length_returns_400_response() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .serve_connection(socket, HelloWorld) .await .expect_err("illegal Content-Length should error"); @@ -2140,14 +2132,14 @@ async fn illegal_request_length_returns_400_response() { #[should_panic] fn max_buf_size_panic_too_small() { const MAX: usize = 8191; - Http::new().max_buf_size(MAX); + http1::Builder::new().max_buf_size(MAX); } #[cfg(feature = "http1")] #[test] fn max_buf_size_no_panic() { const MAX: usize = 8193; - Http::new().max_buf_size(MAX); + http1::Builder::new().max_buf_size(MAX); } #[cfg(feature = "http1")] @@ -2171,7 +2163,7 @@ async fn max_buf_size() { }); let (socket, _) = listener.accept().await.unwrap(); - Http::new() + http1::Builder::new() .max_buf_size(MAX) .serve_connection(socket, HelloWorld) .await @@ -2220,27 +2212,9 @@ fn http1_response_with_http2_version() { .unwrap(); } -#[test] -fn try_h2() { - let server = serve(); - let addr_str = format!("http://{}", server.addr()); - - let rt = support::runtime(); - - let client = TestClient::new().http2_only(); - rt.block_on({ - let uri = addr_str.parse().expect("server addr should parse"); - - client.get(uri).map_ok(|_| ()).map_err(|_e| ()) - }) - .unwrap(); - - assert_eq!(server.body(), b""); -} - #[test] fn http1_only() { - let server = serve_opts().http1_only().serve(); + let server = serve_opts().serve(); let addr_str = format!("http://{}", server.addr()); let rt = support::runtime(); @@ -2257,7 +2231,7 @@ fn http1_only() { async fn http2_service_error_sends_reset_reason() { use std::error::Error; - let server = serve(); + let server = serve_opts().http2().serve(); let addr_str = format!("http://{}", server.addr()); server @@ -2284,7 +2258,7 @@ async fn http2_service_error_sends_reset_reason() { #[test] fn http2_body_user_error_sends_reset_reason() { use std::error::Error; - let server = serve(); + let server = serve_opts().http2().serve(); let addr_str = format!("http://{}", server.addr()); let b = futures_util::stream::once(future::err::(Box::new(h2::Error::from( @@ -2423,9 +2397,8 @@ async fn http2_keep_alive_detects_unresponsive_client() { let (socket, _) = listener.accept().await.expect("accept"); - let err = Http::new() - .with_timer(TokioTimer) - .http2_only(true) + let err = http2::Builder::new(TokioExecutor) + .timer(TokioTimer) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .serve_connection(socket, unreachable_service()) @@ -2445,10 +2418,8 @@ async fn http2_keep_alive_with_responsive_client() { tokio::spawn(async move { let (socket, _) = listener.accept().await.expect("accept"); - Http::new() - .with_executor(TokioExecutor) - .with_timer(TokioTimer) - .http2_only(true) + http2::Builder::new(TokioExecutor) + .timer(TokioTimer) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .serve_connection(socket, HelloWorld) @@ -2514,9 +2485,8 @@ async fn http2_keep_alive_count_server_pings() { tokio::spawn(async move { let (socket, _) = listener.accept().await.expect("accept"); - Http::new() - .with_timer(TokioTimer) - .http2_only(true) + http2::Builder::new(TokioExecutor) + .timer(TokioTimer) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .serve_connection(socket, unreachable_service()) @@ -2827,24 +2797,24 @@ fn serve_opts() -> ServeOptions { #[derive(Clone, Copy)] struct ServeOptions { + http2: bool, keep_alive: bool, - http1_only: bool, pipeline: bool, } impl Default for ServeOptions { fn default() -> Self { ServeOptions { + http2: false, keep_alive: true, - http1_only: false, pipeline: false, } } } impl ServeOptions { - fn http1_only(mut self) -> Self { - self.http1_only = true; + fn http2(mut self) -> Self { + self.http2 = true; self } @@ -2894,14 +2864,6 @@ impl ServeOptions { let (stream, _) = res.unwrap(); tokio::task::spawn(async move { - let mut http = Http::new().with_executor(TokioExecutor); - - #[cfg(feature = "http1")] - let http = http - .http1_only(_options.http1_only) - .http1_keep_alive(_options.keep_alive) - .pipeline_flush(_options.pipeline); - let msg_tx = msg_tx.clone(); let reply_rx = reply_rx.clone(); let service = TestService { @@ -2909,7 +2871,15 @@ impl ServeOptions { reply: reply_rx, }; - http.serve_connection(stream, service).await.unwrap(); + if _options.http2 { + http2::Builder::new(TokioExecutor) + .serve_connection(stream, service).await.unwrap(); + } else { + http1::Builder::new() + .http1_keep_alive(_options.keep_alive) + .pipeline_flush(_options.pipeline) + .serve_connection(stream, service).await.unwrap(); + } }); } _ = &mut shutdown_rx => { diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 89ef72e4b7..f19275febb 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -8,7 +8,7 @@ use std::sync::{ use bytes::Bytes; use http_body_util::Full; -use hyper::server::conn::Http; +use hyper::server; use tokio::net::{TcpListener, TcpStream}; use hyper::service::service_fn; @@ -383,12 +383,17 @@ async fn async_test(cfg: __TestConfig) { }); tokio::task::spawn(async move { - Http::new() - .with_executor(TokioExecutor) - .http2_only(http2_only) - .serve_connection(stream, service) - .await - .expect("server error"); + if http2_only { + server::conn::http2::Builder::new(TokioExecutor) + .serve_connection(stream, service) + .await + .expect("server error"); + } else { + server::conn::http1::Builder::new() + .serve_connection(stream, service) + .await + .expect("server error"); + } }); } }); @@ -560,12 +565,17 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) } }); - Http::new() - .with_executor(TokioExecutor) - .http2_only(http2_only) - .serve_connection(stream, service) - .await - .unwrap(); + if http2_only { + server::conn::http2::Builder::new(TokioExecutor) + .serve_connection(stream, service) + .await + .unwrap(); + } else { + server::conn::http1::Builder::new() + .serve_connection(stream, service) + .await + .unwrap(); + } } }); };