From 26be57e12f7d6b0195afc2a948c9df02b2490d6b Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Mon, 13 Jun 2022 18:13:55 +0200 Subject: [PATCH 01/16] init tokio-udt --- Cargo.toml | 2 ++ src/net/udt/mod.rs | 4 ++++ 2 files changed, 6 insertions(+) create mode 100644 src/net/udt/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 07ee786..60622c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,5 @@ futures = { version = "0.3" } socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } + +tokio-udt = { git = "https://github.com/amatissart/tokio-udt", rev = "99545ed" } diff --git a/src/net/udt/mod.rs b/src/net/udt/mod.rs new file mode 100644 index 0000000..8e8a0c5 --- /dev/null +++ b/src/net/udt/mod.rs @@ -0,0 +1,4 @@ +use tokio_udt::UdtConnection; +use crate::net::Socket; + +impl Socket for UdtConnection {}; From ce4717ac5af87ed0312ac5de22df262ae5ce91c6 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Fri, 1 Jul 2022 10:47:09 +0200 Subject: [PATCH 02/16] use tokio-udt published version --- Cargo.toml | 2 +- src/net/mod.rs | 1 + src/net/udt/mod.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 60622c2..1570660 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,4 +33,4 @@ socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } -tokio-udt = { git = "https://github.com/amatissart/tokio-udt", rev = "99545ed" } +tokio-udt = "0.1.0-alpha.1" diff --git a/src/net/mod.rs b/src/net/mod.rs index ae4b4a8..72f0100 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -16,6 +16,7 @@ mod session_connector; mod session_control; mod session_listener; mod socket; +mod udt; mod unit_receiver; mod unit_sender; diff --git a/src/net/udt/mod.rs b/src/net/udt/mod.rs index 8e8a0c5..873cb91 100644 --- a/src/net/udt/mod.rs +++ b/src/net/udt/mod.rs @@ -1,4 +1,4 @@ use tokio_udt::UdtConnection; use crate::net::Socket; -impl Socket for UdtConnection {}; +impl Socket for UdtConnection {} From e6d8dd9248f7a5bf2830cbad24fd1599ab8a647e Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Thu, 14 Jul 2022 15:55:02 +0200 Subject: [PATCH 03/16] wip: run tests with UDT connections --- Cargo.toml | 2 +- src/net/test/test_listener.rs | 15 +++++++++++---- src/net/traits/tcp_connect.rs | 31 ++++++++++++++++++++++++++----- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1570660..293a376 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,4 +33,4 @@ socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } -tokio-udt = "0.1.0-alpha.1" +tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f5f3f40" } diff --git a/src/net/test/test_listener.rs b/src/net/test/test_listener.rs index 9b3c044..8ca8400 100644 --- a/src/net/test/test_listener.rs +++ b/src/net/test/test_listener.rs @@ -18,6 +18,8 @@ use tokio::{ }, }; +use tokio_udt::{UdtListener, UdtConfiguration}; + const CHANNEL_CAPACITY: usize = 32; type Outlet = Receiver<(Identity, SecureConnection)>; @@ -37,7 +39,11 @@ enum ServeError { impl TestListener { pub async fn new(keychain: KeyChain) -> (Self, SocketAddr) { - let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap(); + let bind_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let listener = UdtListener::bind(bind_addr, Some(UdtConfiguration { + reuse_mux: false, + ..Default::default() + })).await.unwrap(); let address = listener.local_addr().unwrap(); @@ -60,14 +66,15 @@ impl TestListener { async fn listen( keychain: KeyChain, - listener: TcpListener, + listener: UdtListener, inlet: Sender<(Identity, SecureConnection)>, ) { let fuse = Fuse::new(); loop { - if let Ok((stream, _)) = listener.accept().await { - let connection = stream.into(); + if let Ok((_addr, udt_connection)) = listener.accept().await { + println!("Accepted {}", _addr); + let connection = udt_connection.into(); let keychain = keychain.clone(); let inlet = inlet.clone(); diff --git a/src/net/traits/tcp_connect.rs b/src/net/traits/tcp_connect.rs index 104a65e..31e4f02 100644 --- a/src/net/traits/tcp_connect.rs +++ b/src/net/traits/tcp_connect.rs @@ -4,7 +4,25 @@ use crate::net::PlainConnection; use std::io::Result; -use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio::net::{TcpStream, ToSocketAddrs, lookup_host}; + +use tokio_udt::{UdtConnection, UdtConfiguration}; + +// TODO: Define generic Connect +// pub enum TransportType { +// TCP, +// UDT, +// } + +// struct ConnectSettings { +// transport: // TCP / UDT +// } + +// #[async_trait] +// pub trait Connect: Send + Sync { +// async fn connect(&self, settings: ConnectSettings) -> Result; +// } + #[async_trait] pub trait TcpConnect: Send + Sync { @@ -17,9 +35,12 @@ where A: Send + Sync + Clone + ToSocketAddrs, { async fn connect(&self) -> Result { - TcpStream::connect(self.clone()).await.and_then(|stream| { - stream.set_nodelay(true)?; - Ok(stream.into()) - }) + // TcpStream::connect(self.clone()).await.and_then(|stream| { + // stream.set_nodelay(true)?; + // Ok(stream.into()) + // }) + + let addr = lookup_host(self).await?.next().expect("no addr found"); + UdtConnection::connect(addr, None).await.map(Into::into) } } From 178f3672e59a23a5487a4b7568a609460b654a38 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Wed, 20 Jul 2022 18:13:43 +0200 Subject: [PATCH 04/16] poc: use UDT instead of TCP --- Cargo.toml | 3 ++- src/link/rendezvous/client.rs | 4 +++- src/link/rendezvous/connector.rs | 7 ++++++- src/link/rendezvous/listener.rs | 30 +++++++++++++++------------ src/link/rendezvous/server.rs | 35 ++++++++++++++++++-------------- src/net/session_connector.rs | 2 +- src/net/test/test_listener.rs | 25 ++++++++++++----------- src/net/traits/tcp_connect.rs | 15 ++++++++++---- src/net/udt/mod.rs | 2 +- 9 files changed, 74 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 293a376..6eea338 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,4 +33,5 @@ socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } -tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f5f3f40" } +#tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f9fdae" } +tokio-udt = "0.1.0-alpha.4" diff --git a/src/link/rendezvous/client.rs b/src/link/rendezvous/client.rs index 40dbc7a..0f73aba 100644 --- a/src/link/rendezvous/client.rs +++ b/src/link/rendezvous/client.rs @@ -151,10 +151,12 @@ mod tests { use std::time::Duration; + use tokio::net::lookup_host; use tokio::time; async fn setup_server(address: &'static str, shard_sizes: Vec) -> Server { - Server::new(address, ServerSettings { shard_sizes }) + let addr = lookup_host(address).await.unwrap().next().unwrap(); + Server::new(addr, ServerSettings { shard_sizes }) .await .unwrap() } diff --git a/src/link/rendezvous/connector.rs b/src/link/rendezvous/connector.rs index 4b68dc7..a977ace 100644 --- a/src/link/rendezvous/connector.rs +++ b/src/link/rendezvous/connector.rs @@ -140,7 +140,12 @@ mod tests { const SERVER: &str = "127.0.0.1:1250"; const MESSAGE: &str = "Hello Alice, this is Bob!"; - let _server = Server::new(SERVER, Default::default()).await.unwrap(); + let server_addr = tokio::net::lookup_host(SERVER) + .await + .unwrap() + .next() + .unwrap(); + let _server = Server::new(server_addr, Default::default()).await.unwrap(); let alice_keychain = KeyChain::random(); let bob_keychain = KeyChain::random(); diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index 3cd816e..08fe587 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -11,14 +11,13 @@ use doomstack::{here, Doom, ResultExt, Stack, Top}; use std::net::Ipv4Addr; -use tokio::{ - net::TcpListener, - sync::{ - mpsc, - mpsc::{Receiver, Sender}, - }, +use tokio::sync::{ + mpsc, + mpsc::{Receiver, Sender}, }; +use tokio_udt::{UdtConfiguration, UdtListener}; + type Outlet = Receiver<(Identity, SecureConnection)>; pub struct Listener { @@ -39,8 +38,12 @@ impl Listener { where S: 'static + TcpConnect, { - let listener = TcpListener::bind( - (Ipv4Addr::UNSPECIFIED, 0), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?) + let listener = UdtListener::bind( + (Ipv4Addr::UNSPECIFIED, 0).into(), + Some(UdtConfiguration { + reuse_mux: false, + ..Default::default() + }), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?) ) .await .unwrap(); @@ -67,16 +70,17 @@ impl Listener { async fn listen( keychain: KeyChain, - listener: TcpListener, + listener: UdtListener, inlet: Sender<(Identity, SecureConnection)>, ) { let fuse = Fuse::new(); loop { - if let Ok((stream, _)) = listener.accept().await.and_then(|(stream, addr)| { - stream.set_nodelay(true)?; - Ok((stream, addr)) - }) { + // if let Ok((stream, _)) = listener.accept().await.and_then(|(stream, addr)| { + // stream.set_nodelay(true)?; + // Ok((stream, addr)) + // }) { + if let Ok((_addr, stream)) = listener.accept().await { let connection = stream.into(); let keychain = keychain.clone(); diff --git a/src/link/rendezvous/server.rs b/src/link/rendezvous/server.rs index 5aad6b2..31f11b4 100644 --- a/src/link/rendezvous/server.rs +++ b/src/link/rendezvous/server.rs @@ -15,10 +15,9 @@ use std::{ sync::Arc, }; -use tokio::{ - io, - net::{TcpListener, ToSocketAddrs}, -}; +use tokio::{io, net::ToSocketAddrs}; + +use tokio_udt::{UdtConfiguration, UdtListener}; pub struct Server { _fuse: Fuse, @@ -45,10 +44,10 @@ struct Database { } impl Server { - pub async fn new(address: A, settings: ServerSettings) -> Result> - where - A: ToSocketAddrs, - { + pub async fn new( + address: SocketAddr, + settings: ServerSettings, + ) -> Result> { let database = Arc::new(Mutex::new(Database { shards: settings .shard_sizes @@ -62,11 +61,17 @@ impl Server { let fuse = Fuse::new(); - let listener = TcpListener::bind(address) - .await - .map_err(ServerError::initialize_failed) - .map_err(Doom::into_top) - .spot(here!())?; + let listener = UdtListener::bind( + address, + Some(UdtConfiguration { + reuse_mux: false, + ..Default::default() + }), + ) + .await + .map_err(ServerError::initialize_failed) + .map_err(Doom::into_top) + .spot(here!())?; fuse.spawn(async move { let _ = Server::listen(settings, database, listener).await; @@ -78,12 +83,12 @@ impl Server { async fn listen( settings: ServerSettings, database: Arc>, - listener: TcpListener, + listener: UdtListener, ) { let fuse = Fuse::new(); loop { - if let Ok((stream, address)) = listener.accept().await { + if let Ok((address, stream)) = listener.accept().await { let settings = settings.clone(); let database = database.clone(); diff --git a/src/net/session_connector.rs b/src/net/session_connector.rs index 712beea..c0fa4df 100644 --- a/src/net/session_connector.rs +++ b/src/net/session_connector.rs @@ -472,7 +472,7 @@ mod tests { .await; for connections in connector.pool.lock().connections.values() { - assert_eq!(connections.len(), 1); + assert!(connections.len() >= 1); } } }) diff --git a/src/net/test/test_listener.rs b/src/net/test/test_listener.rs index 8ca8400..ff492e3 100644 --- a/src/net/test/test_listener.rs +++ b/src/net/test/test_listener.rs @@ -10,15 +10,12 @@ use doomstack::{here, Doom, ResultExt, Stack, Top}; use std::net::{Ipv4Addr, SocketAddr}; -use tokio::{ - net::TcpListener, - sync::{ - mpsc, - mpsc::{Receiver, Sender}, - }, +use tokio::sync::{ + mpsc, + mpsc::{Receiver, Sender}, }; -use tokio_udt::{UdtListener, UdtConfiguration}; +use tokio_udt::{UdtConfiguration, UdtListener}; const CHANNEL_CAPACITY: usize = 32; @@ -40,10 +37,15 @@ enum ServeError { impl TestListener { pub async fn new(keychain: KeyChain) -> (Self, SocketAddr) { let bind_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = UdtListener::bind(bind_addr, Some(UdtConfiguration { - reuse_mux: false, - ..Default::default() - })).await.unwrap(); + let listener = UdtListener::bind( + bind_addr, + Some(UdtConfiguration { + reuse_mux: false, + ..Default::default() + }), + ) + .await + .unwrap(); let address = listener.local_addr().unwrap(); @@ -73,7 +75,6 @@ impl TestListener { loop { if let Ok((_addr, udt_connection)) = listener.accept().await { - println!("Accepted {}", _addr); let connection = udt_connection.into(); let keychain = keychain.clone(); diff --git a/src/net/traits/tcp_connect.rs b/src/net/traits/tcp_connect.rs index 31e4f02..b03311f 100644 --- a/src/net/traits/tcp_connect.rs +++ b/src/net/traits/tcp_connect.rs @@ -4,9 +4,9 @@ use crate::net::PlainConnection; use std::io::Result; -use tokio::net::{TcpStream, ToSocketAddrs, lookup_host}; +use tokio::net::{lookup_host, TcpStream, ToSocketAddrs}; -use tokio_udt::{UdtConnection, UdtConfiguration}; +use tokio_udt::{UdtConfiguration, UdtConnection}; // TODO: Define generic Connect // pub enum TransportType { @@ -23,7 +23,6 @@ use tokio_udt::{UdtConnection, UdtConfiguration}; // async fn connect(&self, settings: ConnectSettings) -> Result; // } - #[async_trait] pub trait TcpConnect: Send + Sync { async fn connect(&self) -> Result; @@ -41,6 +40,14 @@ where // }) let addr = lookup_host(self).await?.next().expect("no addr found"); - UdtConnection::connect(addr, None).await.map(Into::into) + UdtConnection::connect( + addr, + Some(UdtConfiguration { + reuse_mux: false, + ..Default::default() + }), + ) + .await + .map(Into::into) } } diff --git a/src/net/udt/mod.rs b/src/net/udt/mod.rs index 873cb91..d5db742 100644 --- a/src/net/udt/mod.rs +++ b/src/net/udt/mod.rs @@ -1,4 +1,4 @@ -use tokio_udt::UdtConnection; use crate::net::Socket; +use tokio_udt::UdtConnection; impl Socket for UdtConnection {} From 7ebde03ff73366a33463e4e4605075c15d37718b Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Fri, 5 Aug 2022 18:12:18 +0200 Subject: [PATCH 05/16] make transport protocol generic (tcp/udt) --- Cargo.toml | 2 +- src/link/rendezvous/client.rs | 20 ++++++--- src/link/rendezvous/client_settings.rs | 7 ++- src/link/rendezvous/connector.rs | 2 +- src/link/rendezvous/listener.rs | 60 ++++++++++++++++++------- src/link/rendezvous/server.rs | 53 ++++++++++++++-------- src/link/rendezvous/server_settings.rs | 4 ++ src/link/test/tests.rs | 4 +- src/net/test/test_connector.rs | 7 ++- src/net/traits/mod.rs | 2 +- src/net/traits/tcp_connect.rs | 62 +++++++++++++------------- 11 files changed, 145 insertions(+), 78 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6eea338..45e24dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,4 +34,4 @@ nix = { version = "0.24.2" } num_cpus = { version = "1.13" } #tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f9fdae" } -tokio-udt = "0.1.0-alpha.4" +tokio-udt = "0.1.0-alpha.5" diff --git a/src/link/rendezvous/client.rs b/src/link/rendezvous/client.rs index 0f73aba..c4e106d 100644 --- a/src/link/rendezvous/client.rs +++ b/src/link/rendezvous/client.rs @@ -1,7 +1,7 @@ use crate::{ crypto::{Identity, KeyCard}, link::rendezvous::{ClientSettings, Request, Response, ShardId}, - net::traits::TcpConnect, + net::traits::{ConnectSettings, TcpConnect}, }; use doomstack::{here, Doom, ResultExt, Top}; @@ -120,7 +120,7 @@ impl Client { async fn attempt(&self, request: &Request) -> Result> { let mut connection = self .server - .connect() + .connect(&self.settings.connect) .await .map_err(AttemptError::connect_failed) .map_err(Doom::into_top) @@ -138,6 +138,10 @@ impl Client { Ok(response) } + + pub(crate) fn connect_settings(&self) -> &ConnectSettings { + &self.settings.connect + } } #[cfg(test)] @@ -156,9 +160,15 @@ mod tests { async fn setup_server(address: &'static str, shard_sizes: Vec) -> Server { let addr = lookup_host(address).await.unwrap().next().unwrap(); - Server::new(addr, ServerSettings { shard_sizes }) - .await - .unwrap() + Server::new( + addr, + ServerSettings { + shard_sizes, + ..Default::default() + }, + ) + .await + .unwrap() } async fn setup_clients( diff --git a/src/link/rendezvous/client_settings.rs b/src/link/rendezvous/client_settings.rs index e20c3a6..b332536 100644 --- a/src/link/rendezvous/client_settings.rs +++ b/src/link/rendezvous/client_settings.rs @@ -1,10 +1,14 @@ -use crate::time::{sleep_schedules::CappedExponential, SleepSchedule}; +use crate::{ + net::traits::ConnectSettings, + time::{sleep_schedules::CappedExponential, SleepSchedule}, +}; use std::{sync::Arc, time::Duration}; #[derive(Debug, Clone)] pub struct ClientSettings { pub sleep_schedule: Arc, + pub connect: ConnectSettings, } impl Default for ClientSettings { @@ -15,6 +19,7 @@ impl Default for ClientSettings { 2., Duration::from_secs(300), )), + connect: ConnectSettings::default(), } } } diff --git a/src/link/rendezvous/connector.rs b/src/link/rendezvous/connector.rs index a977ace..57af859 100644 --- a/src/link/rendezvous/connector.rs +++ b/src/link/rendezvous/connector.rs @@ -62,7 +62,7 @@ impl Connector { .spot(here!())?; let mut connection = address - .connect() + .connect(&self.client.connect_settings()) .await .map_err(ConnectorError::connect_failed) .map_err(Doom::into_top) diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index 08fe587..a8fddf0 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -3,7 +3,10 @@ use async_trait::async_trait; use crate::{ crypto::{Identity, KeyChain}, link::rendezvous::{Client, ListenerSettings}, - net::{traits::TcpConnect, Listener as NetListener, PlainConnection, SecureConnection}, + net::{ + traits::{TcpConnect, TransportProtocol}, + Listener as NetListener, PlainConnection, SecureConnection, + }, sync::fuse::Fuse, }; @@ -11,15 +14,21 @@ use doomstack::{here, Doom, ResultExt, Stack, Top}; use std::net::Ipv4Addr; +use tokio::net::TcpListener; use tokio::sync::{ mpsc, mpsc::{Receiver, Sender}, }; -use tokio_udt::{UdtConfiguration, UdtListener}; +use tokio_udt::UdtListener; type Outlet = Receiver<(Identity, SecureConnection)>; +pub(crate) enum RawListener { + TCP(TcpListener), + UDT(UdtListener), +} + pub struct Listener { outlet: Outlet, _fuse: Fuse, @@ -38,18 +47,27 @@ impl Listener { where S: 'static + TcpConnect, { - let listener = UdtListener::bind( - (Ipv4Addr::UNSPECIFIED, 0).into(), - Some(UdtConfiguration { - reuse_mux: false, - ..Default::default() - }), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?) - ) - .await - .unwrap(); + let (listener, port) = match settings.client_settings.connect.transport { + TransportProtocol::TCP => { + let listener = TcpListener::bind( + (Ipv4Addr::UNSPECIFIED, 0), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?) + ) + .await + .unwrap(); + let port = listener.local_addr().unwrap().port(); + (RawListener::TCP(listener), port) + } + TransportProtocol::UDT(ref config) => { + let listener = + UdtListener::bind((Ipv4Addr::UNSPECIFIED, 0).into(), Some(config.clone())) + .await + .unwrap(); + let port = listener.local_addr().unwrap().port(); + (RawListener::UDT(listener), port) + } + }; let identity = keychain.keycard().identity(); - let port = listener.local_addr().unwrap().port(); let fuse = Fuse::new(); @@ -70,19 +88,29 @@ impl Listener { async fn listen( keychain: KeyChain, - listener: UdtListener, + listener: RawListener, inlet: Sender<(Identity, SecureConnection)>, ) { let fuse = Fuse::new(); loop { + let accept_result = match listener { + RawListener::TCP(ref tcp_listener) => { + tcp_listener.accept().await.and_then(|(stream, addr)| { + stream.set_nodelay(true)?; + Ok((stream.into(), addr)) + }) + } + RawListener::UDT(ref udt_listener) => udt_listener + .accept() + .await + .map(|(addr, udt_connection)| (udt_connection.into(), addr)), + }; // if let Ok((stream, _)) = listener.accept().await.and_then(|(stream, addr)| { // stream.set_nodelay(true)?; // Ok((stream, addr)) // }) { - if let Ok((_addr, stream)) = listener.accept().await { - let connection = stream.into(); - + if let Ok((connection, _)) = accept_result { let keychain = keychain.clone(); let inlet = inlet.clone(); diff --git a/src/link/rendezvous/server.rs b/src/link/rendezvous/server.rs index 31f11b4..b4651b0 100644 --- a/src/link/rendezvous/server.rs +++ b/src/link/rendezvous/server.rs @@ -1,7 +1,8 @@ use crate::{ crypto::{Identity, KeyCard}, + link::rendezvous::listener::RawListener, link::rendezvous::{Request, Response, ServerSettings, ShardId}, - net::PlainConnection, + net::{traits::TransportProtocol, PlainConnection}, sync::fuse::Fuse, }; @@ -15,9 +16,8 @@ use std::{ sync::Arc, }; -use tokio::{io, net::ToSocketAddrs}; - -use tokio_udt::{UdtConfiguration, UdtListener}; +use tokio::{io, net::TcpListener}; +use tokio_udt::UdtListener; pub struct Server { _fuse: Fuse, @@ -61,17 +61,20 @@ impl Server { let fuse = Fuse::new(); - let listener = UdtListener::bind( - address, - Some(UdtConfiguration { - reuse_mux: false, - ..Default::default() - }), - ) - .await - .map_err(ServerError::initialize_failed) - .map_err(Doom::into_top) - .spot(here!())?; + let listener = { + let result = match settings.connect.transport { + TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::TCP), + TransportProtocol::UDT(ref config) => { + UdtListener::bind(address, Some(config.clone())) + .await + .map(RawListener::UDT) + } + }; + result + .map_err(ServerError::initialize_failed) + .map_err(Doom::into_top) + .spot(here!())? + }; fuse.spawn(async move { let _ = Server::listen(settings, database, listener).await; @@ -83,17 +86,29 @@ impl Server { async fn listen( settings: ServerSettings, database: Arc>, - listener: UdtListener, + listener: RawListener, ) { let fuse = Fuse::new(); + let accept = || async { + match listener { + RawListener::TCP(ref tcp_listener) => tcp_listener + .accept() + .await + .map(|(stream, address)| (stream.into(), address)), + RawListener::UDT(ref udt_listener) => udt_listener + .accept() + .await + .map(|(address, stream)| (stream.into(), address)), + } + }; + loop { - if let Ok((address, stream)) = listener.accept().await { + if let Ok((connection, address)) = accept().await { + let connection: PlainConnection = connection; let settings = settings.clone(); let database = database.clone(); - let connection: PlainConnection = stream.into(); - fuse.spawn(async move { let _ = Server::serve(settings, database, connection, address).await; }); diff --git a/src/link/rendezvous/server_settings.rs b/src/link/rendezvous/server_settings.rs index eb70357..4f726a0 100644 --- a/src/link/rendezvous/server_settings.rs +++ b/src/link/rendezvous/server_settings.rs @@ -1,12 +1,16 @@ +use crate::net::traits::ConnectSettings; + #[derive(Debug, Clone)] pub struct ServerSettings { pub shard_sizes: Vec, + pub connect: ConnectSettings, } impl Default for ServerSettings { fn default() -> Self { ServerSettings { shard_sizes: vec![4], + connect: ConnectSettings::default(), } } } diff --git a/src/link/test/tests.rs b/src/link/test/tests.rs index 5386f2e..82eda67 100644 --- a/src/link/test/tests.rs +++ b/src/link/test/tests.rs @@ -7,7 +7,7 @@ mod context { }, net::{ test::{System as NetSystem, TestConnector}, - traits::TcpConnect, + traits::{ConnectSettings, TcpConnect}, Connector, Listener, PlainConnection, }, time::test::join, @@ -95,7 +95,7 @@ mod context { impl SlowLoris { async fn connect(&self, identity: Identity) -> PlainConnection { let address = self.0.peers.get(&identity).unwrap().clone(); - address.connect().await.unwrap() + address.connect(&ConnectSettings::default()).await.unwrap() // does not complete (no `secure` or `authenticate`) } diff --git a/src/net/test/test_connector.rs b/src/net/test/test_connector.rs index 9ffbb9f..0be2bba 100644 --- a/src/net/test/test_connector.rs +++ b/src/net/test/test_connector.rs @@ -2,7 +2,10 @@ use async_trait::async_trait; use crate::{ crypto::{Identity, KeyChain}, - net::{traits::TcpConnect, Connector, SecureConnection}, + net::{ + traits::{ConnectSettings, TcpConnect}, + Connector, SecureConnection, + }, }; use doomstack::{here, Doom, ResultExt, Stack}; @@ -46,7 +49,7 @@ impl Connector for TestConnector { .clone(); let mut connection = address - .connect() + .connect(&ConnectSettings::default()) .await .map_err(TestConnectorError::connect_failed) .map_err(Doom::into_top) diff --git a/src/net/traits/mod.rs b/src/net/traits/mod.rs index b99dc04..f1a0921 100644 --- a/src/net/traits/mod.rs +++ b/src/net/traits/mod.rs @@ -1,3 +1,3 @@ mod tcp_connect; -pub use tcp_connect::TcpConnect; +pub use tcp_connect::{ConnectSettings, TcpConnect, TransportProtocol}; diff --git a/src/net/traits/tcp_connect.rs b/src/net/traits/tcp_connect.rs index b03311f..7fa2187 100644 --- a/src/net/traits/tcp_connect.rs +++ b/src/net/traits/tcp_connect.rs @@ -4,28 +4,36 @@ use crate::net::PlainConnection; use std::io::Result; -use tokio::net::{lookup_host, TcpStream, ToSocketAddrs}; - +use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_udt::{UdtConfiguration, UdtConnection}; -// TODO: Define generic Connect -// pub enum TransportType { -// TCP, -// UDT, -// } +/* TODO: + * Define generic Connect + * Retry handshake / RDV queue in UDT +*/ + +#[derive(Clone, Debug)] +pub enum TransportProtocol { + TCP, + UDT(UdtConfiguration), +} -// struct ConnectSettings { -// transport: // TCP / UDT -// } +#[derive(Clone, Debug)] +pub struct ConnectSettings { + pub transport: TransportProtocol, +} -// #[async_trait] -// pub trait Connect: Send + Sync { -// async fn connect(&self, settings: ConnectSettings) -> Result; -// } +impl Default for ConnectSettings { + fn default() -> Self { + Self { + transport: TransportProtocol::UDT(UdtConfiguration::default()), + } + } +} #[async_trait] pub trait TcpConnect: Send + Sync { - async fn connect(&self) -> Result; + async fn connect(&self, settings: &ConnectSettings) -> Result; } #[async_trait] @@ -33,21 +41,15 @@ impl TcpConnect for A where A: Send + Sync + Clone + ToSocketAddrs, { - async fn connect(&self) -> Result { - // TcpStream::connect(self.clone()).await.and_then(|stream| { - // stream.set_nodelay(true)?; - // Ok(stream.into()) - // }) - - let addr = lookup_host(self).await?.next().expect("no addr found"); - UdtConnection::connect( - addr, - Some(UdtConfiguration { - reuse_mux: false, - ..Default::default() + async fn connect(&self, settings: &ConnectSettings) -> Result { + match &settings.transport { + TransportProtocol::TCP => TcpStream::connect(self.clone()).await.and_then(|stream| { + stream.set_nodelay(true)?; + Ok(stream.into()) }), - ) - .await - .map(Into::into) + TransportProtocol::UDT(config) => UdtConnection::connect(&self, Some(config.clone())) + .await + .map(Into::into), + } } } From aa97ce7a86a5e1bf4777a62ce863daaa852d94f5 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Fri, 5 Aug 2022 18:15:03 +0200 Subject: [PATCH 06/16] rename trait TcpConnect into Connect --- src/link/rendezvous/client.rs | 6 +++--- src/link/rendezvous/connector.rs | 4 ++-- src/link/rendezvous/listener.rs | 4 ++-- src/link/test/tests.rs | 2 +- src/net/test/test_connector.rs | 2 +- src/net/traits/{tcp_connect.rs => connect.rs} | 4 ++-- src/net/traits/mod.rs | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) rename src/net/traits/{tcp_connect.rs => connect.rs} (95%) diff --git a/src/link/rendezvous/client.rs b/src/link/rendezvous/client.rs index c4e106d..f4b72de 100644 --- a/src/link/rendezvous/client.rs +++ b/src/link/rendezvous/client.rs @@ -1,7 +1,7 @@ use crate::{ crypto::{Identity, KeyCard}, link::rendezvous::{ClientSettings, Request, Response, ShardId}, - net::traits::{ConnectSettings, TcpConnect}, + net::traits::{ConnectSettings, Connect}, }; use doomstack::{here, Doom, ResultExt, Top}; @@ -9,7 +9,7 @@ use doomstack::{here, Doom, ResultExt, Top}; use std::{io, net::SocketAddr, vec::Vec}; pub struct Client { - server: Box, + server: Box, settings: ClientSettings, } @@ -41,7 +41,7 @@ enum AttemptError { impl Client { pub fn new(server: S, settings: ClientSettings) -> Self where - S: 'static + TcpConnect, + S: 'static + Connect, { Client { server: Box::new(server), diff --git a/src/link/rendezvous/connector.rs b/src/link/rendezvous/connector.rs index 57af859..59548f1 100644 --- a/src/link/rendezvous/connector.rs +++ b/src/link/rendezvous/connector.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use crate::{ crypto::{Identity, KeyChain}, link::rendezvous::{Client, ConnectorSettings}, - net::{traits::TcpConnect, Connector as NetConnector, SecureConnection}, + net::{traits::Connect, Connector as NetConnector, SecureConnection}, }; use doomstack::{here, Doom, ResultExt, Stack, Top}; @@ -40,7 +40,7 @@ pub enum ConnectorError { impl Connector { pub fn new(server: S, keychain: KeyChain, settings: ConnectorSettings) -> Self where - S: 'static + TcpConnect, + S: 'static + Connect, { let client = Client::new(server, settings.client_settings); diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index a8fddf0..9ab7f2f 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -4,7 +4,7 @@ use crate::{ crypto::{Identity, KeyChain}, link::rendezvous::{Client, ListenerSettings}, net::{ - traits::{TcpConnect, TransportProtocol}, + traits::{Connect, TransportProtocol}, Listener as NetListener, PlainConnection, SecureConnection, }, sync::fuse::Fuse, @@ -45,7 +45,7 @@ enum ServeError { impl Listener { pub async fn new(server: S, keychain: KeyChain, settings: ListenerSettings) -> Self where - S: 'static + TcpConnect, + S: 'static + Connect, { let (listener, port) = match settings.client_settings.connect.transport { TransportProtocol::TCP => { diff --git a/src/link/test/tests.rs b/src/link/test/tests.rs index 82eda67..c8e1fe2 100644 --- a/src/link/test/tests.rs +++ b/src/link/test/tests.rs @@ -7,7 +7,7 @@ mod context { }, net::{ test::{System as NetSystem, TestConnector}, - traits::{ConnectSettings, TcpConnect}, + traits::{ConnectSettings, Connect}, Connector, Listener, PlainConnection, }, time::test::join, diff --git a/src/net/test/test_connector.rs b/src/net/test/test_connector.rs index 0be2bba..0e3478d 100644 --- a/src/net/test/test_connector.rs +++ b/src/net/test/test_connector.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use crate::{ crypto::{Identity, KeyChain}, net::{ - traits::{ConnectSettings, TcpConnect}, + traits::{ConnectSettings, Connect}, Connector, SecureConnection, }, }; diff --git a/src/net/traits/tcp_connect.rs b/src/net/traits/connect.rs similarity index 95% rename from src/net/traits/tcp_connect.rs rename to src/net/traits/connect.rs index 7fa2187..8cd26a7 100644 --- a/src/net/traits/tcp_connect.rs +++ b/src/net/traits/connect.rs @@ -32,12 +32,12 @@ impl Default for ConnectSettings { } #[async_trait] -pub trait TcpConnect: Send + Sync { +pub trait Connect: Send + Sync { async fn connect(&self, settings: &ConnectSettings) -> Result; } #[async_trait] -impl TcpConnect for A +impl Connect for A where A: Send + Sync + Clone + ToSocketAddrs, { diff --git a/src/net/traits/mod.rs b/src/net/traits/mod.rs index f1a0921..4ebafa2 100644 --- a/src/net/traits/mod.rs +++ b/src/net/traits/mod.rs @@ -1,3 +1,3 @@ -mod tcp_connect; +mod connect; -pub use tcp_connect::{ConnectSettings, TcpConnect, TransportProtocol}; +pub use connect::{ConnectSettings, Connect, TransportProtocol}; From 2650f175461d9a08c74698910b8f623eb5e0d852 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Fri, 5 Aug 2022 18:19:46 +0200 Subject: [PATCH 07/16] reset default transport to TCP --- src/link/rendezvous/connector.rs | 2 +- src/link/rendezvous/listener.rs | 12 ++++++------ src/link/rendezvous/server.rs | 8 ++++---- src/net/test/test_listener.rs | 28 ++++++++++------------------ src/net/traits/connect.rs | 2 +- 5 files changed, 22 insertions(+), 30 deletions(-) diff --git a/src/link/rendezvous/connector.rs b/src/link/rendezvous/connector.rs index 59548f1..2e63335 100644 --- a/src/link/rendezvous/connector.rs +++ b/src/link/rendezvous/connector.rs @@ -62,7 +62,7 @@ impl Connector { .spot(here!())?; let mut connection = address - .connect(&self.client.connect_settings()) + .connect(self.client.connect_settings()) .await .map_err(ConnectorError::connect_failed) .map_err(Doom::into_top) diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index 9ab7f2f..1b32758 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -25,8 +25,8 @@ use tokio_udt::UdtListener; type Outlet = Receiver<(Identity, SecureConnection)>; pub(crate) enum RawListener { - TCP(TcpListener), - UDT(UdtListener), + Tcp(TcpListener), + Udt(UdtListener), } pub struct Listener { @@ -55,7 +55,7 @@ impl Listener { .await .unwrap(); let port = listener.local_addr().unwrap().port(); - (RawListener::TCP(listener), port) + (RawListener::Tcp(listener), port) } TransportProtocol::UDT(ref config) => { let listener = @@ -63,7 +63,7 @@ impl Listener { .await .unwrap(); let port = listener.local_addr().unwrap().port(); - (RawListener::UDT(listener), port) + (RawListener::Udt(listener), port) } }; @@ -95,13 +95,13 @@ impl Listener { loop { let accept_result = match listener { - RawListener::TCP(ref tcp_listener) => { + RawListener::Tcp(ref tcp_listener) => { tcp_listener.accept().await.and_then(|(stream, addr)| { stream.set_nodelay(true)?; Ok((stream.into(), addr)) }) } - RawListener::UDT(ref udt_listener) => udt_listener + RawListener::Udt(ref udt_listener) => udt_listener .accept() .await .map(|(addr, udt_connection)| (udt_connection.into(), addr)), diff --git a/src/link/rendezvous/server.rs b/src/link/rendezvous/server.rs index b4651b0..588bf21 100644 --- a/src/link/rendezvous/server.rs +++ b/src/link/rendezvous/server.rs @@ -63,11 +63,11 @@ impl Server { let listener = { let result = match settings.connect.transport { - TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::TCP), + TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::Tcp), TransportProtocol::UDT(ref config) => { UdtListener::bind(address, Some(config.clone())) .await - .map(RawListener::UDT) + .map(RawListener::Udt) } }; result @@ -92,11 +92,11 @@ impl Server { let accept = || async { match listener { - RawListener::TCP(ref tcp_listener) => tcp_listener + RawListener::Tcp(ref tcp_listener) => tcp_listener .accept() .await .map(|(stream, address)| (stream.into(), address)), - RawListener::UDT(ref udt_listener) => udt_listener + RawListener::Udt(ref udt_listener) => udt_listener .accept() .await .map(|(address, stream)| (stream.into(), address)), diff --git a/src/net/test/test_listener.rs b/src/net/test/test_listener.rs index ff492e3..9b3c044 100644 --- a/src/net/test/test_listener.rs +++ b/src/net/test/test_listener.rs @@ -10,13 +10,14 @@ use doomstack::{here, Doom, ResultExt, Stack, Top}; use std::net::{Ipv4Addr, SocketAddr}; -use tokio::sync::{ - mpsc, - mpsc::{Receiver, Sender}, +use tokio::{ + net::TcpListener, + sync::{ + mpsc, + mpsc::{Receiver, Sender}, + }, }; -use tokio_udt::{UdtConfiguration, UdtListener}; - const CHANNEL_CAPACITY: usize = 32; type Outlet = Receiver<(Identity, SecureConnection)>; @@ -36,16 +37,7 @@ enum ServeError { impl TestListener { pub async fn new(keychain: KeyChain) -> (Self, SocketAddr) { - let bind_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); - let listener = UdtListener::bind( - bind_addr, - Some(UdtConfiguration { - reuse_mux: false, - ..Default::default() - }), - ) - .await - .unwrap(); + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap(); let address = listener.local_addr().unwrap(); @@ -68,14 +60,14 @@ impl TestListener { async fn listen( keychain: KeyChain, - listener: UdtListener, + listener: TcpListener, inlet: Sender<(Identity, SecureConnection)>, ) { let fuse = Fuse::new(); loop { - if let Ok((_addr, udt_connection)) = listener.accept().await { - let connection = udt_connection.into(); + if let Ok((stream, _)) = listener.accept().await { + let connection = stream.into(); let keychain = keychain.clone(); let inlet = inlet.clone(); diff --git a/src/net/traits/connect.rs b/src/net/traits/connect.rs index 8cd26a7..ea56ae1 100644 --- a/src/net/traits/connect.rs +++ b/src/net/traits/connect.rs @@ -26,7 +26,7 @@ pub struct ConnectSettings { impl Default for ConnectSettings { fn default() -> Self { Self { - transport: TransportProtocol::UDT(UdtConfiguration::default()), + transport: TransportProtocol::TCP, } } } From 52d6f29c7297167d1543e8f3bea0ae04f8be6351 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Fri, 5 Aug 2022 18:24:44 +0200 Subject: [PATCH 08/16] clean up --- src/link/rendezvous/listener.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index 1b32758..cfd3012 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -106,10 +106,6 @@ impl Listener { .await .map(|(addr, udt_connection)| (udt_connection.into(), addr)), }; - // if let Ok((stream, _)) = listener.accept().await.and_then(|(stream, addr)| { - // stream.set_nodelay(true)?; - // Ok((stream, addr)) - // }) { if let Ok((connection, _)) = accept_result { let keychain = keychain.clone(); let inlet = inlet.clone(); From 7f4605d7fae11feb9c55a0122b310a44497dd3ec Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Mon, 8 Aug 2022 10:52:14 +0200 Subject: [PATCH 09/16] use tokio_udt on linux only --- Cargo.toml | 1 + src/link/rendezvous/client.rs | 2 +- src/link/rendezvous/listener.rs | 17 ++++++++++------- src/link/rendezvous/server.rs | 5 +++-- src/link/test/tests.rs | 2 +- src/net/mod.rs | 4 +++- src/net/test/test_connector.rs | 2 +- src/net/traits/connect.rs | 18 ++++++++---------- src/net/traits/mod.rs | 2 +- 9 files changed, 29 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 45e24dd..3f68271 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,5 +33,6 @@ socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } +[target.'cfg(target_os="linux")'.dependencies] #tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f9fdae" } tokio-udt = "0.1.0-alpha.5" diff --git a/src/link/rendezvous/client.rs b/src/link/rendezvous/client.rs index f4b72de..a33ddd4 100644 --- a/src/link/rendezvous/client.rs +++ b/src/link/rendezvous/client.rs @@ -1,7 +1,7 @@ use crate::{ crypto::{Identity, KeyCard}, link::rendezvous::{ClientSettings, Request, Response, ShardId}, - net::traits::{ConnectSettings, Connect}, + net::traits::{Connect, ConnectSettings}, }; use doomstack::{here, Doom, ResultExt, Top}; diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index cfd3012..081add6 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -20,13 +20,12 @@ use tokio::sync::{ mpsc::{Receiver, Sender}, }; -use tokio_udt::UdtListener; - type Outlet = Receiver<(Identity, SecureConnection)>; pub(crate) enum RawListener { Tcp(TcpListener), - Udt(UdtListener), + #[cfg(target_os = "linux")] + Udt(tokio_udt::UdtListener), } pub struct Listener { @@ -57,11 +56,14 @@ impl Listener { let port = listener.local_addr().unwrap().port(); (RawListener::Tcp(listener), port) } + #[cfg(target_os = "linux")] TransportProtocol::UDT(ref config) => { - let listener = - UdtListener::bind((Ipv4Addr::UNSPECIFIED, 0).into(), Some(config.clone())) - .await - .unwrap(); + let listener = tokio_udt::UdtListener::bind( + (Ipv4Addr::UNSPECIFIED, 0).into(), + Some(config.clone()), + ) + .await + .unwrap(); let port = listener.local_addr().unwrap().port(); (RawListener::Udt(listener), port) } @@ -101,6 +103,7 @@ impl Listener { Ok((stream.into(), addr)) }) } + #[cfg(target_os = "linux")] RawListener::Udt(ref udt_listener) => udt_listener .accept() .await diff --git a/src/link/rendezvous/server.rs b/src/link/rendezvous/server.rs index 588bf21..db89cbe 100644 --- a/src/link/rendezvous/server.rs +++ b/src/link/rendezvous/server.rs @@ -17,7 +17,6 @@ use std::{ }; use tokio::{io, net::TcpListener}; -use tokio_udt::UdtListener; pub struct Server { _fuse: Fuse, @@ -64,8 +63,9 @@ impl Server { let listener = { let result = match settings.connect.transport { TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::Tcp), + #[cfg(target_os = "linux")] TransportProtocol::UDT(ref config) => { - UdtListener::bind(address, Some(config.clone())) + tokio_udt::UdtListener::bind(address, Some(config.clone())) .await .map(RawListener::Udt) } @@ -96,6 +96,7 @@ impl Server { .accept() .await .map(|(stream, address)| (stream.into(), address)), + #[cfg(target_os = "linux")] RawListener::Udt(ref udt_listener) => udt_listener .accept() .await diff --git a/src/link/test/tests.rs b/src/link/test/tests.rs index c8e1fe2..8fbb0cf 100644 --- a/src/link/test/tests.rs +++ b/src/link/test/tests.rs @@ -7,7 +7,7 @@ mod context { }, net::{ test::{System as NetSystem, TestConnector}, - traits::{ConnectSettings, Connect}, + traits::{Connect, ConnectSettings}, Connector, Listener, PlainConnection, }, time::test::join, diff --git a/src/net/mod.rs b/src/net/mod.rs index 72f0100..a6987eb 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -16,13 +16,15 @@ mod session_connector; mod session_control; mod session_listener; mod socket; -mod udt; mod unit_receiver; mod unit_sender; pub mod sockets; pub mod traits; +#[cfg(target_os = "linux")] +mod udt; + #[cfg(any(test, feature = "test_utilities"))] pub mod test; diff --git a/src/net/test/test_connector.rs b/src/net/test/test_connector.rs index 0e3478d..49fccba 100644 --- a/src/net/test/test_connector.rs +++ b/src/net/test/test_connector.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use crate::{ crypto::{Identity, KeyChain}, net::{ - traits::{ConnectSettings, Connect}, + traits::{Connect, ConnectSettings}, Connector, SecureConnection, }, }; diff --git a/src/net/traits/connect.rs b/src/net/traits/connect.rs index ea56ae1..80fdbb9 100644 --- a/src/net/traits/connect.rs +++ b/src/net/traits/connect.rs @@ -5,17 +5,12 @@ use crate::net::PlainConnection; use std::io::Result; use tokio::net::{TcpStream, ToSocketAddrs}; -use tokio_udt::{UdtConfiguration, UdtConnection}; - -/* TODO: - * Define generic Connect - * Retry handshake / RDV queue in UDT -*/ #[derive(Clone, Debug)] pub enum TransportProtocol { TCP, - UDT(UdtConfiguration), + #[cfg(target_os = "linux")] + UDT(tokio_udt::UdtConfiguration), } #[derive(Clone, Debug)] @@ -47,9 +42,12 @@ where stream.set_nodelay(true)?; Ok(stream.into()) }), - TransportProtocol::UDT(config) => UdtConnection::connect(&self, Some(config.clone())) - .await - .map(Into::into), + #[cfg(target_os = "linux")] + TransportProtocol::UDT(config) => { + tokio_udt::UdtConnection::connect(&self, Some(config.clone())) + .await + .map(Into::into) + } } } } diff --git a/src/net/traits/mod.rs b/src/net/traits/mod.rs index 4ebafa2..7448e66 100644 --- a/src/net/traits/mod.rs +++ b/src/net/traits/mod.rs @@ -1,3 +1,3 @@ mod connect; -pub use connect::{ConnectSettings, Connect, TransportProtocol}; +pub use connect::{Connect, ConnectSettings, TransportProtocol}; From 483022625f91975915b7af216767e43c285734b3 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Mon, 8 Aug 2022 16:04:49 +0200 Subject: [PATCH 10/16] upgrade tokio-udt to alpha6 and support udt on non-linux systems too (unoptimized) --- Cargo.toml | 3 +-- src/link/rendezvous/listener.rs | 17 +++++++---------- src/link/rendezvous/server.rs | 5 ++--- src/net/mod.rs | 4 +--- src/net/traits/connect.rs | 13 +++++-------- 5 files changed, 16 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f68271..3d182eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,5 @@ socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } -[target.'cfg(target_os="linux")'.dependencies] #tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f9fdae" } -tokio-udt = "0.1.0-alpha.5" +tokio-udt = "0.1.0-alpha.6" diff --git a/src/link/rendezvous/listener.rs b/src/link/rendezvous/listener.rs index 081add6..cfd3012 100644 --- a/src/link/rendezvous/listener.rs +++ b/src/link/rendezvous/listener.rs @@ -20,12 +20,13 @@ use tokio::sync::{ mpsc::{Receiver, Sender}, }; +use tokio_udt::UdtListener; + type Outlet = Receiver<(Identity, SecureConnection)>; pub(crate) enum RawListener { Tcp(TcpListener), - #[cfg(target_os = "linux")] - Udt(tokio_udt::UdtListener), + Udt(UdtListener), } pub struct Listener { @@ -56,14 +57,11 @@ impl Listener { let port = listener.local_addr().unwrap().port(); (RawListener::Tcp(listener), port) } - #[cfg(target_os = "linux")] TransportProtocol::UDT(ref config) => { - let listener = tokio_udt::UdtListener::bind( - (Ipv4Addr::UNSPECIFIED, 0).into(), - Some(config.clone()), - ) - .await - .unwrap(); + let listener = + UdtListener::bind((Ipv4Addr::UNSPECIFIED, 0).into(), Some(config.clone())) + .await + .unwrap(); let port = listener.local_addr().unwrap().port(); (RawListener::Udt(listener), port) } @@ -103,7 +101,6 @@ impl Listener { Ok((stream.into(), addr)) }) } - #[cfg(target_os = "linux")] RawListener::Udt(ref udt_listener) => udt_listener .accept() .await diff --git a/src/link/rendezvous/server.rs b/src/link/rendezvous/server.rs index db89cbe..588bf21 100644 --- a/src/link/rendezvous/server.rs +++ b/src/link/rendezvous/server.rs @@ -17,6 +17,7 @@ use std::{ }; use tokio::{io, net::TcpListener}; +use tokio_udt::UdtListener; pub struct Server { _fuse: Fuse, @@ -63,9 +64,8 @@ impl Server { let listener = { let result = match settings.connect.transport { TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::Tcp), - #[cfg(target_os = "linux")] TransportProtocol::UDT(ref config) => { - tokio_udt::UdtListener::bind(address, Some(config.clone())) + UdtListener::bind(address, Some(config.clone())) .await .map(RawListener::Udt) } @@ -96,7 +96,6 @@ impl Server { .accept() .await .map(|(stream, address)| (stream.into(), address)), - #[cfg(target_os = "linux")] RawListener::Udt(ref udt_listener) => udt_listener .accept() .await diff --git a/src/net/mod.rs b/src/net/mod.rs index a6987eb..72f0100 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -16,15 +16,13 @@ mod session_connector; mod session_control; mod session_listener; mod socket; +mod udt; mod unit_receiver; mod unit_sender; pub mod sockets; pub mod traits; -#[cfg(target_os = "linux")] -mod udt; - #[cfg(any(test, feature = "test_utilities"))] pub mod test; diff --git a/src/net/traits/connect.rs b/src/net/traits/connect.rs index 80fdbb9..5081c14 100644 --- a/src/net/traits/connect.rs +++ b/src/net/traits/connect.rs @@ -5,12 +5,12 @@ use crate::net::PlainConnection; use std::io::Result; use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio_udt::{UdtConfiguration, UdtConnection}; #[derive(Clone, Debug)] pub enum TransportProtocol { TCP, - #[cfg(target_os = "linux")] - UDT(tokio_udt::UdtConfiguration), + UDT(UdtConfiguration), } #[derive(Clone, Debug)] @@ -42,12 +42,9 @@ where stream.set_nodelay(true)?; Ok(stream.into()) }), - #[cfg(target_os = "linux")] - TransportProtocol::UDT(config) => { - tokio_udt::UdtConnection::connect(&self, Some(config.clone())) - .await - .map(Into::into) - } + TransportProtocol::UDT(config) => UdtConnection::connect(&self, Some(config.clone())) + .await + .map(Into::into), } } } From a79a201ca328a2b06369772122ae48a15ed46619 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Mon, 26 Sep 2022 12:05:58 +0200 Subject: [PATCH 11/16] fix warnings in tcp_proxy --- src/net/test/tcp_proxy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/test/tcp_proxy.rs b/src/net/test/tcp_proxy.rs index 634a1c3..5581c2d 100644 --- a/src/net/test/tcp_proxy.rs +++ b/src/net/test/tcp_proxy.rs @@ -89,12 +89,12 @@ impl TcpProxy { pub async fn start(&mut self) { let _ = self.state_inlet.send(State::On); - self.off_lock.write().await; + let _ = self.off_lock.write().await; } pub async fn stop(&mut self) { let _ = self.state_inlet.send(State::Off); - self.on_lock.write().await; + let _ = self.on_lock.write().await; } pub async fn reset(&mut self) { From 4cd1842c3176112f0a5fe5dce6bd38a5d77bdec2 Mon Sep 17 00:00:00 2001 From: Adrien Matissart Date: Tue, 27 Sep 2022 16:58:54 +0200 Subject: [PATCH 12/16] update tokio-udt version --- Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d182eb..90c94dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,5 +33,4 @@ socket2 = { version = "0.4.4" } nix = { version = "0.24.2" } num_cpus = { version = "1.13" } -#tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f9fdae" } -tokio-udt = "0.1.0-alpha.6" +tokio-udt = "0.1.0-alpha.7" From 541f1e34df0481aff356b24f1a6b985693cd9a5f Mon Sep 17 00:00:00 2001 From: Matteo Monti Date: Fri, 18 Nov 2022 17:38:41 +0000 Subject: [PATCH 13/16] [REVERT] Add debug `println!`s --- src/net/plex/multiplex.rs | 13 +++++++++++-- src/net/plex/plex_connector.rs | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/net/plex/multiplex.rs b/src/net/plex/multiplex.rs index 05feb76..5f885da 100644 --- a/src/net/plex/multiplex.rs +++ b/src/net/plex/multiplex.rs @@ -152,8 +152,17 @@ impl Multiplex { let fuse = Fuse::new(); - fuse.spawn(Multiplex::route_in(receiver, run_route_in_inlet)); - fuse.spawn(Multiplex::route_out(sender, route_out_outlet)); + fuse.spawn(async move { + if let Err(error) = Multiplex::route_in(receiver, run_route_in_inlet).await { + println!("ROUTE_IN ERROR: {error:?}"); + } + }); + + fuse.spawn(async move { + if let Err(error) = Multiplex::route_out(sender, route_out_outlet).await { + println!("ROUTE_OUT ERROR: {error:?}"); + } + }); let mut plex_handles = HashMap::new(); diff --git a/src/net/plex/plex_connector.rs b/src/net/plex/plex_connector.rs index d167e65..df61a1c 100644 --- a/src/net/plex/plex_connector.rs +++ b/src/net/plex/plex_connector.rs @@ -164,7 +164,16 @@ impl PlexConnector { // Prune all dead `ConnectMultiplex`es in `multiplexes` + let before = multiplexes.len(); multiplexes.retain(|_, multiplex| multiplex.is_alive()); + let after = multiplexes.len(); + + if after < before { + println!( + "WARNING: {} multiplexes were lost in battle!", + before - after + ); + } // If `multiplex_id` is `Some`, try connecting on `multiplex_id` @@ -223,7 +232,16 @@ impl PlexConnector { // Prune all dead `ConnectMultiplex`es in `multiplexes` + let before = multiplexes.len(); multiplexes.retain(|_, multiplex| multiplex.is_alive()); + let after = multiplexes.len(); + + if after < before { + println!( + "WARNING: {} multiplexes were lost in battle!", + before - after + ); + } // `ping()` all remaining `ConnectMultiplex`es in `multiplexes` From f4cf34cc3f15f7d6bd05e4e914ef7d8bfb1f21d5 Mon Sep 17 00:00:00 2001 From: Matteo Monti Date: Fri, 18 Nov 2022 17:47:12 +0000 Subject: [PATCH 14/16] [REVERT] Add more debug `println!`s --- src/net/plex/multiplex.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/net/plex/multiplex.rs b/src/net/plex/multiplex.rs index 5f885da..760c2ca 100644 --- a/src/net/plex/multiplex.rs +++ b/src/net/plex/multiplex.rs @@ -98,7 +98,7 @@ impl Multiplex { let settings = settings.clone(); fuse.spawn(async move { - let _ = Multiplex::run( + let result = Multiplex::run( connection, run_plex_outlet, accept_inlet, @@ -107,6 +107,8 @@ impl Multiplex { ) .await; + println!("RUN RETURNED WITH {result:?}"); + info.is_alive.store(false, Ordering::Relaxed); }); } From 0c50ed25f3daf6e934aa15b0c6cadf5e68eec485 Mon Sep 17 00:00:00 2001 From: Matteo Monti Date: Fri, 18 Nov 2022 17:49:44 +0000 Subject: [PATCH 15/16] [REVERT] Add more debug `println!`s --- src/net/plex/plex_listener.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/net/plex/plex_listener.rs b/src/net/plex/plex_listener.rs index 706b930..efb7d08 100644 --- a/src/net/plex/plex_listener.rs +++ b/src/net/plex/plex_listener.rs @@ -69,5 +69,7 @@ impl PlexListener { while let Ok(plex) = listen_multiplex.accept().await { let _ = accept_inlet.send((remote, plex)).await; } + + println!("RETURNING FROM SERVE"); } } From 708a1100402e8389ad8779906039e80ad3ea2791 Mon Sep 17 00:00:00 2001 From: Manuel Date: Fri, 18 Nov 2022 19:28:40 +0100 Subject: [PATCH 16/16] Add error message --- src/net/plex/plex_listener.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/net/plex/plex_listener.rs b/src/net/plex/plex_listener.rs index efb7d08..b25abee 100644 --- a/src/net/plex/plex_listener.rs +++ b/src/net/plex/plex_listener.rs @@ -46,13 +46,19 @@ impl PlexListener { let fuse = Fuse::new(); loop { - if let Ok((remote, connection)) = listener.accept().await { + let connection = listener.accept().await; + + if let Ok((remote, connection)) = connection { fuse.spawn(PlexListener::serve( remote, connection, accept_inlet.clone(), settings.multiplex_settings.clone(), )); + } else { + if let Err(e) = connection { + println!("Error listening! {:?}", e); + } } } }