Skip to content

Commit

Permalink
swarm/src/lib: Extract NetworkBehaviourAction handling into new method
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed May 2, 2022
1 parent 87b7cb3 commit 00346de
Showing 1 changed file with 87 additions and 91 deletions.
178 changes: 87 additions & 91 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,89 @@ where
None
}

fn handle_behaviour_event(
&mut self,
event: NetworkBehaviourAction<TBehaviour::OutEvent, TBehaviour::ConnectionHandler>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
NetworkBehaviourAction::GenerateEvent(event) => {
return Some(SwarmEvent::Behaviour(event))
}
NetworkBehaviourAction::Dial { opts, handler } => {
let peer_id = opts.get_peer_id();
if let Ok(()) = self.dial_with_handler(opts, handler) {
if let Some(peer_id) = peer_id {
return Some(SwarmEvent::Dialing(peer_id));
}
}
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
assert!(self.pending_event.is_none());
let handler = match handler {
NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
NotifyHandler::Any => {
let ids = self
.pool
.iter_established_connections_of_peer(&peer_id)
.collect();
PendingNotifyHandler::Any(ids)
}
};

self.pending_event = Some((peer_id, handler, event));
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
// Maps the given `observed_addr`, representing an address of the local
// node observed by a remote peer, onto the locally known listen addresses
// to yield one or more addresses of the local node that may be publicly
// reachable.
//
// I.e. self method incorporates the view of other peers into the listen
// addresses seen by the local node to account for possible IP and port
// mappings performed by intermediate network devices in an effort to
// obtain addresses for the local peer that are also reachable for peers
// other than the peer who reported the `observed_addr`.
//
// The translation is transport-specific. See [`Transport::address_translation`].
let translated_addresses = {
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self
.listeners
.listen_addrs()
.filter_map(move |server| transport.address_translation(server, &address))
.collect();

// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};
for addr in translated_addresses {
self.add_external_address(addr, score);
}
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => match connection {
CloseConnection::One(connection_id) => {
if let Some(conn) = self.pool.get_established(connection_id) {
conn.start_close();
}
}
CloseConnection::All => {
self.pool.disconnect(peer_id);
}
},
}

None
}

/// Internal function used by everything event-related.
///
/// Polls the `Swarm` for the next event.
Expand Down Expand Up @@ -1010,99 +1093,12 @@ where
Poll::Pending if listeners_not_ready && connections_not_ready => {
return Poll::Pending
}
Poll::Pending => (),
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
return Poll::Ready(SwarmEvent::Behaviour(event))
}
Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => {
let peer_id = opts.get_peer_id();
if let Ok(()) = this.dial_with_handler(opts, handler) {
if let Some(peer_id) = peer_id {
return Poll::Ready(SwarmEvent::Dialing(peer_id));
}
}
}
Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
}) => match handler {
NotifyHandler::One(connection) => {
if let Some(mut conn) = this.pool.get_established(connection) {
if let Some(event) = notify_one(&mut conn, event, cx) {
let handler = PendingNotifyHandler::One(connection);
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
}
NotifyHandler::Any => {
let ids = this
.pool
.iter_established_connections_of_peer(&peer_id)
.collect();
if let Some((event, ids)) =
notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx)
{
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
if listeners_not_ready && connections_not_ready {
return Poll::Pending;
} else {
continue;
}
}
}
},
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
// Maps the given `observed_addr`, representing an address of the local
// node observed by a remote peer, onto the locally known listen addresses
// to yield one or more addresses of the local node that may be publicly
// reachable.
//
// I.e. this method incorporates the view of other peers into the listen
// addresses seen by the local node to account for possible IP and port
// mappings performed by intermediate network devices in an effort to
// obtain addresses for the local peer that are also reachable for peers
// other than the peer who reported the `observed_addr`.
//
// The translation is transport-specific. See [`Transport::address_translation`].
let translated_addresses = {
let transport = this.listeners.transport();
let mut addrs: Vec<_> = this
.listeners
.listen_addrs()
.filter_map(move |server| {
transport.address_translation(server, &address)
})
.collect();

// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
};
for addr in translated_addresses {
this.add_external_address(addr, score);
Poll::Pending => {}
Poll::Ready(behaviour_event) => {
if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) {
return Poll::Ready(swarm_event);
}
}
Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
}) => match connection {
CloseConnection::One(connection_id) => {
if let Some(conn) = this.pool.get_established(connection_id) {
conn.start_close();
}
}
CloseConnection::All => {
this.pool.disconnect(peer_id);
}
},
}
}
}
Expand Down

0 comments on commit 00346de

Please sign in to comment.