From f3e17d0b821cb27bba98a8441ac9466f86d4f5f9 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 27 Nov 2018 16:16:58 -0800 Subject: [PATCH] perf(http1): implement an adaptive read buffer strategy The default read strategy for HTTP/1 connections is now adaptive. It increases or decreases the size of the read buffer depending on the number of bytes that are received in a `read` call. If a transport continuously fills the read buffer, it will continue to grow (up to the `max_buf_size`), allowing for reading faster. If the transport consistently only fills a portion of the read buffer, it will be shrunk. This doesn't provide much benefit to small requests/responses, but benchmarks show it to be a noticeable improvement to throughput when streaming larger bodies. Closes #1708 --- benches/end_to_end.rs | 28 ++++++ src/proto/h1/io.rs | 203 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 202 insertions(+), 29 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 81ebbe3c64..70960e53dd 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -29,6 +29,26 @@ fn http1_post(b: &mut test::Bencher) { .bench(b) } +#[bench] +fn http1_body_100kb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 100]; + opts() + .method(Method::POST) + .request_body(body) + .response_body(body) + .bench(b) +} + +#[bench] +fn http1_body_10mb(b: &mut test::Bencher) { + let body = &[b'x'; 1024 * 1024 * 10]; + opts() + .method(Method::POST) + .request_body(body) + .response_body(body) + .bench(b) +} + #[bench] fn http1_get_parallel(b: &mut test::Bencher) { opts() @@ -96,6 +116,11 @@ impl Opts { self } + fn response_body(mut self, body: &'static [u8]) -> Self { + self.response_body = body; + self + } + fn parallel(mut self, cnt: u32) -> Self { assert!(cnt > 0, "parallel count must be larger than 0"); self.parallel_cnt = cnt; @@ -105,6 +130,9 @@ impl Opts { fn bench(self, b: &mut test::Bencher) { let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); + + b.bytes = self.response_body.len() as u64 + self.request_body.map(|b| b.len()).unwrap_or(0) as u64; + let addr = spawn_hello(&mut rt, self.response_body); let connector = HttpConnector::new(1); diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 86664f5837..99435c435b 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -1,4 +1,5 @@ use std::cell::Cell; +use std::cmp; use std::collections::VecDeque; use std::fmt; use std::io; @@ -60,9 +61,7 @@ where io: io, read_blocked: false, read_buf: BytesMut::with_capacity(0), - read_buf_strategy: ReadStrategy::Adaptive { - max: DEFAULT_MAX_BUFFER_SIZE, - }, + read_buf_strategy: ReadStrategy::default(), write_buf: WriteBuf::new(), } } @@ -81,9 +80,7 @@ where "The max_buf_size cannot be smaller than {}.", MINIMUM_MAX_BUFFER_SIZE, ); - self.read_buf_strategy = ReadStrategy::Adaptive { - max, - }; + self.read_buf_strategy = ReadStrategy::with_max(max); self.write_buf.max_buf_size = max; } @@ -149,18 +146,11 @@ where debug!("parsed {} headers", msg.head.headers.len()); return Ok(Async::Ready(msg)) }, - None => match self.read_buf_strategy { - ReadStrategy::Adaptive { max } => { - if self.read_buf.len() >= max { - debug!("max_buf_size ({}) reached, closing", max); - return Err(::Error::new_too_large()); - } - }, - ReadStrategy::Exact(exact) => { - if self.read_buf.len() >= exact { - debug!("exact buf size ({}) filled, closing", exact); - return Err(::Error::new_too_large()); - } + None => { + let max = self.read_buf_strategy.max(); + if self.read_buf.len() >= max { + debug!("max_buf_size ({}) reached, closing", max); + return Err(::Error::new_too_large()); } }, } @@ -177,22 +167,15 @@ where pub fn read_from_io(&mut self) -> Poll { use bytes::BufMut; self.read_blocked = false; - match self.read_buf_strategy { - ReadStrategy::Adaptive { .. } => { - if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE { - self.read_buf.reserve(INIT_BUFFER_SIZE); - } - }, - ReadStrategy::Exact(exact) => { - if self.read_buf.capacity() < exact { - self.read_buf.reserve(exact); - } - }, + let next = self.read_buf_strategy.next(); + if self.read_buf.remaining_mut() < next { + self.read_buf.reserve(next); } self.io.read_buf(&mut self.read_buf).map(|ok| { match ok { Async::Ready(n) => { debug!("read {} bytes", n); + self.read_buf_strategy.record(n); Async::Ready(n) }, Async::NotReady => { @@ -285,11 +268,82 @@ where #[derive(Clone, Copy, Debug)] enum ReadStrategy { Adaptive { + decrease_now: bool, + next: usize, max: usize }, Exact(usize), } +impl ReadStrategy { + fn with_max(max: usize) -> ReadStrategy { + ReadStrategy::Adaptive { + decrease_now: false, + next: INIT_BUFFER_SIZE, + max, + } + } + + fn next(&self) -> usize { + match *self { + ReadStrategy::Adaptive { next, .. } => next, + ReadStrategy::Exact(exact) => exact, + } + } + + fn max(&self) -> usize { + match *self { + ReadStrategy::Adaptive { max, .. } => max, + ReadStrategy::Exact(exact) => exact, + } + } + + fn record(&mut self, bytes_read: usize) { + match *self { + ReadStrategy::Adaptive { ref mut decrease_now, ref mut next, max, .. } => { + if bytes_read >= *next { + *next = cmp::min(incr_power_of_two(*next), max); + *decrease_now = false; + } else { + let decr_to = prev_power_of_two(*next); + if bytes_read < decr_to { + if *decrease_now { + *next = cmp::max(decr_to, INIT_BUFFER_SIZE); + *decrease_now = false; + } else { + // Decreasing is a two "record" process. + *decrease_now = true; + } + } else { + // A read within the current range should cancel + // a potential decrease, since we just saw proof + // that we still need this size. + *decrease_now = false; + } + } + }, + _ => (), + } + } +} + +fn incr_power_of_two(n: usize) -> usize { + n.saturating_mul(2) +} + +fn prev_power_of_two(n: usize) -> usize { + // Only way this shift can underflow is if n is less than 4. + // (Which would means `usize::MAX >> 64` and underflowed!) + debug_assert!(n >= 4); + (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1 +} + +impl Default for ReadStrategy { + fn default() -> ReadStrategy { + ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE) + } +} + #[derive(Clone)] pub struct Cursor { bytes: T, @@ -637,6 +691,97 @@ mod tests { assert!(buffered.io.blocked()); } + #[test] + fn read_strategy_adaptive_increments() { + let mut strategy = ReadStrategy::default(); + assert_eq!(strategy.next(), 8192); + + // Grows if record == next + strategy.record(8192); + assert_eq!(strategy.next(), 16384); + + strategy.record(16384); + assert_eq!(strategy.next(), 32768); + + // Enormous records still increment at same rate + strategy.record(::std::usize::MAX); + assert_eq!(strategy.next(), 65536); + + let max = strategy.max(); + while strategy.next() < max { + strategy.record(max); + } + + assert_eq!(strategy.next(), max, "never goes over max"); + strategy.record(max + 1); + assert_eq!(strategy.next(), max, "never goes over max"); + } + + #[test] + fn read_strategy_adaptive_decrements() { + let mut strategy = ReadStrategy::default(); + strategy.record(8192); + assert_eq!(strategy.next(), 16384); + + strategy.record(1); + assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet"); + strategy.record(8192); + assert_eq!(strategy.next(), 16384, "record was with range"); + + strategy.record(1); + assert_eq!(strategy.next(), 16384, "in-range record should make this the 'first' again"); + + strategy.record(1); + assert_eq!(strategy.next(), 8192, "second smaller record decrements"); + + strategy.record(1); + assert_eq!(strategy.next(), 8192, "first doesn't decrement"); + strategy.record(1); + assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum"); + } + + #[test] + fn read_strategy_adaptive_stays_the_same() { + let mut strategy = ReadStrategy::default(); + strategy.record(8192); + assert_eq!(strategy.next(), 16384); + + strategy.record(8193); + assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet"); + + strategy.record(8193); + assert_eq!(strategy.next(), 16384, "with current step does not decrement"); + } + + #[test] + fn read_strategy_adaptive_max_fuzz() { + fn fuzz(max: usize) { + let mut strategy = ReadStrategy::with_max(max); + while strategy.next() < max { + strategy.record(::std::usize::MAX); + } + let mut next = strategy.next(); + while next > 8192 { + strategy.record(1); + strategy.record(1); + next = strategy.next(); + assert!( + next.is_power_of_two(), + "decrement should be powers of two: {} (max = {})", + next, + max, + ); + } + } + + let mut max = 8192; + while max < ::std::usize::MAX { + fuzz(max); + max = (max / 2).saturating_mul(3); + } + fuzz(::std::usize::MAX); + } + #[test] #[should_panic] fn write_buf_requires_non_empty_bufs() {