Skip to content

Commit

Permalink
fix(http1): try to drain connection buffer if user drops Body
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Mar 10, 2020
1 parent 5b046a1 commit d838d54
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 3 deletions.
14 changes: 14 additions & 0 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,20 @@ where
}
}

/// If the read side can be cheaply drained, do so. Otherwise, close.
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
let _ = self.poll_read_body(cx);

// If still in Reading::Body, just give up
match self.state.reading {
Reading::Init | Reading::KeepAlive => {
trace!("body drained");
return;
}
_ => self.close_read(),
}
}

pub fn close_read(&mut self) {
self.state.close_read();
}
Expand Down
6 changes: 3 additions & 3 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ where
Poll::Ready(Err(_canceled)) => {
// user doesn't care about the body
// so we should stop reading
trace!("body receiver dropped before eof, closing");
self.conn.close_read();
return Poll::Ready(Ok(()));
trace!("body receiver dropped before eof, draining or closing");
self.conn.poll_drain_or_close_read(cx);
continue;
}
}
match self.conn.poll_read_body(cx) {
Expand Down
90 changes: 90 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ mod dispatch_impl {
use tokio::net::TcpStream;
use tokio::runtime::Runtime;

use hyper::body::HttpBody;
use hyper::client::connect::{Connected, Connection, HttpConnector};
use hyper::Client;

Expand Down Expand Up @@ -1574,6 +1575,95 @@ mod dispatch_impl {
assert_eq!(connects.load(Ordering::Relaxed), 1);
}

#[tokio::test]
async fn client_keep_alive_eager_when_chunked() {
// If a response body has been read to completion, with completion
// determined by some other factor, like decompression, and thus
// it is in't polled a final time to clear the final 0-len chunk,
// try to eagerly clear it so the connection can still be used.

let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let connector = DebugConnector::new();
let connects = connector.connects.clone();

let client = Client::builder().build(connector);

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
//drop(server);
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5)))
.unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
sock.write_all(
b"\
HTTP/1.1 200 OK\r\n\
transfer-encoding: chunked\r\n\
\r\n\
5\r\n\
hello\r\n\
0\r\n\r\n\
",
)
.expect("write 1");
let _ = tx1.send(());

let n2 = sock.read(&mut buf).expect("read 2");
assert_ne!(n2, 0, "bytes of second request");
let second_get = "GET /b HTTP/1.1\r\n";
assert_eq!(s(&buf[..second_get.len()]), second_get);
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
.expect("write 2");
let _ = tx2.send(());
});

assert_eq!(connects.load(Ordering::SeqCst), 0);

let rx = rx1.expect("thread panicked");
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
.body(Body::empty())
.unwrap();
let fut = client.request(req);

let mut resp = future::join(fut, rx).map(|r| r.0).await.unwrap();
assert_eq!(connects.load(Ordering::SeqCst), 1);
assert_eq!(resp.status(), 200);
assert_eq!(resp.headers()["transfer-encoding"], "chunked");

// Read the "hello" chunk...
let chunk = resp.body_mut().data().await.unwrap().unwrap();
assert_eq!(chunk, "hello");

// With our prior knowledge, we know that's the end of the body.
// So just drop the body, without polling for the `0\r\n\r\n` end.
drop(resp);

// sleep real quick to let the threadpool put connection in ready
// state and back into client pool
tokio::time::delay_for(Duration::from_millis(50)).await;

let rx = rx2.expect("thread panicked");
let req = Request::builder()
.uri(&*format!("http://{}/b", addr))
.body(Body::empty())
.unwrap();
let fut = client.request(req);
future::join(fut, rx).map(|r| r.0).await.unwrap();

assert_eq!(
connects.load(Ordering::SeqCst),
1,
"second request should still only have 1 connect"
);
drop(client);
}

#[test]
fn connect_proxy_sends_absolute_uri() {
let _ = pretty_env_logger::try_init();
Expand Down

0 comments on commit d838d54

Please sign in to comment.