Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

fix(comm): ignore connection loss #2556

Merged
merged 3 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ qp2p = "~0.11.9"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
resource_proof = "0.8.0"
sn_messaging = "29.0.0"
sn_messaging = "30.0.0"
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
tokio = "1.3.0"
Expand Down
84 changes: 0 additions & 84 deletions src/routing/connectivity_complaints.rs

This file was deleted.

92 changes: 20 additions & 72 deletions src/routing/core/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use crate::{
Error,
};
use bls_dkg::key_gen::message::Message as DkgMessage;
use sn_messaging::{
node::{DkgFailureProof, DkgFailureProofSet, DkgKey, ElderCandidates, Proposal, Variant},
DstLocation,
};
use sn_messaging::node::{DkgFailureProof, DkgFailureProofSet, DkgKey, ElderCandidates, Proposal};
use std::{collections::BTreeSet, iter, net::SocketAddr, slice};
use xor_name::XorName;

Expand Down Expand Up @@ -104,75 +101,21 @@ impl Core {
}
}

pub(crate) async fn handle_connectivity_complaint(
&self,
sender: XorName,
elder_name: XorName,
) -> Result<Vec<Command>> {
self.connectivity_complaints
.add_complaint(sender, elder_name)
.await;

let weighing_adults: BTreeSet<XorName> = self
.section
.members()
.joined()
.map(|info| *info.peer.name())
.collect();
if self
.connectivity_complaints
.is_complained(elder_name, &weighing_adults)
.await
{
self.propose_offline(elder_name)
} else {
Ok(vec![])
}
}

fn complain_connectivity(&self, name: XorName) -> Result<Vec<Command>> {
if !self.is_elder() {
// When self is not an elder, then the peer has to be an elder, and we shall complaint
// the lost to other elders.
let variant = Variant::ConnectivityComplaint(name);
let recipients: Vec<_> = self
.section
.proven_authority_provider()
.value
.peers()
.filter(|peer| *peer.name() != name)
.collect();
trace!(
"Casting connectivity complaint against {:?} {:?}",
name,
recipients
);

return self.send_message_for_dst_accumulation(
self.node.name(),
DstLocation::DirectAndUnrouted,
variant,
&recipients,
);
}

self.propose_offline(name)
}

pub fn handle_connection_lost(&self, addr: SocketAddr) -> Result<Vec<Command>> {
let name = if let Some(peer) = self.section.find_joined_member_by_addr(&addr) {
debug!("Lost connection to known peer {}", peer);
*peer.name()
if let Some(peer) = self.section.find_joined_member_by_addr(&addr) {
debug!(
"Possible connection loss detected with known peer {:?}",
peer
)
} else if let Some(end_user) = self.get_enduser_by_addr(&addr) {
debug!(
"Possible connection loss detected with known client {:?}",
end_user
)
} else {
if let Some(end_user) = self.get_enduser_by_addr(&addr) {
debug!("Lost connection to client {:?}", end_user);
} else {
debug!("Lost connection to unknown peer {}", addr);
}
return Ok(vec![]);
};

self.complain_connectivity(name)
debug!("Possible connection loss detected with addr: {:?}", addr);
}
Ok(vec![])
}

pub fn handle_peer_lost(&self, addr: &SocketAddr) -> Result<Vec<Command>> {
Expand All @@ -184,7 +127,12 @@ impl Core {
return Ok(vec![]);
};

self.complain_connectivity(name)
if !self.is_elder() {
// Adults cannot complain about connectivity.
return Ok(vec![]);
}

self.propose_offline(name)
}

pub fn propose_offline(&self, name: XorName) -> Result<Vec<Command>> {
Expand Down
2 changes: 1 addition & 1 deletion src/routing/core/messaging/handling/decisions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use xor_name::XorName;
impl Core {
pub(crate) fn decide_message_status(&self, msg: &RoutingMsg) -> Result<MessageStatus> {
match msg.variant() {
Variant::SectionKnowledge { .. } | Variant::ConnectivityComplaint(_) => {
Variant::SectionKnowledge { .. } => {
if !self.is_elder() {
return Ok(MessageStatus::Useless);
}
Expand Down
4 changes: 0 additions & 4 deletions src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,6 @@ impl Core {
commands.extend(result?);
Ok(commands)
}
Variant::ConnectivityComplaint(elder_name) => {
self.handle_connectivity_complaint(msg.src.name(), *elder_name)
.await
}
Variant::NodeApproval { .. }
| Variant::JoinRetry { .. }
| Variant::ResourceChallenge { .. } => {
Expand Down
7 changes: 1 addition & 6 deletions src/routing/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ mod delivery_group;
mod messaging;
mod public_api;

use super::{
command::Command, connectivity_complaints::ConnectivityComplaints,
enduser_registry::EndUserRegistry, split_barrier::SplitBarrier,
};
use super::{command::Command, enduser_registry::EndUserRegistry, split_barrier::SplitBarrier};
use crate::{
agreement::{DkgVoter, ProposalAggregator},
error::Result,
Expand Down Expand Up @@ -60,7 +57,6 @@ pub(crate) struct Core {
joins_allowed: bool,
resource_proof: ResourceProof,
end_users: EndUserRegistry,
connectivity_complaints: ConnectivityComplaints,
}

impl Core {
Expand Down Expand Up @@ -88,7 +84,6 @@ impl Core {
joins_allowed: true,
resource_proof: ResourceProof::new(RESOURCE_PROOF_DATA_SIZE, RESOURCE_PROOF_DIFFICULTY),
end_users: EndUserRegistry::new(),
connectivity_complaints: ConnectivityComplaints::new(),
}
}

Expand Down
1 change: 0 additions & 1 deletion src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub(crate) mod command;

mod bootstrap;
mod comm;
mod connectivity_complaints;
mod core;
mod dispatcher;
mod enduser_registry;
Expand Down
13 changes: 13 additions & 0 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ mod utils;

use self::utils::*;
use anyhow::Result;
use bytes::Bytes;
use sn_messaging::{Aggregation, DstLocation, Itinerary, SrcLocation};
use sn_routing::{Event, NodeElderChange};

#[tokio::test]
#[ignore]
async fn test_node_drop() -> Result<()> {
// NOTE: create at least 4 nodes, so when one is dropped the remaining ones still form a
// supermajority and the `Offline` proposals reach agreement.
Expand Down Expand Up @@ -48,6 +51,16 @@ async fn test_node_drop() -> Result<()> {

tracing::info!("Dropped {} at {}", dropped_name, dropped_addr);

for (node, _) in &mut nodes {
let itinerary = Itinerary {
src: SrcLocation::Node(node.name().await),
dst: DstLocation::Node(dropped_name),
aggregation: Aggregation::None,
};
node.send_message(itinerary, Bytes::from(b"hello".to_vec()), None)
.await?
}

for (_, events) in &mut nodes {
assert_event!(events, Event::MemberLeft { name, .. } if name == dropped_name)
}
Expand Down