diff --git a/Cargo.lock b/Cargo.lock index 7cbaa14..0eed395 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -839,6 +839,7 @@ name = "turnhammer" version = "0.1.0" dependencies = [ "bytecodec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "spin_sleep 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 0d923eb..15ee319 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ stunclient = "0.1.1" turnclient = "0.1.0" spin_sleep = "0.3.7" tokio-timer = "0.2.10" +byteorder = "1.3.1" [replace] "stun_codec:0.1.10" = {path = "/mnt/src/git/stun_codec"} diff --git a/src/main.rs b/src/main.rs index 7773c64..1286355 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,10 +9,12 @@ extern crate turnclient; extern crate tokio; extern crate spin_sleep; extern crate tokio_timer; +extern crate byteorder; use std::time::{Duration,Instant}; use std::net::SocketAddr; use structopt::StructOpt; +use std::sync::Arc; use futures::{Async, Future, Poll}; use futures::{Stream, Sink}; @@ -54,7 +56,7 @@ struct Opt { /// Packets per second #[structopt(long="pps", default_value="5")] - delay_between_packets: u32, + packets_per_second: u32, /// Experiment duration, seconds #[structopt(short="d", long="duration", default_value="5")] @@ -74,11 +76,67 @@ enum ServeTurnEventOrShutdown { Shutdown, } +fn sending_thread( + udp: Arc, + packet_size: usize, + packets_per_second: u32, + duration_seconds: u64, + destinations: Vec, + time_base: Instant, +) { + let sleeper = spin_sleep::SpinSleeper::default(); + sleeper.sleep_ns(500_000_000); // to allow receiver to warm up + let start = Instant::now(); + let step = Duration::from_secs(1) / packets_per_second; + let n = packets_per_second * (duration_seconds as u32); + + use byteorder::{BE,ByteOrder}; + + let mut buf = vec![0; packet_size]; + + let mut totalctr : u32 = 0; + + for i in 0..n { + let deadline = start + step * i; + let now = Instant::now(); + let delta = now - time_base; + + BE::write_u64(&mut buf[0..8], delta.as_secs()); + BE::write_u32(&mut buf[8..12], delta.subsec_nanos()); + + if now < deadline { + sleeper.sleep(deadline - now); + } + + let udp = &*udp; + for addr in &destinations { + BE::write_u32(&mut buf[12..16], totalctr); + udp.send_to(&buf[..], addr).expect("UDP send_to failed"); + totalctr+=1; + } + } +} + +fn receiving_thread( + udp: Arc, + duration_seconds: u64, + packet_size: usize, + time_base: Instant, +) { + let mut buf = vec![0; packet_size]; + loop { + let (_len, _addr) = udp.recv_from(&mut buf[..]).expect("Failed to receive packet"); + println!("Received a packet from {}", _addr); + } +} + fn main() -> Result<(), Error> { let opt = Opt::from_args(); let local_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); let probing_udp = std::net::UdpSocket::bind(local_addr)?; + let probing_udp = Arc::new(probing_udp); + let time_base = Instant::now(); // Phase 1: Query my own external address let extaddr = stunclient::StunClient::new(opt.server) @@ -91,6 +149,8 @@ fn main() -> Result<(), Error> { let k = opt.num_connections; let duration = opt.duration; let delay_after_stopping_sender = opt.delay_after_stopping_sender; + let packet_size = opt.packet_size; + let pps = opt.packets_per_second; let clientstream = futures::stream::repeat::<_,Error>( (opt.server, opt.username, opt.password), @@ -149,18 +209,39 @@ fn main() -> Result<(), Error> { let clienthandles = clienthandles.and_then(move |x|{ let (init_handles, shutdown_handles) : (Vec<_>, Vec<_>) = x.into_iter().unzip(); futures::future::join_all(init_handles) - .and_then(|h| { - eprintln!("Allocated {} TURN clients", h.len()); - futures::future::ok(()) - }) .map_err(|_e|Error::from("Oneshot error")) - .and_then(move |()| { + .and_then(move |destinations| { + eprintln!("Allocated {} TURN clients", destinations.len()); + // Phase 3: Starting sender and receiver + + let probing_udp2 = probing_udp.clone(); + std::thread::spawn(move || { + sending_thread( + probing_udp2, + packet_size, + pps, + duration, + destinations, + time_base, + ); + }); + std::thread::spawn(move || { + receiving_thread( + probing_udp, + duration, + packet_size, + time_base, + ); + }); + tokio_timer::Delay::new( Instant::now() + Duration::from_secs( duration + delay_after_stopping_sender ) ).and_then(|()| { + // Phase 4: Stopping + eprintln!("Stopping TURN clients"); for sh in shutdown_handles { sh.send(());