Skip to content

Commit

Permalink
feat(client): replace default dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 29, 2017
1 parent 6fde13f commit 0892cb2
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 201 deletions.
4 changes: 1 addition & 3 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ fn main() {

let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let client = Client::configure()
.no_proto()
.build(&handle);
let client = Client::new(&handle);

let work = client.get(url).and_then(|res| {
println!("Response: {}", res.status());
Expand Down
232 changes: 55 additions & 177 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,17 @@ use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use futures::{future, Poll, Async, Future, Stream};
use futures::unsync::oneshot;
use futures::{future, Poll, Future, Stream};
#[cfg(feature = "compat")]
use http;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio_proto::BindClient;
use tokio_proto::streaming::Message;
use tokio_proto::streaming::pipeline::ClientProto;
use tokio_proto::util::client_proxy::ClientProxy;
pub use tokio_service::Service;

use header::{Headers, Host};
use proto::{self, RequestHead, TokioBody};
use proto::response;
use proto;
use proto::request;
use method::Method;
use self::pool::{Pool, Pooled};
use self::pool::Pool;
use uri::{self, Uri};
use version::HttpVersion;

Expand All @@ -45,7 +38,7 @@ pub mod compat;
pub struct Client<C, B = proto::Body> {
connector: C,
handle: Handle,
pool: Dispatch<B>,
pool: Pool<HyperClient<B>>,
}

impl Client<HttpConnector, proto::Body> {
Expand Down Expand Up @@ -93,11 +86,7 @@ impl<C, B> Client<C, B> {
Client {
connector: config.connector,
handle: handle.clone(),
pool: if config.no_proto {
Dispatch::Hyper(Pool::new(config.keep_alive, config.keep_alive_timeout))
} else {
Dispatch::Proto(Pool::new(config.keep_alive, config.keep_alive_timeout))
}
pool: Pool::new(config.keep_alive, config.keep_alive_timeout)
}
}
}
Expand Down Expand Up @@ -191,105 +180,54 @@ where C: Connect,
headers.extend(head.headers.iter());
head.headers = headers;

match self.pool {
Dispatch::Proto(ref pool) => {
trace!("proto_dispatch");
let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
drop(tx.send(pooled.clone()));
pooled
})
};

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});
let resp = race.and_then(move |client| {
let msg = match body {
Some(body) => {
Message::WithBody(head, body.into())
},
None => Message::WithoutBody(head),
use futures::Sink;
use futures::sync::{mpsc, oneshot};

let checkout = self.pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = mpsc::channel(0);
let tx = HyperClient {
tx: RefCell::new(tx),
should_close: true,
};
client.call(msg)
});
FutureResponse(Box::new(resp.map(|msg| {
match msg {
Message::WithoutBody(head) => response::from_wire(head, None),
Message::WithBody(head, body) => response::from_wire(head, Some(body.into())),
}
})))
},
Dispatch::Hyper(ref pool) => {
trace!("no_proto dispatch");
use futures::Sink;
use futures::sync::{mpsc, oneshot};

let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = mpsc::channel(0);
let tx = HyperClient {
tx: RefCell::new(tx),
should_close: true,
};
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
pooled
})
};

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});

let resp = race.and_then(move |mut client| {
let (callback, rx) = oneshot::channel();
client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap();
client.should_close = false;
rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
})
});

FutureResponse(Box::new(resp))
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
pooled
})
};

}
}
let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});

let resp = race.and_then(move |mut client| {
let (callback, rx) = oneshot::channel();
client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap();
client.should_close = false;
rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
})
});

FutureResponse(Box::new(resp))
}

}
Expand All @@ -299,10 +237,7 @@ impl<C: Clone, B> Clone for Client<C, B> {
Client {
connector: self.connector.clone(),
handle: self.handle.clone(),
pool: match self.pool {
Dispatch::Proto(ref pool) => Dispatch::Proto(pool.clone()),
Dispatch::Hyper(ref pool) => Dispatch::Hyper(pool.clone()),
}
pool: self.pool.clone(),
}
}
}
Expand All @@ -313,8 +248,6 @@ impl<C, B> fmt::Debug for Client<C, B> {
}
}

type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;

struct HyperClient<B> {
tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>,
should_close: bool,
Expand All @@ -338,60 +271,6 @@ impl<B> Drop for HyperClient<B> {
}
}

enum Dispatch<B> {
Proto(Pool<ProtoClient<B>>),
Hyper(Pool<HyperClient<B>>),
}

struct HttpClient<B> {
client_rx: RefCell<Option<oneshot::Receiver<Pooled<ProtoClient<B>>>>>,
}

impl<T, B> ClientProto<T> for HttpClient<B>
where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Request = proto::RequestHead;
type RequestBody = B::Item;
type Response = proto::ResponseHead;
type ResponseBody = proto::Chunk;
type Error = ::Error;
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type BindTransport = BindingClient<T, B>;

fn bind_transport(&self, io: T) -> Self::BindTransport {
BindingClient {
rx: self.client_rx.borrow_mut().take().expect("client_rx was lost"),
io: Some(io),
}
}
}

struct BindingClient<T, B> {
rx: oneshot::Receiver<Pooled<ProtoClient<B>>>,
io: Option<T>,
}

impl<T, B> Future for BindingClient<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
Ok(Async::Ready(client)) => Ok(Async::Ready(
proto::Conn::new(self.io.take().expect("binding client io lost"), client)
)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_canceled) => unreachable!(),
}
}
}

/// Configuration for a Client
pub struct Config<C, B> {
_body_type: PhantomData<B>,
Expand Down Expand Up @@ -490,10 +369,9 @@ impl<C, B> Config<C, B> {
}
*/

/// Disable tokio-proto internal usage.
#[inline]
pub fn no_proto(mut self) -> Config<C, B> {
self.no_proto = true;
#[doc(hidden)]
#[deprecated(since="0.11.11", note="no_proto is always enabled")]
pub fn no_proto(self) -> Config<C, B> {
self
}
}
Expand Down
Loading

0 comments on commit 0892cb2

Please sign in to comment.