Skip to content

Commit

Permalink
fix(client): send an Error::Cancel if a queued request is dropped
Browse files Browse the repository at this point in the history
Adds `Error::Cancel` variant.
  • Loading branch information
seanmonstar committed Feb 7, 2018
1 parent a821a36 commit 88f0179
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 5 deletions.
8 changes: 7 additions & 1 deletion src/client/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ impl Cancel {
}
}

impl Canceled {
pub fn cancel(&self) {
self.inner.is_canceled.store(true, Ordering::SeqCst);
}
}

impl Future for Canceled {
type Item = ();
type Error = Never;
Expand Down Expand Up @@ -87,7 +93,7 @@ impl Future for Canceled {

impl Drop for Canceled {
fn drop(&mut self) {
self.inner.is_canceled.store(true, Ordering::SeqCst);
self.cancel();
}
}

Expand Down
53 changes: 49 additions & 4 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,66 @@ impl<T, U> Stream for Receiver<T, U> {
}
}

//TODO: Drop for Receiver should consume inner
impl<T, U> Drop for Receiver<T, U> {
fn drop(&mut self) {
self.canceled.cancel();
self.inner.close();

// This poll() is safe to call in `Drop`, because we've
// called, `close`, which promises that no new messages
// will arrive, and thus, once we reach the end, we won't
// see a `NotReady` (and try to park), but a Ready(None).
//
// All other variants:
// - Ready(None): the end. we want to stop looping
// - NotReady: unreachable
// - Err: unreachable
while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() {
// maybe in future, we pass the value along with the error?
let _ = cb.send(Err(::Error::new_canceled()));
}
}

}

#[cfg(test)]
mod tests {

extern crate pretty_env_logger;
#[cfg(feature = "nightly")]
extern crate test;

use futures::{future, Future};

#[cfg(feature = "nightly")]
use futures::{Future, Stream};
use futures::{Stream};

#[test]
fn drop_receiver_sends_cancel_errors() {
let _ = pretty_env_logger::try_init();

future::lazy(|| {
#[derive(Debug)]
struct Custom(i32);
let (tx, rx) = super::channel::<Custom, ()>();

let promise = tx.send(Custom(43)).unwrap();
drop(rx);

promise.then(|fulfilled| {
let res = fulfilled.expect("fulfilled");
match res.unwrap_err() {
::Error::Cancel(_) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}

Ok::<(), ()>(())
})
}).wait().unwrap();
}

#[cfg(feature = "nightly")]
#[bench]
fn cancelable_queue_throughput(b: &mut test::Bencher) {

let (tx, mut rx) = super::channel::<i32, ()>();

b.iter(move || {
Expand Down
48 changes: 48 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use self::Error::{
Status,
Timeout,
Upgrade,
Cancel,
Io,
TooLarge,
Incomplete,
Expand Down Expand Up @@ -47,6 +48,8 @@ pub enum Error {
Timeout,
/// A protocol upgrade was encountered, but not yet supported in hyper.
Upgrade,
/// A pending item was dropped before ever being processed.
Cancel(Canceled),
/// An `io::Error` that occurred while trying to read or write to a network stream.
Io(IoError),
/// Parsing a field as string failed
Expand All @@ -56,6 +59,45 @@ pub enum Error {
__Nonexhaustive(Void)
}

impl Error {
pub(crate) fn new_canceled() -> Error {
Error::Cancel(Canceled {
_inner: (),
})
}
}

/// A pending item was dropped before ever being processed.
///
/// For example, a `Request` could be queued in the `Client`, *just*
/// as the related connection gets closed by the remote. In that case,
/// when the connection drops, the pending response future will be
/// fulfilled with this error, signaling the `Request` was never started.
pub struct Canceled {
// maybe in the future this contains an optional value of
// what was canceled?
_inner: (),
}

impl Canceled {
fn description(&self) -> &str {
"an operation was canceled internally before starting"
}
}

impl fmt::Debug for Canceled {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Canceled")
.finish()
}
}

impl fmt::Display for Canceled {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad(self.description())
}
}

#[doc(hidden)]
pub struct Void(());

Expand Down Expand Up @@ -87,6 +129,7 @@ impl StdError for Error {
Incomplete => "message is incomplete",
Timeout => "timeout",
Upgrade => "unsupported protocol upgrade",
Cancel(ref e) => e.description(),
Uri(ref e) => e.description(),
Io(ref e) => e.description(),
Utf8(ref e) => e.description(),
Expand Down Expand Up @@ -143,6 +186,11 @@ impl From<httparse::Error> for Error {
}
}

#[doc(hidden)]
trait AssertSendSync: Send + Sync + 'static {}
#[doc(hidden)]
impl AssertSendSync for Error {}

#[cfg(test)]
mod tests {
use std::error::Error as StdError;
Expand Down

0 comments on commit 88f0179

Please sign in to comment.