diff --git a/Cargo.toml b/Cargo.toml index 30a08d0..7fdf971 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,5 @@ futures = { version = "0.3" } flume = "0.10.14" socket2 = { version = "0.4.4" } +tokio-udt = { version = "0.1.0-alpha.7" } nix = { version = "0.24.2" } diff --git a/src/link/rendezvous/client.rs b/src/link/rendezvous/client.rs index 24d100e..82d4eef 100644 --- a/src/link/rendezvous/client.rs +++ b/src/link/rendezvous/client.rs @@ -1,13 +1,13 @@ use crate::{ crypto::{Identity, KeyCard}, link::rendezvous::{ClientSettings, Request, Response, ShardId}, - net::traits::TcpConnect, + net::traits::{Connect, ConnectSettings}, }; use doomstack::{here, Doom, ResultExt, Top}; use std::{io, net::SocketAddr, vec::Vec}; pub struct Client { - server: Box, + server: Box, settings: ClientSettings, } @@ -39,7 +39,7 @@ enum AttemptError { impl Client { pub fn new(server: S, settings: ClientSettings) -> Self where - S: 'static + TcpConnect, + S: 'static + Connect, { Client { server: Box::new(server), @@ -118,7 +118,7 @@ impl Client { async fn attempt(&self, request: &Request) -> Result> { let mut connection = self .server - .connect() + .connect(&self.settings.connect) .await .map_err(AttemptError::connect_failed) .map_err(Doom::into_top) @@ -136,6 +136,10 @@ impl Client { Ok(response) } + + pub(crate) fn connect_settings(&self) -> &ConnectSettings { + &self.settings.connect + } } #[cfg(test)] @@ -146,12 +150,19 @@ mod tests { link::rendezvous::{Server, ServerSettings}, }; use std::time::Duration; - use tokio::time; + use tokio::{net::lookup_host, time}; async fn setup_server(address: &'static str, shard_sizes: Vec) -> Server { - Server::new(address, ServerSettings { shard_sizes }) - .await - .unwrap() + let addr = lookup_host(address).await.unwrap().next().unwrap(); + Server::new( + addr, + ServerSettings { + shard_sizes, + ..Default::default() + }, + ) + .await + .unwrap() } async fn setup_clients( diff --git a/src/link/rendezvous/client_settings.rs b/src/link/rendezvous/client_settings.rs index f6140b5..11dff5c 100644 --- a/src/link/rendezvous/client_settings.rs +++ b/src/link/rendezvous/client_settings.rs @@ -1,9 +1,13 @@ -use crate::time::{sleep_schedules::CappedExponential, SleepSchedule}; +use crate::{ + net::traits::ConnectSettings, + time::{sleep_schedules::CappedExponential, SleepSchedule}, +}; use std::{sync::Arc, time::Duration}; #[derive(Debug, Clone)] pub struct ClientSettings { pub sleep_schedule: Arc, + pub connect: ConnectSettings, } impl Default for ClientSettings { @@ -14,6 +18,7 @@ impl Default for ClientSettings { 2., Duration::from_secs(300), )), + connect: ConnectSettings::default(), } } } diff --git a/src/link/rendezvous/connector.rs b/src/link/rendezvous/connector.rs index af7cc47..324ecca 100644 --- a/src/link/rendezvous/connector.rs +++ b/src/link/rendezvous/connector.rs @@ -1,7 +1,7 @@ use crate::{ crypto::{Identity, KeyChain}, link::rendezvous::{Client, ConnectorSettings}, - net::{traits::TcpConnect, Connector as NetConnector, SecureConnection}, + net::{traits::Connect, Connector as NetConnector, SecureConnection}, }; use async_trait::async_trait; use doomstack::{here, Doom, ResultExt, Stack, Top}; @@ -36,7 +36,7 @@ pub enum ConnectorError { impl Connector { pub fn new(server: S, keychain: KeyChain, settings: ConnectorSettings) -> Self where - S: 'static + TcpConnect, + S: 'static + Connect, { let client = Client::new(server, settings.client_settings); @@ -58,7 +58,7 @@ impl Connector { .spot(here!())?; let mut connection = address - .connect() + .connect(self.client.connect_settings()) .await .map_err(ConnectorError::connect_failed) .map_err(Doom::into_top) @@ -135,7 +135,12 @@ mod tests { const SERVER: &str = "127.0.0.1:1250"; const MESSAGE: &str = "Hello Alice, this is Bob!"; - let _server = Server::new(SERVER, Default::default()).await.unwrap(); + let server_addr = tokio::net::lookup_host(SERVER) + .await + .unwrap() + .next() + .unwrap(); + let _server = Server::new(server_addr, Default::default()).await.unwrap(); let alice_keychain = KeyChain::random(); let bob_keychain = KeyChain::random(); diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index dbd4469..6bc1a35 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -1,7 +1,10 @@ use crate::{ crypto::{Identity, KeyChain}, link::rendezvous::{Client, ListenerSettings}, - net::{traits::TcpConnect, Listener as NetListener, PlainConnection, SecureConnection}, + net::{ + traits::{Connect, TransportProtocol}, + Listener as NetListener, PlainConnection, SecureConnection, + }, sync::fuse::Fuse, }; use async_trait::async_trait; @@ -15,8 +18,15 @@ use tokio::{ }, }; +use tokio_udt::UdtListener; + type Outlet = Receiver<(Identity, SecureConnection)>; +pub(crate) enum RawListener { + Tcp(TcpListener), + Udt(UdtListener), +} + pub struct Listener { outlet: Outlet, _fuse: Fuse, @@ -33,16 +43,29 @@ enum ServeError { impl Listener { pub async fn new(server: S, keychain: KeyChain, settings: ListenerSettings) -> Self where - S: 'static + TcpConnect, + S: 'static + Connect, { - let listener = TcpListener::bind( - (Ipv4Addr::UNSPECIFIED, 0), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?) - ) - .await - .unwrap(); + let (listener, port) = match settings.client_settings.connect.transport { + TransportProtocol::TCP => { + let listener = TcpListener::bind( + (Ipv4Addr::UNSPECIFIED, 0), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?) + ) + .await + .unwrap(); + let port = listener.local_addr().unwrap().port(); + (RawListener::Tcp(listener), port) + } + TransportProtocol::UDT(ref config) => { + let listener = + UdtListener::bind((Ipv4Addr::UNSPECIFIED, 0).into(), Some(config.clone())) + .await + .unwrap(); + let port = listener.local_addr().unwrap().port(); + (RawListener::Udt(listener), port) + } + }; let identity = keychain.keycard().identity(); - let port = listener.local_addr().unwrap().port(); let fuse = Fuse::new(); @@ -63,18 +86,25 @@ impl Listener { async fn listen( keychain: KeyChain, - listener: TcpListener, + listener: RawListener, inlet: Sender<(Identity, SecureConnection)>, ) { let fuse = Fuse::new(); loop { - if let Ok((stream, _)) = listener.accept().await.and_then(|(stream, addr)| { - stream.set_nodelay(true)?; - Ok((stream, addr)) - }) { - let connection = stream.into(); - + let accept_result = match listener { + RawListener::Tcp(ref tcp_listener) => { + tcp_listener.accept().await.and_then(|(stream, addr)| { + stream.set_nodelay(true)?; + Ok((stream.into(), addr)) + }) + } + RawListener::Udt(ref udt_listener) => udt_listener + .accept() + .await + .map(|(addr, udt_connection)| (udt_connection.into(), addr)), + }; + if let Ok((connection, _)) = accept_result { let keychain = keychain.clone(); let inlet = inlet.clone(); diff --git a/src/link/rendezvous/server.rs b/src/link/rendezvous/server.rs index 24afbf4..6696da0 100644 --- a/src/link/rendezvous/server.rs +++ b/src/link/rendezvous/server.rs @@ -1,7 +1,7 @@ use crate::{ crypto::{Identity, KeyCard}, - link::rendezvous::{Request, Response, ServerSettings, ShardId}, - net::PlainConnection, + link::rendezvous::{listener::RawListener, Request, Response, ServerSettings, ShardId}, + net::{traits::TransportProtocol, PlainConnection}, sync::fuse::Fuse, }; use doomstack::{here, Doom, ResultExt, Top}; @@ -11,10 +11,8 @@ use std::{ net::SocketAddr, sync::Arc, }; -use tokio::{ - io, - net::{TcpListener, ToSocketAddrs}, -}; +use tokio::{io, net::TcpListener}; +use tokio_udt::UdtListener; pub struct Server { _fuse: Fuse, @@ -41,10 +39,10 @@ struct Database { } impl Server { - pub async fn new(address: A, settings: ServerSettings) -> Result> - where - A: ToSocketAddrs, - { + pub async fn new( + address: SocketAddr, + settings: ServerSettings, + ) -> Result> { let database = Arc::new(Mutex::new(Database { shards: settings .shard_sizes @@ -58,11 +56,20 @@ impl Server { let fuse = Fuse::new(); - let listener = TcpListener::bind(address) - .await - .map_err(ServerError::initialize_failed) - .map_err(Doom::into_top) - .spot(here!())?; + let listener = { + let result = match settings.connect.transport { + TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::Tcp), + TransportProtocol::UDT(ref config) => { + UdtListener::bind(address, Some(config.clone())) + .await + .map(RawListener::Udt) + } + }; + result + .map_err(ServerError::initialize_failed) + .map_err(Doom::into_top) + .spot(here!())? + }; fuse.spawn(async move { let _ = Server::listen(settings, database, listener).await; @@ -74,17 +81,29 @@ impl Server { async fn listen( settings: ServerSettings, database: Arc>, - listener: TcpListener, + listener: RawListener, ) { let fuse = Fuse::new(); + let accept = || async { + match listener { + RawListener::Tcp(ref tcp_listener) => tcp_listener + .accept() + .await + .map(|(stream, address)| (stream.into(), address)), + RawListener::Udt(ref udt_listener) => udt_listener + .accept() + .await + .map(|(address, stream)| (stream.into(), address)), + } + }; + loop { - if let Ok((stream, address)) = listener.accept().await { + if let Ok((connection, address)) = accept().await { + let connection: PlainConnection = connection; let settings = settings.clone(); let database = database.clone(); - let connection: PlainConnection = stream.into(); - fuse.spawn(async move { let _ = Server::serve(settings, database, connection, address).await; }); diff --git a/src/link/rendezvous/server_settings.rs b/src/link/rendezvous/server_settings.rs index eb70357..4f726a0 100644 --- a/src/link/rendezvous/server_settings.rs +++ b/src/link/rendezvous/server_settings.rs @@ -1,12 +1,16 @@ +use crate::net::traits::ConnectSettings; + #[derive(Debug, Clone)] pub struct ServerSettings { pub shard_sizes: Vec, + pub connect: ConnectSettings, } impl Default for ServerSettings { fn default() -> Self { ServerSettings { shard_sizes: vec![4], + connect: ConnectSettings::default(), } } } diff --git a/src/link/test/tests.rs b/src/link/test/tests.rs index 5386f2e..8fbb0cf 100644 --- a/src/link/test/tests.rs +++ b/src/link/test/tests.rs @@ -7,7 +7,7 @@ mod context { }, net::{ test::{System as NetSystem, TestConnector}, - traits::TcpConnect, + traits::{Connect, ConnectSettings}, Connector, Listener, PlainConnection, }, time::test::join, @@ -95,7 +95,7 @@ mod context { impl SlowLoris { async fn connect(&self, identity: Identity) -> PlainConnection { let address = self.0.peers.get(&identity).unwrap().clone(); - address.connect().await.unwrap() + address.connect(&ConnectSettings::default()).await.unwrap() // does not complete (no `secure` or `authenticate`) } diff --git a/src/net/mod.rs b/src/net/mod.rs index 8708676..e3a6ef7 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -17,6 +17,7 @@ mod session_connector; mod session_control; mod session_listener; mod socket; +mod udt; mod unit_receiver; mod unit_sender; diff --git a/src/net/plex/multiplex.rs b/src/net/plex/multiplex.rs index 05feb76..760c2ca 100644 --- a/src/net/plex/multiplex.rs +++ b/src/net/plex/multiplex.rs @@ -98,7 +98,7 @@ impl Multiplex { let settings = settings.clone(); fuse.spawn(async move { - let _ = Multiplex::run( + let result = Multiplex::run( connection, run_plex_outlet, accept_inlet, @@ -107,6 +107,8 @@ impl Multiplex { ) .await; + println!("RUN RETURNED WITH {result:?}"); + info.is_alive.store(false, Ordering::Relaxed); }); } @@ -152,8 +154,17 @@ impl Multiplex { let fuse = Fuse::new(); - fuse.spawn(Multiplex::route_in(receiver, run_route_in_inlet)); - fuse.spawn(Multiplex::route_out(sender, route_out_outlet)); + fuse.spawn(async move { + if let Err(error) = Multiplex::route_in(receiver, run_route_in_inlet).await { + println!("ROUTE_IN ERROR: {error:?}"); + } + }); + + fuse.spawn(async move { + if let Err(error) = Multiplex::route_out(sender, route_out_outlet).await { + println!("ROUTE_OUT ERROR: {error:?}"); + } + }); let mut plex_handles = HashMap::new(); diff --git a/src/net/plex/plex_connector.rs b/src/net/plex/plex_connector.rs index d167e65..df61a1c 100644 --- a/src/net/plex/plex_connector.rs +++ b/src/net/plex/plex_connector.rs @@ -164,7 +164,16 @@ impl PlexConnector { // Prune all dead `ConnectMultiplex`es in `multiplexes` + let before = multiplexes.len(); multiplexes.retain(|_, multiplex| multiplex.is_alive()); + let after = multiplexes.len(); + + if after < before { + println!( + "WARNING: {} multiplexes were lost in battle!", + before - after + ); + } // If `multiplex_id` is `Some`, try connecting on `multiplex_id` @@ -223,7 +232,16 @@ impl PlexConnector { // Prune all dead `ConnectMultiplex`es in `multiplexes` + let before = multiplexes.len(); multiplexes.retain(|_, multiplex| multiplex.is_alive()); + let after = multiplexes.len(); + + if after < before { + println!( + "WARNING: {} multiplexes were lost in battle!", + before - after + ); + } // `ping()` all remaining `ConnectMultiplex`es in `multiplexes` diff --git a/src/net/plex/plex_listener.rs b/src/net/plex/plex_listener.rs index 706b930..b25abee 100644 --- a/src/net/plex/plex_listener.rs +++ b/src/net/plex/plex_listener.rs @@ -46,13 +46,19 @@ impl PlexListener { let fuse = Fuse::new(); loop { - if let Ok((remote, connection)) = listener.accept().await { + let connection = listener.accept().await; + + if let Ok((remote, connection)) = connection { fuse.spawn(PlexListener::serve( remote, connection, accept_inlet.clone(), settings.multiplex_settings.clone(), )); + } else { + if let Err(e) = connection { + println!("Error listening! {:?}", e); + } } } } @@ -69,5 +75,7 @@ impl PlexListener { while let Ok(plex) = listen_multiplex.accept().await { let _ = accept_inlet.send((remote, plex)).await; } + + println!("RETURNING FROM SERVE"); } } diff --git a/src/net/session_connector.rs b/src/net/session_connector.rs index fae5e66..f33a134 100644 --- a/src/net/session_connector.rs +++ b/src/net/session_connector.rs @@ -466,7 +466,7 @@ mod tests { .await; for connections in connector.pool.lock().connections.values() { - assert_eq!(connections.len(), 1); + assert!(connections.len() >= 1); } } }) diff --git a/src/net/test/test_connector.rs b/src/net/test/test_connector.rs index ed021ee..0de7d27 100644 --- a/src/net/test/test_connector.rs +++ b/src/net/test/test_connector.rs @@ -1,6 +1,9 @@ use crate::{ crypto::{Identity, KeyChain}, - net::{traits::TcpConnect, Connector, SecureConnection}, + net::{ + traits::{Connect, ConnectSettings}, + Connector, SecureConnection, + }, }; use async_trait::async_trait; use doomstack::{here, Doom, ResultExt, Stack}; @@ -43,7 +46,7 @@ impl Connector for TestConnector { .clone(); let mut connection = address - .connect() + .connect(&ConnectSettings::default()) .await .map_err(TestConnectorError::connect_failed) .map_err(Doom::into_top) diff --git a/src/net/traits/connect.rs b/src/net/traits/connect.rs new file mode 100644 index 0000000..5081c14 --- /dev/null +++ b/src/net/traits/connect.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; + +use crate::net::PlainConnection; + +use std::io::Result; + +use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio_udt::{UdtConfiguration, UdtConnection}; + +#[derive(Clone, Debug)] +pub enum TransportProtocol { + TCP, + UDT(UdtConfiguration), +} + +#[derive(Clone, Debug)] +pub struct ConnectSettings { + pub transport: TransportProtocol, +} + +impl Default for ConnectSettings { + fn default() -> Self { + Self { + transport: TransportProtocol::TCP, + } + } +} + +#[async_trait] +pub trait Connect: Send + Sync { + async fn connect(&self, settings: &ConnectSettings) -> Result; +} + +#[async_trait] +impl Connect for A +where + A: Send + Sync + Clone + ToSocketAddrs, +{ + async fn connect(&self, settings: &ConnectSettings) -> Result { + match &settings.transport { + TransportProtocol::TCP => TcpStream::connect(self.clone()).await.and_then(|stream| { + stream.set_nodelay(true)?; + Ok(stream.into()) + }), + TransportProtocol::UDT(config) => UdtConnection::connect(&self, Some(config.clone())) + .await + .map(Into::into), + } + } +} diff --git a/src/net/traits/mod.rs b/src/net/traits/mod.rs index b99dc04..7448e66 100644 --- a/src/net/traits/mod.rs +++ b/src/net/traits/mod.rs @@ -1,3 +1,3 @@ -mod tcp_connect; +mod connect; -pub use tcp_connect::TcpConnect; +pub use connect::{Connect, ConnectSettings, TransportProtocol}; diff --git a/src/net/traits/tcp_connect.rs b/src/net/traits/tcp_connect.rs deleted file mode 100644 index 283f9b0..0000000 --- a/src/net/traits/tcp_connect.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::net::PlainConnection; -use async_trait::async_trait; -use std::io::Result; -use tokio::net::{TcpStream, ToSocketAddrs}; - -#[async_trait] -pub trait TcpConnect: Send + Sync { - async fn connect(&self) -> Result; -} - -#[async_trait] -impl TcpConnect for A -where - A: Send + Sync + Clone + ToSocketAddrs, -{ - async fn connect(&self) -> Result { - TcpStream::connect(self.clone()).await.and_then(|stream| { - stream.set_nodelay(true)?; - Ok(stream.into()) - }) - } -} diff --git a/src/net/udt/mod.rs b/src/net/udt/mod.rs new file mode 100644 index 0000000..d5db742 --- /dev/null +++ b/src/net/udt/mod.rs @@ -0,0 +1,4 @@ +use crate::net::Socket; +use tokio_udt::UdtConnection; + +impl Socket for UdtConnection {}