diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000000..e2748cad7b7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "libp2p-autonat" +edition = "2021" +rust-version = "1.56.1" +version = "0.20.0" +authors = ["David Craven ", "Elena Frank "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[build-dependencies] +prost-build = "0.6" + +[dependencies] +async-trait = "0.1" +futures = "0.3" +futures-timer = "3.0" +instant = "0.1" +libp2p-core = { version = "0.31.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.33.0", path = "../../swarm" } +libp2p-request-response = { version = "0.15.0", path = "../request-response" } +log = "0.4" +rand = "0.8" +prost = "0.8" + +[dev-dependencies] +async-std = { version = "1.10", features = ["attributes"] } +env_logger = "0.9" +structopt = "0.3" + + +[dev-dependencies.libp2p] +path = "../../" +default-features = false +features = ["autonat", "dns-async-std", "identify", "mplex", "noise", "tcp-async-io", "websocket", "yamux"] diff --git a/build.rs b/build.rs new file mode 100644 index 00000000000..d3714fdec14 --- /dev/null +++ b/build.rs @@ -0,0 +1,23 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +fn main() { + prost_build::compile_protos(&["src/structs.proto"], &["src"]).unwrap(); +} diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 00000000000..d99a2b07715 --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,135 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Basic example that combines the AutoNAT and identify protocols. +//! +//! The identify protocol informs the local peer of its external addresses, that are then send in AutoNAT dial-back +//! requests to the server. +//! +//! To run this example, follow the instructions in `examples/server` to start a server, then run in a new terminal: +//! ```sh +//! cargo run --example client -- --server-address --server-peer-id --listen_port +//! ``` +//! The `listen_port` parameter is optional and allows to set a fixed port at which the local client should listen. + +use futures::prelude::*; +use libp2p::autonat; +use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent}; +use libp2p::multiaddr::Protocol; +use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId}; +use std::error::Error; +use std::net::Ipv4Addr; +use std::time::Duration; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +#[structopt(name = "libp2p autonat")] +struct Opt { + #[structopt(long)] + listen_port: Option, + + #[structopt(long)] + server_address: Multiaddr, + + #[structopt(long)] + server_peer_id: PeerId, +} + +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let opt = Opt::from_args(); + + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + println!("Local peer id: {:?}", local_peer_id); + + let transport = libp2p::development_transport(local_key.clone()).await?; + + let behaviour = Behaviour::new(local_key.public()); + + let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + swarm.listen_on( + Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) + .with(Protocol::Tcp(opt.listen_port.unwrap_or(0))), + )?; + + swarm + .behaviour_mut() + .auto_nat + .add_server(opt.server_peer_id, Some(opt.server_address)); + + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), + SwarmEvent::Behaviour(event) => println!("{:?}", event), + e => println!("{:?}", e), + } + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "Event")] +struct Behaviour { + identify: Identify, + auto_nat: autonat::Behaviour, +} + +impl Behaviour { + fn new(local_public_key: identity::PublicKey) -> Self { + Self { + identify: Identify::new(IdentifyConfig::new( + "/ipfs/0.1.0".into(), + local_public_key.clone(), + )), + auto_nat: autonat::Behaviour::new( + local_public_key.to_peer_id(), + autonat::Config { + retry_interval: Duration::from_secs(10), + refresh_interval: Duration::from_secs(30), + boot_delay: Duration::from_secs(5), + throttle_server_period: Duration::ZERO, + ..Default::default() + }, + ), + } + } +} + +#[derive(Debug)] +enum Event { + AutoNat(autonat::Event), + Identify(IdentifyEvent), +} + +impl From for Event { + fn from(v: IdentifyEvent) -> Self { + Self::Identify(v) + } +} + +impl From for Event { + fn from(v: autonat::Event) -> Self { + Self::AutoNat(v) + } +} diff --git a/examples/server.rs b/examples/server.rs new file mode 100644 index 00000000000..556367fdc90 --- /dev/null +++ b/examples/server.rs @@ -0,0 +1,114 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Basic example for a AutoNAT server that supports the /libp2p/autonat/1.0.0 and "/ipfs/0.1.0" protocols. +//! +//! To start the server run: +//! ```sh +//! cargo run --example server -- --listen_port +//! ``` +//! The `listen_port` parameter is optional and allows to set a fixed port at which the local peer should listen. + +use futures::prelude::*; +use libp2p::autonat; +use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent}; +use libp2p::multiaddr::Protocol; +use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId}; +use std::error::Error; +use std::net::Ipv4Addr; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +#[structopt(name = "libp2p autonat")] +struct Opt { + #[structopt(long)] + listen_port: Option, +} + +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let opt = Opt::from_args(); + + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + println!("Local peer id: {:?}", local_peer_id); + + let transport = libp2p::development_transport(local_key.clone()).await?; + + let behaviour = Behaviour::new(local_key.public()); + + let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + swarm.listen_on( + Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) + .with(Protocol::Tcp(opt.listen_port.unwrap_or(0))), + )?; + + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), + SwarmEvent::Behaviour(event) => println!("{:?}", event), + e => println!("{:?}", e), + } + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "Event")] +struct Behaviour { + identify: Identify, + auto_nat: autonat::Behaviour, +} + +impl Behaviour { + fn new(local_public_key: identity::PublicKey) -> Self { + Self { + identify: Identify::new(IdentifyConfig::new( + "/ipfs/0.1.0".into(), + local_public_key.clone(), + )), + auto_nat: autonat::Behaviour::new( + local_public_key.to_peer_id(), + autonat::Config::default(), + ), + } + } +} + +#[derive(Debug)] +enum Event { + AutoNat(autonat::Event), + Identify(IdentifyEvent), +} + +impl From for Event { + fn from(v: IdentifyEvent) -> Self { + Self::Identify(v) + } +} + +impl From for Event { + fn from(v: autonat::Event) -> Self { + Self::AutoNat(v) + } +} diff --git a/src/behaviour.rs b/src/behaviour.rs new file mode 100644 index 00000000000..8a8e1d1eabe --- /dev/null +++ b/src/behaviour.rs @@ -0,0 +1,501 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod as_client; +mod as_server; + +use crate::protocol::{AutoNatCodec, AutoNatProtocol, DialRequest, DialResponse, ResponseError}; +use as_client::AsClient; +pub use as_client::{OutboundProbeError, OutboundProbeEvent}; +use as_server::AsServer; +pub use as_server::{InboundProbeError, InboundProbeEvent}; +use futures_timer::Delay; +use instant::Instant; +use libp2p_core::{ + connection::{ConnectionId, ListenerId}, + ConnectedPoint, Multiaddr, PeerId, +}; +use libp2p_request_response::{ + handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse, + RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, +}; +use libp2p_swarm::{ + DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use std::{ + collections::{HashMap, VecDeque}, + iter, + task::{Context, Poll}, + time::Duration, +}; + +/// Config for the [`Behaviour`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Config { + /// Timeout for requests. + pub timeout: Duration, + + // Client Config + /// Delay on init before starting the fist probe. + pub boot_delay: Duration, + /// Interval in which the NAT should be tested again if max confidence was reached in a status. + pub refresh_interval: Duration, + /// Interval in which the NAT status should be re-tried if it is currently unknown + /// or max confidence was not reached yet. + pub retry_interval: Duration, + /// Throttle period for re-using a peer as server for a dial-request. + pub throttle_server_period: Duration, + /// Use connected peers as servers for probes. + pub use_connected: bool, + /// Max confidence that can be reached in a public / private NAT status. + /// Note: for [`NatStatus::Unknown`] the confidence is always 0. + pub confidence_max: usize, + + // Server Config + /// Max addresses that are tried per peer. + pub max_peer_addresses: usize, + /// Max total dial requests done in `[Config::throttle_clients_period`]. + pub throttle_clients_global_max: usize, + /// Max dial requests done in `[Config::throttle_clients_period`] for a peer. + pub throttle_clients_peer_max: usize, + /// Period for throttling clients requests. + pub throttle_clients_period: Duration, +} + +impl Default for Config { + fn default() -> Self { + Config { + timeout: Duration::from_secs(30), + boot_delay: Duration::from_secs(15), + retry_interval: Duration::from_secs(90), + refresh_interval: Duration::from_secs(15 * 60), + throttle_server_period: Duration::from_secs(90), + use_connected: true, + confidence_max: 3, + max_peer_addresses: 16, + throttle_clients_global_max: 30, + throttle_clients_peer_max: 3, + throttle_clients_period: Duration::from_secs(1), + } + } +} + +/// Assumed NAT status. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NatStatus { + Public(Multiaddr), + Private, + Unknown, +} + +impl NatStatus { + pub fn is_public(&self) -> bool { + matches!(self, NatStatus::Public(..)) + } +} + +/// Unique identifier for a probe. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ProbeId(usize); + +impl ProbeId { + fn next(&mut self) -> ProbeId { + let current = *self; + self.0 += 1; + current + } +} + +/// Event produced by [`Behaviour`]. +#[derive(Debug, Clone, PartialEq)] +pub enum Event { + /// Event on an inbound probe. + InboundProbe(InboundProbeEvent), + /// Event on an outbound probe. + OutboundProbe(OutboundProbeEvent), + /// The assumed NAT changed. + StatusChanged { + /// Former status. + old: NatStatus, + /// New status. + new: NatStatus, + }, +} + +/// [`NetworkBehaviour`] for AutoNAT. +/// +/// The behaviour frequently runs probes to determine whether the local peer is behind NAT and/ or a firewall, or +/// publicly reachable. +/// In a probe, a dial-back request is sent to a peer that is randomly selected from the list of fixed servers and +/// connected peers. Upon receiving a dial-back request, the remote tries to dial the included addresses. When a +/// first address was successfully dialed, a status Ok will be send back together with the dialed address. If no address +/// can be reached a dial-error is send back. +/// Based on the received response, the sender assumes themselves to be public or private. +/// The status is retried in a frequency of [`Config::retry_interval`] or [`Config::retry_interval`], depending on whether +/// enough confidence in the assumed NAT status was reached or not. +/// The confidence increases each time a probe confirms the assumed status, and decreases if a different status is reported. +/// If the confidence is 0, the status is flipped and the Behaviour will report the new status in an `OutEvent`. +pub struct Behaviour { + // Local peer id + local_peer_id: PeerId, + + // Inner behaviour for sending requests and receiving the response. + inner: RequestResponse, + + config: Config, + + // Additional peers apart from the currently connected ones, that may be used for probes. + servers: Vec, + + // Assumed NAT status. + nat_status: NatStatus, + + // Confidence in the assumed NAT status. + confidence: usize, + + // Timer for the next probe. + schedule_probe: Delay, + + // Ongoing inbound requests, where no response has been sent back to the remote yet. + ongoing_inbound: HashMap< + PeerId, + ( + ProbeId, + RequestId, + Vec, + ResponseChannel, + ), + >, + + // Ongoing outbound probes and mapped to the inner request id. + ongoing_outbound: HashMap, + + // Connected peers with the observed address of each connection. + // If the endpoint of a connection is relayed, the observed address is `None`. + connected: HashMap>>, + + // Used servers in recent outbound probes that are throttled through Config::throttle_server_period. + throttled_servers: Vec<(PeerId, Instant)>, + + // Recent probes done for clients + throttled_clients: Vec<(PeerId, Instant)>, + + last_probe: Option, + + pending_out_events: VecDeque<::OutEvent>, + + probe_id: ProbeId, +} + +impl Behaviour { + pub fn new(local_peer_id: PeerId, config: Config) -> Self { + let protocols = iter::once((AutoNatProtocol, ProtocolSupport::Full)); + let mut cfg = RequestResponseConfig::default(); + cfg.set_request_timeout(config.timeout); + let inner = RequestResponse::new(AutoNatCodec, protocols, cfg); + Self { + local_peer_id, + inner, + schedule_probe: Delay::new(config.boot_delay), + config, + servers: Vec::new(), + ongoing_inbound: HashMap::default(), + ongoing_outbound: HashMap::default(), + connected: HashMap::default(), + nat_status: NatStatus::Unknown, + confidence: 0, + throttled_servers: Vec::new(), + throttled_clients: Vec::new(), + last_probe: None, + pending_out_events: VecDeque::new(), + probe_id: ProbeId(0), + } + } + + /// Assumed public address of the local peer. + /// Returns `None` in case of status [`NatStatus::Private`] or [`NatStatus::Unknown`]. + pub fn public_address(&self) -> Option<&Multiaddr> { + match &self.nat_status { + NatStatus::Public(address) => Some(address), + _ => None, + } + } + + /// Assumed NAT status. + pub fn nat_status(&self) -> NatStatus { + self.nat_status.clone() + } + + /// Confidence in the assumed NAT status. + pub fn confidence(&self) -> usize { + self.confidence + } + + /// Add a peer to the list over servers that may be used for probes. + /// These peers are used for dial-request even if they are currently not connection, in which case a connection will be + /// establish before sending the dial-request. + pub fn add_server(&mut self, peer: PeerId, address: Option) { + self.servers.push(peer); + if let Some(addr) = address { + self.inner.add_address(&peer, addr); + } + } + + /// Remove a peer from the list of servers. + /// See [`Behaviour::add_server`] for more info. + pub fn remove_server(&mut self, peer: &PeerId) { + self.servers.retain(|p| p != peer); + } + + fn as_client(&mut self) -> AsClient { + AsClient { + inner: &mut self.inner, + local_peer_id: self.local_peer_id, + config: &self.config, + connected: &self.connected, + probe_id: &mut self.probe_id, + servers: &self.servers, + throttled_servers: &mut self.throttled_servers, + nat_status: &mut self.nat_status, + confidence: &mut self.confidence, + ongoing_outbound: &mut self.ongoing_outbound, + last_probe: &mut self.last_probe, + schedule_probe: &mut self.schedule_probe, + } + } + + fn as_server(&mut self) -> AsServer { + AsServer { + inner: &mut self.inner, + config: &self.config, + connected: &self.connected, + probe_id: &mut self.probe_id, + throttled_clients: &mut self.throttled_clients, + ongoing_inbound: &mut self.ongoing_inbound, + } + } +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = Event; + + fn inject_connection_established( + &mut self, + peer: &PeerId, + conn: &ConnectionId, + endpoint: &ConnectedPoint, + failed_addresses: Option<&Vec>, + ) { + self.inner + .inject_connection_established(peer, conn, endpoint, failed_addresses); + let connections = self.connected.entry(*peer).or_default(); + let addr = if endpoint.is_relayed() { + None + } else { + Some(endpoint.get_remote_address().clone()) + }; + connections.insert(*conn, addr); + + match endpoint { + ConnectedPoint::Dialer { address } => { + if let Some(event) = self.as_server().on_outbound_connection(peer, address) { + self.pending_out_events + .push_back(Event::InboundProbe(event)); + } + } + ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(), + } + } + + fn inject_connection_closed( + &mut self, + peer: &PeerId, + conn: &ConnectionId, + endpoint: &ConnectedPoint, + handler: ::Handler, + ) { + self.inner + .inject_connection_closed(peer, conn, endpoint, handler); + let connections = self.connected.get_mut(peer).expect("Peer is connected."); + connections.remove(conn); + } + + fn inject_dial_failure( + &mut self, + peer: Option, + handler: Self::ProtocolsHandler, + error: &DialError, + ) { + self.inner.inject_dial_failure(peer, handler, error); + if let Some(event) = self.as_server().on_outbound_dial_error(peer, error) { + self.pending_out_events + .push_back(Event::InboundProbe(event)); + } + } + + fn inject_disconnected(&mut self, peer: &PeerId) { + self.inner.inject_disconnected(peer); + self.connected.remove(peer); + } + + fn inject_address_change( + &mut self, + peer: &PeerId, + conn: &ConnectionId, + old: &ConnectedPoint, + new: &ConnectedPoint, + ) { + self.inner.inject_address_change(peer, conn, old, new); + + if old.is_relayed() && new.is_relayed() { + return; + } + let connections = self.connected.get_mut(peer).expect("Peer is connected."); + let addr = if new.is_relayed() { + None + } else { + Some(new.get_remote_address().clone()) + }; + connections.insert(*conn, addr); + } + + fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { + self.inner.inject_new_listen_addr(id, addr); + self.as_client().on_new_address(); + } + + fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { + self.inner.inject_expired_listen_addr(id, addr); + self.as_client().on_expired_address(addr); + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.inner.inject_new_external_addr(addr); + self.as_client().on_new_address(); + } + + fn inject_expired_external_addr(&mut self, addr: &Multiaddr) { + self.inner.inject_expired_external_addr(addr); + self.as_client().on_expired_address(addr); + } + + fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll { + loop { + if let Some(event) = self.pending_out_events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + let mut is_inner_pending = false; + match self.inner.poll(cx, params) { + Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + let (mut events, action) = match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } + | RequestResponseEvent::OutboundFailure { .. } => { + self.as_client().handle_event(params, event) + } + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { .. }, + .. + } + | RequestResponseEvent::InboundFailure { .. } => { + self.as_server().handle_event(params, event) + } + RequestResponseEvent::ResponseSent { .. } => (VecDeque::new(), None), + }; + self.pending_out_events.append(&mut events); + if let Some(action) = action { + return Poll::Ready(action); + } + } + Poll::Ready(action) => return Poll::Ready(action.map_out(|_| unreachable!())), + Poll::Pending => is_inner_pending = true, + } + + match self.as_client().poll_auto_probe(params, cx) { + Poll::Ready(event) => self + .pending_out_events + .push_back(Event::OutboundProbe(event)), + Poll::Pending if is_inner_pending => return Poll::Pending, + Poll::Pending => {} + } + } + } + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.inner.addresses_of_peer(peer) + } + + fn inject_connected(&mut self, peer: &PeerId) { + self.inner.inject_connected(peer) + } + + fn inject_event( + &mut self, + peer_id: PeerId, + conn: ConnectionId, + event: RequestResponseHandlerEvent, + ) { + self.inner.inject_event(peer_id, conn, event) + } + + fn inject_listen_failure( + &mut self, + local_addr: &Multiaddr, + send_back_addr: &Multiaddr, + handler: Self::ProtocolsHandler, + ) { + self.inner + .inject_listen_failure(local_addr, send_back_addr, handler) + } + + fn inject_new_listener(&mut self, id: ListenerId) { + self.inner.inject_new_listener(id) + } + + fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { + self.inner.inject_listener_error(id, err) + } + + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { + self.inner.inject_listener_closed(id, reason) + } +} + +type Action = NetworkBehaviourAction< + ::OutEvent, + ::ProtocolsHandler, +>; + +// Trait implemented for `AsClient` as `AsServer` to handle events from the inner [`RequestResponse`] Protocol. +trait HandleInnerEvent { + fn handle_event( + &mut self, + params: &mut impl PollParameters, + event: RequestResponseEvent, + ) -> (VecDeque, Option); +} diff --git a/src/behaviour/as_client.rs b/src/behaviour/as_client.rs new file mode 100644 index 00000000000..de6a00d5629 --- /dev/null +++ b/src/behaviour/as_client.rs @@ -0,0 +1,373 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::ResponseError; + +use super::{ + Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus, + ProbeId, +}; +use futures::FutureExt; +use futures_timer::Delay; +use instant::Instant; +use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId}; +use libp2p_request_response::{ + OutboundFailure, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage, +}; +use libp2p_swarm::{AddressScore, NetworkBehaviourAction, PollParameters}; +use rand::{seq::SliceRandom, thread_rng}; +use std::{ + collections::{HashMap, VecDeque}, + task::{Context, Poll}, + time::Duration, +}; + +/// Outbound probe failed or was aborted. +#[derive(Debug, Clone, PartialEq)] +pub enum OutboundProbeError { + /// Probe was aborted because no server is known, or all servers + /// are throttled through [`Config::throttle_server_period`]. + NoServer, + /// Probe was aborted because the local peer has no listening or + /// external addresses. + NoAddresses, + /// Sending the dial-back request or receiving a response failed. + OutboundRequest(OutboundFailure), + /// The server refused or failed to dial us. + Response(ResponseError), +} + +#[derive(Debug, Clone, PartialEq)] +pub enum OutboundProbeEvent { + /// A dial-back request was sent to a remote peer. + Request { + probe_id: ProbeId, + /// Peer to which the request is sent. + peer: PeerId, + }, + /// The remote successfully dialed one of our addresses. + Response { + probe_id: ProbeId, + /// Id of the peer that sent the response. + peer: PeerId, + /// The address at which the remote succeeded to dial us. + address: Multiaddr, + }, + /// The outbound request failed, was rejected, or the remote could dial + /// none of our addresses. + Error { + probe_id: ProbeId, + /// Id of the peer used for the probe. + /// `None` if the probe was aborted due to no addresses or no qualified server. + peer: Option, + error: OutboundProbeError, + }, +} + +/// View over [`super::Behaviour`] in a client role. +pub struct AsClient<'a> { + pub inner: &'a mut RequestResponse, + pub local_peer_id: PeerId, + pub config: &'a Config, + pub connected: &'a HashMap>>, + pub probe_id: &'a mut ProbeId, + + pub servers: &'a Vec, + pub throttled_servers: &'a mut Vec<(PeerId, Instant)>, + + pub nat_status: &'a mut NatStatus, + pub confidence: &'a mut usize, + + pub ongoing_outbound: &'a mut HashMap, + + pub last_probe: &'a mut Option, + pub schedule_probe: &'a mut Delay, +} + +impl<'a> HandleInnerEvent for AsClient<'a> { + fn handle_event( + &mut self, + params: &mut impl PollParameters, + event: RequestResponseEvent, + ) -> (VecDeque, Option) { + let mut events = VecDeque::new(); + let mut action = None; + match event { + RequestResponseEvent::Message { + peer, + message: + RequestResponseMessage::Response { + request_id, + response, + }, + } => { + log::debug!("Outbound dial-back request returned {:?}.", response); + + let probe_id = self + .ongoing_outbound + .remove(&request_id) + .expect("RequestId exists."); + + let event = match response.result.clone() { + Ok(address) => OutboundProbeEvent::Response { + probe_id, + peer, + address, + }, + Err(e) => OutboundProbeEvent::Error { + probe_id, + peer: Some(peer), + error: OutboundProbeError::Response(e), + }, + }; + events.push_back(Event::OutboundProbe(event)); + + if let Some(old) = self.handle_reported_status(response.result.clone().into()) { + events.push_back(Event::StatusChanged { + old, + new: self.nat_status.clone(), + }); + } + + if let Ok(address) = response.result { + // Update observed address score if it is finite. + let score = params + .external_addresses() + .find_map(|r| (r.addr == address).then(|| r.score)) + .unwrap_or(AddressScore::Finite(0)); + if let AddressScore::Finite(finite_score) = score { + action = Some(NetworkBehaviourAction::ReportObservedAddr { + address, + score: AddressScore::Finite(finite_score + 1), + }); + } + } + } + RequestResponseEvent::OutboundFailure { + peer, + error, + request_id, + } => { + log::debug!( + "Outbound Failure {} when on dial-back request to peer {}.", + error, + peer + ); + let probe_id = self + .ongoing_outbound + .remove(&request_id) + .unwrap_or_else(|| self.probe_id.next()); + + events.push_back(Event::OutboundProbe(OutboundProbeEvent::Error { + probe_id, + peer: Some(peer), + error: OutboundProbeError::OutboundRequest(error), + })); + + self.schedule_probe.reset(Duration::ZERO); + } + _ => {} + } + (events, action) + } +} + +impl<'a> AsClient<'a> { + pub fn poll_auto_probe( + &mut self, + params: &mut impl PollParameters, + cx: &mut Context<'_>, + ) -> Poll { + match self.schedule_probe.poll_unpin(cx) { + Poll::Ready(()) => { + self.schedule_probe.reset(self.config.retry_interval); + + let mut addresses: Vec<_> = params.external_addresses().map(|r| r.addr).collect(); + addresses.extend(params.listened_addresses()); + + let probe_id = self.probe_id.next(); + let event = match self.do_probe(probe_id, addresses) { + Ok(peer) => OutboundProbeEvent::Request { probe_id, peer }, + Err(error) => { + self.handle_reported_status(NatStatus::Unknown); + OutboundProbeEvent::Error { + probe_id, + peer: None, + error, + } + } + }; + Poll::Ready(event) + } + Poll::Pending => Poll::Pending, + } + } + + // An inbound connection can indicate that we are public; adjust the delay to the next probe. + pub fn on_inbound_connection(&mut self) { + if *self.confidence == self.config.confidence_max { + if self.nat_status.is_public() { + self.schedule_next_probe(self.config.refresh_interval * 2); + } else { + self.schedule_next_probe(self.config.refresh_interval / 5); + } + } + } + + pub fn on_new_address(&mut self) { + if !self.nat_status.is_public() { + // New address could be publicly reachable, trigger retry. + if *self.confidence > 0 { + *self.confidence -= 1; + } + self.schedule_next_probe(self.config.retry_interval); + } + } + + pub fn on_expired_address(&mut self, addr: &Multiaddr) { + if let NatStatus::Public(public_address) = self.nat_status { + if public_address == addr { + *self.confidence = 0; + *self.nat_status = NatStatus::Unknown; + self.schedule_next_probe(Duration::ZERO); + } + } + } + + // Select a random server for the probe. + fn random_server(&mut self) -> Option { + // Update list of throttled servers. + let i = self.throttled_servers.partition_point(|(_, time)| { + *time + self.config.throttle_server_period < Instant::now() + }); + self.throttled_servers.drain(..i); + + let mut servers: Vec<&PeerId> = self.servers.iter().collect(); + + if self.config.use_connected { + servers.extend(self.connected.iter().map(|(id, _)| id)); + } + + servers.retain(|s| !self.throttled_servers.iter().any(|(id, _)| s == &id)); + + servers.choose(&mut thread_rng()).map(|&&p| p) + } + + // Send a dial-request to a randomly selected server. + // Returns the server that is used in this probe. + // `Err` if there are no qualified servers or no addresses. + fn do_probe( + &mut self, + probe_id: ProbeId, + addresses: Vec, + ) -> Result { + let _ = self.last_probe.insert(Instant::now()); + if addresses.is_empty() { + log::debug!("Outbound dial-back request aborted: No dial-back addresses."); + return Err(OutboundProbeError::NoAddresses); + } + let server = match self.random_server() { + Some(s) => s, + None => { + log::debug!("Outbound dial-back request aborted: No qualified server."); + return Err(OutboundProbeError::NoServer); + } + }; + let request_id = self.inner.send_request( + &server, + DialRequest { + peer_id: self.local_peer_id, + addresses, + }, + ); + self.throttled_servers.push((server, Instant::now())); + log::debug!("Send dial-back request to peer {}.", server); + self.ongoing_outbound.insert(request_id, probe_id); + Ok(server) + } + + // Set the delay to the next probe based on the time of our last probe + // and the specified delay. + fn schedule_next_probe(&mut self, delay: Duration) { + let last_probe_instant = match self.last_probe { + Some(instant) => instant, + None => { + return; + } + }; + let schedule_next = *last_probe_instant + delay; + self.schedule_probe + .reset(schedule_next.saturating_duration_since(Instant::now())); + } + + // Adapt current confidence and NAT status to the status reported by the latest probe. + // Return the old status if it flipped. + fn handle_reported_status(&mut self, reported_status: NatStatus) -> Option { + self.schedule_next_probe(self.config.retry_interval); + + if matches!(reported_status, NatStatus::Unknown) { + return None; + } + + if reported_status == *self.nat_status { + if *self.confidence < self.config.confidence_max { + *self.confidence += 1; + } + // Delay with (usually longer) refresh-interval. + if *self.confidence >= self.config.confidence_max { + self.schedule_next_probe(self.config.refresh_interval); + } + return None; + } + + if reported_status.is_public() && self.nat_status.is_public() { + // Different address than the currently assumed public address was reported. + // Switch address, but don't report as flipped. + *self.nat_status = reported_status; + return None; + } + if *self.confidence > 0 { + // Reduce confidence but keep old status. + *self.confidence -= 1; + return None; + } + + log::debug!( + "Flipped assumed NAT status from {:?} to {:?}", + self.nat_status, + reported_status + ); + + let old_status = self.nat_status.clone(); + *self.nat_status = reported_status; + + Some(old_status) + } +} + +impl From> for NatStatus { + fn from(result: Result) -> Self { + match result { + Ok(addr) => NatStatus::Public(addr), + Err(ResponseError::DialError) => NatStatus::Private, + _ => NatStatus::Unknown, + } + } +} diff --git a/src/behaviour/as_server.rs b/src/behaviour/as_server.rs new file mode 100644 index 00000000000..24ffb443deb --- /dev/null +++ b/src/behaviour/as_server.rs @@ -0,0 +1,439 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use super::{ + Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, ProbeId, + ResponseError, +}; +use instant::Instant; +use libp2p_core::{connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId}; +use libp2p_request_response::{ + InboundFailure, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage, + ResponseChannel, +}; +use libp2p_swarm::{ + dial_opts::{DialOpts, PeerCondition}, + DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + num::NonZeroU8, +}; + +/// Inbound probe failed. +#[derive(Debug, Clone, PartialEq)] +pub enum InboundProbeError { + /// Receiving the dial-back request or sending a response failed. + InboundRequest(InboundFailure), + /// We refused or failed to dial the client. + Response(ResponseError), +} + +#[derive(Debug, Clone, PartialEq)] +pub enum InboundProbeEvent { + /// A dial-back request was received from a remote peer. + Request { + probe_id: ProbeId, + /// Peer that sent the request. + peer: PeerId, + /// The addresses that will be attempted to dial. + addresses: Vec, + }, + /// A dial request to the remote was successful. + Response { + probe_id: ProbeId, + /// Peer to which the response is sent. + peer: PeerId, + address: Multiaddr, + }, + /// The inbound request failed, was rejected, or none of the remote's + /// addresses could be dialed. + Error { + probe_id: ProbeId, + /// Peer that sent the dial-back request. + peer: PeerId, + error: InboundProbeError, + }, +} + +/// View over [`super::Behaviour`] in a server role. +pub struct AsServer<'a> { + pub inner: &'a mut RequestResponse, + pub config: &'a Config, + pub connected: &'a HashMap>>, + pub probe_id: &'a mut ProbeId, + + pub throttled_clients: &'a mut Vec<(PeerId, Instant)>, + + #[allow(clippy::type_complexity)] + pub ongoing_inbound: &'a mut HashMap< + PeerId, + ( + ProbeId, + RequestId, + Vec, + ResponseChannel, + ), + >, +} + +impl<'a> HandleInnerEvent for AsServer<'a> { + fn handle_event( + &mut self, + _params: &mut impl PollParameters, + event: RequestResponseEvent, + ) -> (VecDeque, Option) { + let mut events = VecDeque::new(); + let mut action = None; + match event { + RequestResponseEvent::Message { + peer, + message: + RequestResponseMessage::Request { + request_id, + request, + channel, + }, + } => { + let probe_id = self.probe_id.next(); + match self.resolve_inbound_request(peer, request) { + Ok(addrs) => { + log::debug!( + "Inbound dial request from Peer {} with dial-back addresses {:?}.", + peer, + addrs + ); + + self.ongoing_inbound + .insert(peer, (probe_id, request_id, addrs.clone(), channel)); + self.throttled_clients.push((peer, Instant::now())); + + events.push_back(Event::InboundProbe(InboundProbeEvent::Request { + probe_id, + peer, + addresses: addrs.clone(), + })); + + action = Some(NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(peer) + .condition(PeerCondition::Always) + .override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0")) + .addresses(addrs) + .build(), + handler: self.inner.new_handler(), + }); + } + Err((status_text, error)) => { + log::debug!( + "Reject inbound dial request from peer {}: {}.", + peer, + status_text + ); + + let response = DialResponse { + result: Err(error.clone()), + status_text: Some(status_text), + }; + let _ = self.inner.send_response(channel, response); + + events.push_back(Event::InboundProbe(InboundProbeEvent::Error { + probe_id, + peer, + error: InboundProbeError::Response(error), + })); + } + } + } + RequestResponseEvent::InboundFailure { + peer, + error, + request_id, + } => { + log::debug!( + "Inbound Failure {} when on dial-back request from peer {}.", + error, + peer + ); + + let probe_id = match self.ongoing_inbound.get(&peer) { + Some((_, rq_id, _, _)) if *rq_id == request_id => { + self.ongoing_inbound.remove(&peer).unwrap().0 + } + _ => self.probe_id.next(), + }; + + events.push_back(Event::InboundProbe(InboundProbeEvent::Error { + probe_id, + peer, + error: InboundProbeError::InboundRequest(error), + })); + } + _ => {} + } + (events, action) + } +} + +impl<'a> AsServer<'a> { + pub fn on_outbound_connection( + &mut self, + peer: &PeerId, + address: &Multiaddr, + ) -> Option { + let (_, _, addrs, _) = self.ongoing_inbound.get(peer)?; + + // Check if the dialed address was among the requested addresses. + if !addrs.contains(address) { + return None; + } + + log::debug!( + "Dial-back to peer {} succeeded at addr {:?}.", + peer, + address + ); + + let (probe_id, _, _, channel) = self.ongoing_inbound.remove(peer).unwrap(); + let response = DialResponse { + result: Ok(address.clone()), + status_text: None, + }; + let _ = self.inner.send_response(channel, response); + + Some(InboundProbeEvent::Response { + probe_id, + peer: *peer, + address: address.clone(), + }) + } + + pub fn on_outbound_dial_error( + &mut self, + peer: Option, + error: &DialError, + ) -> Option { + let (probe_id, _, _, channel) = peer.and_then(|p| self.ongoing_inbound.remove(&p))?; + log::debug!( + "Dial-back to peer {} failed with error {:?}.", + peer.unwrap(), + error + ); + let response_error = ResponseError::DialError; + let response = DialResponse { + result: Err(response_error.clone()), + status_text: Some("dial failed".to_string()), + }; + let _ = self.inner.send_response(channel, response); + + Some(InboundProbeEvent::Error { + probe_id, + peer: peer.expect("PeerId is present."), + error: InboundProbeError::Response(response_error), + }) + } + + // Validate the inbound request and collect the addresses to be dialed. + fn resolve_inbound_request( + &mut self, + sender: PeerId, + request: DialRequest, + ) -> Result, (String, ResponseError)> { + // Update list of throttled clients. + let i = self.throttled_clients.partition_point(|(_, time)| { + *time + self.config.throttle_clients_period < Instant::now() + }); + self.throttled_clients.drain(..i); + + if request.peer_id != sender { + let status_text = "peer id mismatch".to_string(); + return Err((status_text, ResponseError::BadRequest)); + } + + if self.ongoing_inbound.contains_key(&sender) { + let status_text = "dial-back already ongoing".to_string(); + return Err((status_text, ResponseError::DialRefused)); + } + + if self.throttled_clients.len() >= self.config.throttle_clients_global_max { + let status_text = "too many total dials".to_string(); + return Err((status_text, ResponseError::DialRefused)); + } + + let throttled_for_client = self + .throttled_clients + .iter() + .filter(|(p, _)| p == &sender) + .count(); + + if throttled_for_client >= self.config.throttle_clients_peer_max { + let status_text = "too many dials for peer".to_string(); + return Err((status_text, ResponseError::DialRefused)); + } + + // Obtain an observed address from non-relayed connections. + let observed_addr = self + .connected + .get(&sender) + .expect("Peer is connected.") + .values() + .find_map(|a| a.as_ref()) + .ok_or_else(|| { + let status_text = "no dial-request over relayed connections".to_string(); + (status_text, ResponseError::DialError) + })?; + + let mut addrs = Self::filter_valid_addrs(sender, request.addresses, observed_addr); + addrs.truncate(self.config.max_peer_addresses); + + if addrs.is_empty() { + let status_text = "no dialable addresses".to_string(); + return Err((status_text, ResponseError::DialError)); + } + + Ok(addrs) + } + + // Filter dial addresses and replace demanded ip with the observed one. + fn filter_valid_addrs( + peer: PeerId, + demanded: Vec, + observed_remote_at: &Multiaddr, + ) -> Vec { + // Skip if the observed address is a relay address. + if observed_remote_at.iter().any(|p| p == Protocol::P2pCircuit) { + return Vec::new(); + } + let observed_ip = match observed_remote_at + .into_iter() + .find(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_))) + { + Some(ip) => ip, + None => return Vec::new(), + }; + let mut distinct = HashSet::new(); + demanded + .into_iter() + .filter_map(|addr| { + // Replace the demanded ip with the observed one. + let i = addr + .iter() + .position(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_)))?; + let mut addr = addr.replace(i, |_| Some(observed_ip.clone()))?; + + let is_valid = addr.iter().all(|proto| match proto { + Protocol::P2pCircuit => false, + Protocol::P2p(hash) => hash == peer.into(), + _ => true, + }); + + if !is_valid { + return None; + } + if !addr.iter().any(|p| matches!(p, Protocol::P2p(_))) { + addr.push(Protocol::P2p(peer.into())) + } + // Only collect distinct addresses. + distinct.insert(addr.clone()).then(|| addr) + }) + .collect() + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::net::Ipv4Addr; + + fn random_ip<'a>() -> Protocol<'a> { + Protocol::Ip4(Ipv4Addr::new( + rand::random(), + rand::random(), + rand::random(), + rand::random(), + )) + } + fn random_port<'a>() -> Protocol<'a> { + Protocol::Tcp(rand::random()) + } + + #[test] + fn filter_addresses() { + let peer_id = PeerId::random(); + let observed_ip = random_ip(); + let observed_addr = Multiaddr::empty() + .with(observed_ip.clone()) + .with(random_port()) + .with(Protocol::P2p(peer_id.into())); + // Valid address with matching peer-id + let demanded_1 = Multiaddr::empty() + .with(random_ip()) + .with(random_port()) + .with(Protocol::P2p(peer_id.into())); + // Invalid because peer_id does not match + let demanded_2 = Multiaddr::empty() + .with(random_ip()) + .with(random_port()) + .with(Protocol::P2p(PeerId::random().into())); + // Valid address without peer-id + let demanded_3 = Multiaddr::empty().with(random_ip()).with(random_port()); + // Invalid because relayed + let demanded_4 = Multiaddr::empty() + .with(random_ip()) + .with(random_port()) + .with(Protocol::P2p(PeerId::random().into())) + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(peer_id.into())); + let demanded = vec![ + demanded_1.clone(), + demanded_2, + demanded_3.clone(), + demanded_4, + ]; + let filtered = AsServer::filter_valid_addrs(peer_id, demanded, &observed_addr); + let expected_1 = demanded_1 + .replace(0, |_| Some(observed_ip.clone())) + .unwrap(); + let expected_2 = demanded_3 + .replace(0, |_| Some(observed_ip)) + .unwrap() + .with(Protocol::P2p(peer_id.into())); + assert_eq!(filtered, vec![expected_1, expected_2]); + } + + #[test] + fn skip_relayed_addr() { + let peer_id = PeerId::random(); + let observed_ip = random_ip(); + // Observed address is relayed. + let observed_addr = Multiaddr::empty() + .with(observed_ip.clone()) + .with(random_port()) + .with(Protocol::P2p(PeerId::random().into())) + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(peer_id.into())); + let demanded = Multiaddr::empty() + .with(random_ip()) + .with(random_port()) + .with(Protocol::P2p(peer_id.into())); + let filtered = AsServer::filter_valid_addrs(peer_id, vec![demanded], &observed_addr); + assert!(filtered.is_empty()); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000000..d55ab3acc14 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the AutoNAT protocol. +mod behaviour; +mod protocol; + +pub use self::{ + behaviour::{ + Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, NatStatus, + OutboundProbeError, OutboundProbeEvent, ProbeId, + }, + protocol::ResponseError, +}; +pub use libp2p_request_response::{InboundFailure, OutboundFailure}; + +mod structs_proto { + include!(concat!(env!("OUT_DIR"), "/structs.rs")); +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 00000000000..13647a4f7af --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,333 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::structs_proto; +use async_trait::async_trait; +use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use libp2p_core::{upgrade, Multiaddr, PeerId}; +use libp2p_request_response::{ProtocolName, RequestResponseCodec}; +use prost::Message; +use std::{convert::TryFrom, io}; + +#[derive(Clone, Debug)] +pub struct AutoNatProtocol; + +impl ProtocolName for AutoNatProtocol { + fn protocol_name(&self) -> &[u8] { + b"/libp2p/autonat/1.0.0" + } +} + +#[derive(Clone)] +pub struct AutoNatCodec; + +#[async_trait] +impl RequestResponseCodec for AutoNatCodec { + type Protocol = AutoNatProtocol; + type Request = DialRequest; + type Response = DialResponse; + + async fn read_request( + &mut self, + _: &AutoNatProtocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Send + Unpin, + { + let bytes = upgrade::read_length_prefixed(io, 1024).await?; + let request = DialRequest::from_bytes(&bytes)?; + Ok(request) + } + + async fn read_response( + &mut self, + _: &AutoNatProtocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Send + Unpin, + { + let bytes = upgrade::read_length_prefixed(io, 1024).await?; + let response = DialResponse::from_bytes(&bytes)?; + Ok(response) + } + + async fn write_request( + &mut self, + _: &AutoNatProtocol, + io: &mut T, + data: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Send + Unpin, + { + upgrade::write_length_prefixed(io, data.into_bytes()).await?; + io.close().await + } + + async fn write_response( + &mut self, + _: &AutoNatProtocol, + io: &mut T, + data: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Send + Unpin, + { + upgrade::write_length_prefixed(io, data.into_bytes()).await?; + io.close().await + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DialRequest { + pub peer_id: PeerId, + pub addresses: Vec, +} + +impl DialRequest { + pub fn from_bytes(bytes: &[u8]) -> Result { + let msg = structs_proto::Message::decode(bytes) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + if msg.r#type != Some(structs_proto::message::MessageType::Dial as _) { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type")); + } + let (peer_id, addrs) = if let Some(structs_proto::message::Dial { + peer: + Some(structs_proto::message::PeerInfo { + id: Some(peer_id), + addrs, + }), + }) = msg.dial + { + (peer_id, addrs) + } else { + log::debug!("Received malformed dial message."); + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid dial message", + )); + }; + + let peer_id = { + PeerId::try_from(peer_id) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid peer id"))? + }; + let addrs = { + let mut maddrs = vec![]; + for addr in addrs.into_iter() { + let maddr = Multiaddr::try_from(addr) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + maddrs.push(maddr); + } + maddrs + }; + Ok(Self { + peer_id, + addresses: addrs, + }) + } + + pub fn into_bytes(self) -> Vec { + let peer_id = self.peer_id.to_bytes(); + let addrs = self + .addresses + .into_iter() + .map(|addr| addr.to_vec()) + .collect(); + + let msg = structs_proto::Message { + r#type: Some(structs_proto::message::MessageType::Dial as _), + dial: Some(structs_proto::message::Dial { + peer: Some(structs_proto::message::PeerInfo { + id: Some(peer_id), + addrs, + }), + }), + dial_response: None, + }; + + let mut bytes = Vec::with_capacity(msg.encoded_len()); + msg.encode(&mut bytes) + .expect("Vec provides capacity as needed"); + bytes + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ResponseError { + DialError, + DialRefused, + BadRequest, + InternalError, +} + +impl From for i32 { + fn from(t: ResponseError) -> Self { + match t { + ResponseError::DialError => 100, + ResponseError::DialRefused => 101, + ResponseError::BadRequest => 200, + ResponseError::InternalError => 300, + } + } +} + +impl TryFrom for ResponseError { + type Error = io::Error; + + fn try_from(value: structs_proto::message::ResponseStatus) -> Result { + match value { + structs_proto::message::ResponseStatus::EDialError => Ok(ResponseError::DialError), + structs_proto::message::ResponseStatus::EDialRefused => Ok(ResponseError::DialRefused), + structs_proto::message::ResponseStatus::EBadRequest => Ok(ResponseError::BadRequest), + structs_proto::message::ResponseStatus::EInternalError => { + Ok(ResponseError::InternalError) + } + structs_proto::message::ResponseStatus::Ok => { + log::debug!("Received response with status code OK but expected error."); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid response error type", + )) + } + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DialResponse { + pub status_text: Option, + pub result: Result, +} + +impl DialResponse { + pub fn from_bytes(bytes: &[u8]) -> Result { + let msg = structs_proto::Message::decode(bytes) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + if msg.r#type != Some(structs_proto::message::MessageType::DialResponse as _) { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type")); + } + + Ok(match msg.dial_response { + Some(structs_proto::message::DialResponse { + status: Some(status), + status_text, + addr: Some(addr), + }) if structs_proto::message::ResponseStatus::from_i32(status) + == Some(structs_proto::message::ResponseStatus::Ok) => + { + let addr = Multiaddr::try_from(addr) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + Self { + status_text, + result: Ok(addr), + } + } + Some(structs_proto::message::DialResponse { + status: Some(status), + status_text, + addr: None, + }) => Self { + status_text, + result: Err(ResponseError::try_from( + structs_proto::message::ResponseStatus::from_i32(status).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "invalid response status code") + })?, + )?), + }, + _ => { + log::debug!("Received malformed response message."); + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid dial response message", + )); + } + }) + } + + pub fn into_bytes(self) -> Vec { + let dial_response = match self.result { + Ok(addr) => structs_proto::message::DialResponse { + status: Some(0), + status_text: self.status_text, + addr: Some(addr.to_vec()), + }, + Err(error) => structs_proto::message::DialResponse { + status: Some(error.into()), + status_text: self.status_text, + addr: None, + }, + }; + + let msg = structs_proto::Message { + r#type: Some(structs_proto::message::MessageType::DialResponse as _), + dial: None, + dial_response: Some(dial_response), + }; + + let mut bytes = Vec::with_capacity(msg.encoded_len()); + msg.encode(&mut bytes) + .expect("Vec provides capacity as needed"); + bytes + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_request_encode_decode() { + let request = DialRequest { + peer_id: PeerId::random(), + addresses: vec![ + "/ip4/8.8.8.8/tcp/30333".parse().unwrap(), + "/ip4/192.168.1.42/tcp/30333".parse().unwrap(), + ], + }; + let bytes = request.clone().into_bytes(); + let request2 = DialRequest::from_bytes(&bytes).unwrap(); + assert_eq!(request, request2); + } + + #[test] + fn test_response_ok_encode_decode() { + let response = DialResponse { + result: Ok("/ip4/8.8.8.8/tcp/30333".parse().unwrap()), + status_text: None, + }; + let bytes = response.clone().into_bytes(); + let response2 = DialResponse::from_bytes(&bytes).unwrap(); + assert_eq!(response, response2); + } + + #[test] + fn test_response_err_encode_decode() { + let response = DialResponse { + result: Err(ResponseError::DialError), + status_text: Some("dial failed".to_string()), + }; + let bytes = response.clone().into_bytes(); + let response2 = DialResponse::from_bytes(&bytes).unwrap(); + assert_eq!(response, response2); + } +} diff --git a/src/structs.proto b/src/structs.proto new file mode 100644 index 00000000000..19e27abd36a --- /dev/null +++ b/src/structs.proto @@ -0,0 +1,37 @@ +syntax = "proto2"; + +package structs; + +message Message { + enum MessageType { + DIAL = 0; + DIAL_RESPONSE = 1; + } + + enum ResponseStatus { + OK = 0; + E_DIAL_ERROR = 100; + E_DIAL_REFUSED = 101; + E_BAD_REQUEST = 200; + E_INTERNAL_ERROR = 300; + } + + message PeerInfo { + optional bytes id = 1; + repeated bytes addrs = 2; + } + + message Dial { + optional PeerInfo peer = 1; + } + + message DialResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional bytes addr = 3; + } + + optional MessageType type = 1; + optional Dial dial = 2; + optional DialResponse dialResponse = 3; +} diff --git a/tests/test_client.rs b/tests/test_client.rs new file mode 100644 index 00000000000..fcfb8922f2a --- /dev/null +++ b/tests/test_client.rs @@ -0,0 +1,513 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{channel::oneshot, Future, FutureExt, StreamExt}; +use futures_timer::Delay; +use libp2p::{ + development_transport, + identity::Keypair, + swarm::{AddressScore, Swarm, SwarmEvent}, + Multiaddr, PeerId, +}; +use libp2p_autonat::{ + Behaviour, Config, Event, NatStatus, OutboundProbeError, OutboundProbeEvent, ResponseError, +}; +use std::time::Duration; + +const MAX_CONFIDENCE: usize = 3; +const TEST_RETRY_INTERVAL: Duration = Duration::from_secs(1); +const TEST_REFRESH_INTERVAL: Duration = Duration::from_secs(2); + +async fn init_swarm(config: Config) -> Swarm { + let keypair = Keypair::generate_ed25519(); + let local_id = PeerId::from_public_key(&keypair.public()); + let transport = development_transport(keypair).await.unwrap(); + let behaviour = Behaviour::new(local_id, config); + Swarm::new(transport, behaviour, local_id) +} + +async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) { + let (tx, rx) = oneshot::channel(); + async_std::task::spawn(async move { + let mut server = init_swarm(Config { + boot_delay: Duration::from_secs(60), + throttle_clients_peer_max: usize::MAX, + ..Default::default() + }) + .await; + let peer_id = *server.local_peer_id(); + server + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + let addr = loop { + match server.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => break address, + _ => {} + }; + }; + tx.send((peer_id, addr)).unwrap(); + let mut kill = kill.fuse(); + loop { + futures::select! { + _ = server.select_next_some() => {}, + _ = kill => return, + + } + } + }); + rx.await.unwrap() +} + +async fn next_event(swarm: &mut Swarm) -> Event { + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(event) => { + break event; + } + _ => {} + } + } +} + +async fn run_test_with_timeout(test: impl Future) { + futures::select! { + _ = test.fuse() => {}, + _ = Delay::new(Duration::from_secs(60)).fuse() => panic!("test timed out") + } +} + +#[async_std::test] +async fn test_auto_probe() { + let test = async { + let mut client = init_swarm(Config { + retry_interval: TEST_RETRY_INTERVAL, + refresh_interval: TEST_REFRESH_INTERVAL, + confidence_max: MAX_CONFIDENCE, + throttle_server_period: Duration::ZERO, + boot_delay: Duration::ZERO, + ..Default::default() + }) + .await; + + let (_handle, rx) = oneshot::channel(); + let (server_id, addr) = spawn_server(rx).await; + client.behaviour_mut().add_server(server_id, Some(addr)); + + // Initial status should be unknown. + assert_eq!(client.behaviour().nat_status(), NatStatus::Unknown); + assert!(client.behaviour().public_address().is_none()); + assert_eq!(client.behaviour().confidence(), 0); + + // Test no listening addresses + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { + assert!(peer.is_none()); + assert_eq!(error, OutboundProbeError::NoAddresses); + } + other => panic!("Unexpected Event: {:?}", other), + } + + assert_eq!(client.behaviour().nat_status(), NatStatus::Unknown); + assert!(client.behaviour().public_address().is_none()); + assert_eq!(client.behaviour().confidence(), 0); + + // Test Private NAT Status + + // Artificially add a faulty address. + let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap(); + client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite); + + let id = match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => { + assert_eq!(peer, server_id); + probe_id + } + other => panic!("Unexpected Event: {:?}", other), + }; + + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Error { + probe_id, + peer, + error, + }) => { + assert_eq!(peer.unwrap(), server_id); + assert_eq!(probe_id, id); + assert_eq!( + error, + OutboundProbeError::Response(ResponseError::DialError) + ); + } + other => panic!("Unexpected Event: {:?}", other), + } + + match next_event(&mut client).await { + Event::StatusChanged { old, new } => { + assert_eq!(old, NatStatus::Unknown); + assert_eq!(new, NatStatus::Private); + } + other => panic!("Unexpected Event: {:?}", other), + } + + assert_eq!(client.behaviour().confidence(), 0); + assert_eq!(client.behaviour().nat_status(), NatStatus::Private); + assert!(client.behaviour().public_address().is_none()); + + // Test new public listening address + client + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { .. } => break, + _ => {} + } + } + + let id = match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => { + assert_eq!(peer, server_id); + probe_id + } + other => panic!("Unexpected Event: {:?}", other), + }; + + // Expect inbound dial from server. + loop { + match client.select_next_some().await { + SwarmEvent::ConnectionEstablished { + endpoint, peer_id, .. + } if endpoint.is_listener() => { + assert_eq!(peer_id, server_id); + break; + } + SwarmEvent::IncomingConnection { .. } | SwarmEvent::NewListenAddr { .. } => {} + _ => panic!("Unexpected Swarm Event"), + } + } + + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Response { probe_id, peer, .. }) => { + assert_eq!(peer, server_id); + assert_eq!(probe_id, id); + } + other => panic!("Unexpected Event: {:?}", other), + } + + // Expect to flip status to public + match next_event(&mut client).await { + Event::StatusChanged { old, new } => { + assert_eq!(old, NatStatus::Private); + assert!(matches!(new, NatStatus::Public(_))); + assert!(new.is_public()); + } + other => panic!("Unexpected Event: {:?}", other), + } + + assert_eq!(client.behaviour().confidence(), 0); + assert!(client.behaviour().nat_status().is_public()); + assert!(client.behaviour().public_address().is_some()); + + drop(_handle); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_confidence() { + let test = async { + let mut client = init_swarm(Config { + retry_interval: TEST_RETRY_INTERVAL, + refresh_interval: TEST_REFRESH_INTERVAL, + confidence_max: MAX_CONFIDENCE, + throttle_server_period: Duration::ZERO, + boot_delay: Duration::from_millis(100), + ..Default::default() + }) + .await; + + let (_handle, rx) = oneshot::channel(); + let (server_id, addr) = spawn_server(rx).await; + client.behaviour_mut().add_server(server_id, Some(addr)); + + // Randomly test either for public or for private status the confidence. + let test_public = rand::random::(); + if test_public { + client + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { .. } => break, + _ => {} + } + } + } else { + let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap(); + client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite); + } + + for i in 0..MAX_CONFIDENCE + 1 { + let id = match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => { + assert_eq!(peer, server_id); + probe_id + } + other => panic!("Unexpected Event: {:?}", other), + }; + + match next_event(&mut client).await { + Event::OutboundProbe(event) => { + let (peer, probe_id) = match event { + OutboundProbeEvent::Response { probe_id, peer, .. } if test_public => { + (peer, probe_id) + } + OutboundProbeEvent::Error { + probe_id, + peer, + error, + } if !test_public => { + assert_eq!( + error, + OutboundProbeError::Response(ResponseError::DialError) + ); + (peer.unwrap(), probe_id) + } + other => panic!("Unexpected Outbound Event: {:?}", other), + }; + assert_eq!(peer, server_id); + assert_eq!(probe_id, id); + } + other => panic!("Unexpected Event: {:?}", other), + } + + // Confidence should increase each iteration up to MAX_CONFIDENCE + let expect_confidence = if i <= MAX_CONFIDENCE { + i + } else { + MAX_CONFIDENCE + }; + assert_eq!(client.behaviour().confidence(), expect_confidence); + assert_eq!(client.behaviour().nat_status().is_public(), test_public); + + // Expect status to flip after first probe + if i == 0 { + match next_event(&mut client).await { + Event::StatusChanged { old, new } => { + assert_eq!(old, NatStatus::Unknown); + assert_eq!(new.is_public(), test_public); + } + other => panic!("Unexpected Event: {:?}", other), + } + } + } + + drop(_handle); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_throttle_server_period() { + let test = async { + let mut client = init_swarm(Config { + retry_interval: TEST_RETRY_INTERVAL, + refresh_interval: TEST_REFRESH_INTERVAL, + confidence_max: MAX_CONFIDENCE, + // Throttle servers so they can not be re-used for dial request. + throttle_server_period: Duration::from_secs(1000), + boot_delay: Duration::from_millis(100), + ..Default::default() + }) + .await; + + client + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { .. } => break, + _ => {} + } + } + + let (_handle, rx) = oneshot::channel(); + let (id, addr) = spawn_server(rx).await; + client.behaviour_mut().add_server(id, Some(addr)); + + // First probe should be successful and flip status to public. + loop { + match next_event(&mut client).await { + Event::StatusChanged { old, new } => { + assert_eq!(old, NatStatus::Unknown); + assert!(new.is_public()); + break; + } + Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {} + Event::OutboundProbe(OutboundProbeEvent::Response { .. }) => {} + other => panic!("Unexpected Event: {:?}", other), + } + } + + assert_eq!(client.behaviour().confidence(), 0); + + // Expect following probe to fail because server is throttled + + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => { + assert!(peer.is_none()); + assert_eq!(error, OutboundProbeError::NoServer); + } + other => panic!("Unexpected Event: {:?}", other), + } + assert_eq!(client.behaviour().confidence(), 0); + + drop(_handle) + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_use_connected_as_server() { + let test = async { + let mut client = init_swarm(Config { + retry_interval: TEST_RETRY_INTERVAL, + refresh_interval: TEST_REFRESH_INTERVAL, + confidence_max: MAX_CONFIDENCE, + throttle_server_period: Duration::ZERO, + boot_delay: Duration::from_millis(100), + ..Default::default() + }) + .await; + + let (_handle, rx) = oneshot::channel(); + let (server_id, addr) = spawn_server(rx).await; + + client + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + + client.dial(addr).unwrap(); + + // await connection + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = + client.select_next_some().await + { + assert_eq!(peer_id, server_id); + break; + } + } + + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Request { peer, .. }) => { + assert_eq!(peer, server_id); + } + other => panic!("Unexpected Event: {:?}", other), + } + + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Response { peer, .. }) => { + assert_eq!(peer, server_id); + } + other => panic!("Unexpected Event: {:?}", other), + } + + drop(_handle); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_outbound_failure() { + let test = async { + let mut servers = Vec::new(); + + let mut client = init_swarm(Config { + retry_interval: TEST_RETRY_INTERVAL, + refresh_interval: TEST_REFRESH_INTERVAL, + confidence_max: MAX_CONFIDENCE, + throttle_server_period: Duration::ZERO, + boot_delay: Duration::from_millis(100), + ..Default::default() + }) + .await; + + for _ in 0..5 { + let (tx, rx) = oneshot::channel(); + let (id, addr) = spawn_server(rx).await; + client.behaviour_mut().add_server(id, Some(addr)); + servers.push((id, tx)); + } + + client + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { .. } => break, + _ => {} + } + } + // First probe should be successful and flip status to public. + loop { + match next_event(&mut client).await { + Event::StatusChanged { old, new } => { + assert_eq!(old, NatStatus::Unknown); + assert!(new.is_public()); + break; + } + Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {} + Event::OutboundProbe(OutboundProbeEvent::Response { .. }) => {} + other => panic!("Unexpected Event: {:?}", other), + } + } + + let inactive = servers.split_off(1); + // Drop the handles of the inactive servers to kill them. + let inactive_ids: Vec<_> = inactive.into_iter().map(|(id, _handle)| id).collect(); + + // Expect to retry on outbound failure + loop { + match next_event(&mut client).await { + Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {} + Event::OutboundProbe(OutboundProbeEvent::Response { peer, .. }) => { + assert_eq!(peer, servers[0].0); + break; + } + Event::OutboundProbe(OutboundProbeEvent::Error { + peer: Some(peer), + error: OutboundProbeError::OutboundRequest(_), + .. + }) => { + assert!(inactive_ids.contains(&peer)); + } + other => panic!("Unexpected Event: {:?}", other), + } + } + }; + + run_test_with_timeout(test).await; +} diff --git a/tests/test_server.rs b/tests/test_server.rs new file mode 100644 index 00000000000..b96adaf78a4 --- /dev/null +++ b/tests/test_server.rs @@ -0,0 +1,421 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{channel::oneshot, Future, FutureExt, StreamExt}; +use futures_timer::Delay; +use libp2p::{ + development_transport, + identity::Keypair, + multiaddr::Protocol, + swarm::{AddressScore, Swarm, SwarmEvent}, + Multiaddr, PeerId, +}; +use libp2p_autonat::{ + Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError, +}; +use libp2p_core::ConnectedPoint; +use libp2p_swarm::DialError; +use std::{num::NonZeroU32, time::Duration}; + +async fn init_swarm(config: Config) -> Swarm { + let keypair = Keypair::generate_ed25519(); + let local_id = PeerId::from_public_key(&keypair.public()); + let transport = development_transport(keypair).await.unwrap(); + let behaviour = Behaviour::new(local_id, config); + Swarm::new(transport, behaviour, local_id) +} + +async fn init_server(config: Option) -> (Swarm, PeerId, Multiaddr) { + let mut config = config.unwrap_or_default(); + // Don't do any outbound probes. + config.boot_delay = Duration::from_secs(60); + + let mut server = init_swarm(config).await; + let peer_id = *server.local_peer_id(); + server + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + let addr = loop { + match server.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => break address, + _ => {} + }; + }; + (server, peer_id, addr) +} + +async fn spawn_client( + listen: bool, + add_dummy_external_addr: bool, + server_id: PeerId, + server_addr: Multiaddr, + kill: oneshot::Receiver<()>, +) -> (PeerId, Option) { + let (tx, rx) = oneshot::channel(); + async_std::task::spawn(async move { + let mut client = init_swarm(Config { + boot_delay: Duration::from_millis(100), + refresh_interval: Duration::from_millis(100), + retry_interval: Duration::from_millis(200), + throttle_server_period: Duration::ZERO, + ..Default::default() + }) + .await; + client + .behaviour_mut() + .add_server(server_id, Some(server_addr)); + let peer_id = *client.local_peer_id(); + let mut addr = None; + if listen { + client + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => { + addr = Some(address); + break; + } + _ => {} + }; + } + } + if add_dummy_external_addr { + let dummy_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap(); + client.add_external_address(dummy_addr, AddressScore::Infinite); + } + tx.send((peer_id, addr)).unwrap(); + let mut kill = kill.fuse(); + loop { + futures::select! { + _ = client.select_next_some() => {}, + _ = kill => return, + + } + } + }); + rx.await.unwrap() +} + +async fn next_event(swarm: &mut Swarm) -> Event { + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(event) => { + break event; + } + _ => {} + } + } +} + +async fn run_test_with_timeout(test: impl Future) { + futures::select! { + _ = test.fuse() => {}, + _ = Delay::new(Duration::from_secs(60)).fuse() => panic!("test timed out") + } +} + +#[async_std::test] +async fn test_dial_back() { + let test = async { + let (mut server, server_id, server_addr) = init_server(None).await; + let (_handle, rx) = oneshot::channel(); + let (client_id, client_addr) = spawn_client(true, false, server_id, server_addr, rx).await; + let client_port = client_addr + .unwrap() + .into_iter() + .find_map(|p| match p { + Protocol::Tcp(port) => Some(port), + _ => None, + }) + .unwrap(); + let observed_client_ip = loop { + match server.select_next_some().await { + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint: + ConnectedPoint::Listener { + mut send_back_addr, .. + }, + .. + } => { + assert_eq!(peer_id, client_id); + let observed_client_ip = loop { + match send_back_addr.pop().unwrap() { + Protocol::Ip4(ip4_addr) => break ip4_addr, + _ => {} + } + }; + break observed_client_ip; + } + SwarmEvent::IncomingConnection { .. } | SwarmEvent::NewListenAddr { .. } => {} + _ => panic!("Unexpected Swarm Event"), + } + }; + let expect_addr = Multiaddr::empty() + .with(Protocol::Ip4(observed_client_ip)) + .with(Protocol::Tcp(client_port)) + .with(Protocol::P2p(client_id.into())); + let request_probe_id = match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Request { + peer, + addresses, + probe_id, + }) => { + assert_eq!(peer, client_id); + assert_eq!(addresses.len(), 1); + assert_eq!(addresses[0], expect_addr); + probe_id + } + other => panic!("Unexpected Event: {:?}", other), + }; + + loop { + match server.select_next_some().await { + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint: ConnectedPoint::Dialer { address }, + num_established, + concurrent_dial_errors, + } => { + assert_eq!(peer_id, client_id); + assert_eq!(num_established, NonZeroU32::new(2).unwrap()); + assert!(concurrent_dial_errors.unwrap().is_empty()); + assert_eq!(address, expect_addr); + break; + } + SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id), + SwarmEvent::NewListenAddr { .. } => {} + _ => panic!("Unexpected Swarm Event"), + } + } + + match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Response { + probe_id, + peer, + address, + }) => { + assert_eq!(probe_id, request_probe_id); + assert_eq!(peer, client_id); + assert_eq!(address, expect_addr); + } + other => panic!("Unexpected Event: {:?}", other), + } + + drop(_handle); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_dial_error() { + let test = async { + let (mut server, server_id, server_addr) = init_server(None).await; + let (_handle, rx) = oneshot::channel(); + let (client_id, _) = spawn_client(false, true, server_id, server_addr, rx).await; + let request_probe_id = match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => { + assert_eq!(peer, client_id); + probe_id + } + other => panic!("Unexpected Event: {:?}", other), + }; + + loop { + match server.select_next_some().await { + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + assert_eq!(peer_id.unwrap(), client_id); + assert!(matches!(error, DialError::Transport(_))); + break; + } + SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id), + SwarmEvent::NewListenAddr { .. } => {} + _ => panic!("Unexpected Swarm Event"), + } + } + + match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Error { + probe_id, + peer, + error, + }) => { + assert_eq!(probe_id, request_probe_id); + assert_eq!(peer, client_id); + assert_eq!(error, InboundProbeError::Response(ResponseError::DialError)); + } + other => panic!("Unexpected Event: {:?}", other), + } + + drop(_handle); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_throttle_global_max() { + let test = async { + let (mut server, server_id, server_addr) = init_server(Some(Config { + throttle_clients_global_max: 1, + throttle_clients_period: Duration::from_secs(60), + ..Default::default() + })) + .await; + let mut _handles = Vec::new(); + for _ in 0..2 { + let (_handle, rx) = oneshot::channel(); + spawn_client(true, false, server_id, server_addr.clone(), rx).await; + _handles.push(_handle); + } + + let (first_probe_id, first_peer_id) = match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => { + (probe_id, peer) + } + other => panic!("Unexpected Event: {:?}", other), + }; + + loop { + match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Error { + peer, + probe_id, + error: InboundProbeError::Response(ResponseError::DialRefused), + }) => { + assert_ne!(first_peer_id, peer); + assert_ne!(first_probe_id, probe_id); + break; + } + Event::InboundProbe(InboundProbeEvent::Response { peer, probe_id, .. }) => { + assert_eq!(first_peer_id, peer); + assert_eq!(first_probe_id, probe_id); + } + other => panic!("Unexpected Event: {:?}", other), + }; + } + + drop(_handles); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_throttle_peer_max() { + let test = async { + let (mut server, server_id, server_addr) = init_server(Some(Config { + throttle_clients_peer_max: 1, + throttle_clients_period: Duration::from_secs(60), + ..Default::default() + })) + .await; + + let (_handle, rx) = oneshot::channel(); + let (client_id, _) = spawn_client(true, false, server_id, server_addr.clone(), rx).await; + + let first_probe_id = match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => { + assert_eq!(client_id, peer); + probe_id + } + other => panic!("Unexpected Event: {:?}", other), + }; + + match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Response { peer, probe_id, .. }) => { + assert_eq!(peer, client_id); + assert_eq!(probe_id, first_probe_id); + } + other => panic!("Unexpected Event: {:?}", other), + } + + match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Error { + peer, + probe_id, + error, + }) => { + assert_eq!(client_id, peer); + assert_ne!(first_probe_id, probe_id); + assert_eq!( + error, + InboundProbeError::Response(ResponseError::DialRefused) + ) + } + other => panic!("Unexpected Event: {:?}", other), + }; + + drop(_handle); + }; + + run_test_with_timeout(test).await; +} + +#[async_std::test] +async fn test_dial_multiple_addr() { + let test = async { + let (mut server, server_id, server_addr) = init_server(Some(Config { + throttle_clients_peer_max: 1, + throttle_clients_period: Duration::from_secs(60), + ..Default::default() + })) + .await; + + let (_handle, rx) = oneshot::channel(); + let (client_id, _) = spawn_client(true, true, server_id, server_addr.clone(), rx).await; + + let dial_addresses = match next_event(&mut server).await { + Event::InboundProbe(InboundProbeEvent::Request { + peer, addresses, .. + }) => { + assert_eq!(addresses.len(), 2); + assert_eq!(client_id, peer); + addresses + } + other => panic!("Unexpected Event: {:?}", other), + }; + + loop { + match server.select_next_some().await { + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint: ConnectedPoint::Dialer { address }, + concurrent_dial_errors, + .. + } => { + assert_eq!(peer_id, client_id); + let dial_errors = concurrent_dial_errors.unwrap(); + assert_eq!(dial_errors.len(), 1); + assert_eq!(dial_errors[0].0, dial_addresses[0]); + assert_eq!(address, dial_addresses[1]); + break; + } + SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id), + SwarmEvent::NewListenAddr { .. } => {} + _ => panic!("Unexpected Swarm Event"), + } + } + }; + + run_test_with_timeout(test).await; +}