Skip to content

Commit

Permalink
feat(client): add ability to include SO_REUSEADDR option on sockets
Browse files Browse the repository at this point in the history
Closes #1599
  • Loading branch information
Eli Snow authored and seanmonstar committed Jul 11, 2018
1 parent 02a9c29 commit 13862d1
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,16 @@ mod http {
use self::http_connector::HttpConnectorBlockingTask;


fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>, reuse_address: bool) -> io::Result<ConnectFuture> {
let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};

if reuse_address {
builder.reuse_address(reuse_address)?;
}

if let Some(ref local_addr) = *local_addr {
// Caller has requested this socket be bound before calling connect
builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
Expand Down Expand Up @@ -446,6 +450,7 @@ mod http {
nodelay: bool,
local_address: Option<IpAddr>,
happy_eyeballs_timeout: Option<Duration>,
reuse_address: bool,
}

impl HttpConnector {
Expand Down Expand Up @@ -484,6 +489,7 @@ mod http {
nodelay: false,
local_address: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
reuse_address: false,
}
}

Expand Down Expand Up @@ -539,6 +545,15 @@ mod http {
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
self.happy_eyeballs_timeout = dur;
}

/// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
///
/// Default is `false`.
#[inline]
pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
self.reuse_address = reuse_address;
self
}
}

impl fmt::Debug for HttpConnector {
Expand Down Expand Up @@ -585,6 +600,7 @@ mod http {
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
reuse_address: self.reuse_address,
}
}
}
Expand All @@ -597,6 +613,7 @@ mod http {
keep_alive_timeout: None,
nodelay: false,
happy_eyeballs_timeout: None,
reuse_address: false,
}
}

Expand Down Expand Up @@ -630,6 +647,7 @@ mod http {
keep_alive_timeout: Option<Duration>,
nodelay: bool,
happy_eyeballs_timeout: Option<Duration>,
reuse_address: bool,
}

enum State {
Expand All @@ -652,7 +670,7 @@ mod http {
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
state = State::Connecting(ConnectingTcp::new(
local_addr, addrs, self.happy_eyeballs_timeout));
local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address));
} else {
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
Expand All @@ -664,7 +682,7 @@ mod http {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp::new(
local_addr, addrs, self.happy_eyeballs_timeout));
local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address));
}
};
},
Expand Down Expand Up @@ -696,13 +714,15 @@ mod http {
local_addr: Option<IpAddr>,
preferred: ConnectingTcpRemote,
fallback: Option<ConnectingTcpFallback>,
reuse_address: bool,
}

impl ConnectingTcp {
fn new(
local_addr: Option<IpAddr>,
remote_addrs: dns::IpAddrs,
fallback_timeout: Option<Duration>,
reuse_address: bool,
) -> ConnectingTcp {
if let Some(fallback_timeout) = fallback_timeout {
let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference();
Expand All @@ -711,6 +731,7 @@ mod http {
local_addr,
preferred: ConnectingTcpRemote::new(preferred_addrs),
fallback: None,
reuse_address,
};
}

Expand All @@ -721,12 +742,14 @@ mod http {
delay: Delay::new(Instant::now() + fallback_timeout),
remote: ConnectingTcpRemote::new(fallback_addrs),
}),
reuse_address,
}
} else {
ConnectingTcp {
local_addr,
preferred: ConnectingTcpRemote::new(remote_addrs),
fallback: None,
reuse_address,
}
}
}
Expand Down Expand Up @@ -757,6 +780,7 @@ mod http {
&mut self,
local_addr: &Option<IpAddr>,
handle: &Option<Handle>,
reuse_address: bool,
) -> Poll<TcpStream, io::Error> {
let mut err = None;
loop {
Expand All @@ -768,14 +792,14 @@ mod http {
err = Some(e);
if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
*current = connect(&addr, local_addr, handle)?;
*current = connect(&addr, local_addr, handle, reuse_address)?;
continue;
}
}
}
} else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
self.current = Some(connect(&addr, local_addr, handle)?);
self.current = Some(connect(&addr, local_addr, handle, reuse_address)?);
continue;
}

Expand All @@ -788,14 +812,14 @@ mod http {
// not a Future, since passing a &Handle to poll
fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> {
match self.fallback.take() {
None => self.preferred.poll(&self.local_addr, handle),
Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle) {
None => self.preferred.poll(&self.local_addr, handle, self.reuse_address),
Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle, self.reuse_address) {
Ok(Async::Ready(stream)) => {
// Preferred successful - drop fallback.
Ok(Async::Ready(stream))
}
Ok(Async::NotReady) => match fallback.delay.poll() {
Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle) {
Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle, self.reuse_address) {
Ok(Async::Ready(stream)) => {
// Fallback successful - drop current preferred,
// but keep fallback as new preferred.
Expand Down Expand Up @@ -825,7 +849,7 @@ mod http {
Err(_) => {
// Preferred failed - use fallback as new preferred.
self.preferred = fallback.remote;
self.preferred.poll(&self.local_addr, handle)
self.preferred.poll(&self.local_addr, handle, self.reuse_address)
}
}
}
Expand Down Expand Up @@ -980,7 +1004,7 @@ mod http {
}

let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect();
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout));
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false);
let fut = ConnectingTcpFuture(connecting_tcp);

let start = Instant::now();
Expand Down

0 comments on commit 13862d1

Please sign in to comment.