Commit

Putting Kad records is now handled entirely as part of the metrics state
momosh-ethernal committed Aug 27, 2024
1 parent cc61a09 commit c236ade
Showing 4 changed files with 211 additions and 115 deletions.
18 changes: 15 additions & 3 deletions core/src/network/p2p.rs
@@ -4,7 +4,7 @@ use configuration::LibP2PConfig;
use libp2p::{
autonat, dcutr, identify,
identity::{self, ed25519, Keypair},
kad::{self, Mode, PeerRecord},
kad::{self, Mode, PeerRecord, QueryStats, Record, RecordKey},
mdns, noise, ping, relay,
swarm::NetworkBehaviour,
tcp, upnp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
@@ -61,19 +61,31 @@ Bootstrap node list must not be empty.
Either use a '--network' flag or add a list of bootstrap nodes in the configuration file.
"#;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum OutputEvent {
IncomingGetRecord,
IncomingPutRecord,
KadModeChange(Mode),
PutRecord { success_rate: f64, duration: f64 },

Ping(Duration),
IncomingConnection,
IncomingConnectionError,
MultiaddressUpdate(Multiaddr),
EstablishedConnection,
OutgoingConnectionError,
Count,
PutRecord {
block_num: u32,
records: Vec<Record>,
},
PutRecordSuccess {
record_key: RecordKey,
query_stats: QueryStats,
},
PutRecordFailed {
record_key: RecordKey,
query_stats: QueryStats,
},
}

#[derive(Clone)]
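With the `BlockStat` bookkeeping removed from the event loop (see the `event_loop.rs` changes below), per-block success rates must now be derived by whoever subscribes to these events. The following is a minimal sketch of such a consumer, assuming a `tokio::sync::broadcast::Receiver<OutputEvent>` obtained from the event loop; the `BlockTracker` struct, the `block_num_of` helper, and the `OutputEvent` import path are illustrative assumptions, not part of this commit.

```rust
use std::collections::HashMap;

use libp2p::kad::RecordKey;
use tokio::sync::broadcast;

// Path assumed; adjust to wherever OutputEvent is exported in this crate.
use crate::network::p2p::OutputEvent;

/// Illustrative per-block counters, mirroring the removed `BlockStat`.
struct BlockTracker {
    total: usize,
    remaining: usize,
    success: usize,
}

/// Drains PutRecord* events and reports a per-block success rate once every
/// record of a block has either succeeded or failed. `block_num_of` stands in
/// for the record-key-to-block-number conversion done elsewhere in the crate.
async fn track_put_records(
    mut events: broadcast::Receiver<OutputEvent>,
    block_num_of: impl Fn(&RecordKey) -> Option<u32>,
) {
    let mut blocks: HashMap<u32, BlockTracker> = HashMap::new();
    // `recv` returns Err when the sender is dropped or the receiver lags;
    // the sketch simply stops in both cases.
    while let Ok(event) = events.recv().await {
        match event {
            OutputEvent::PutRecord { block_num, records } => {
                // A new batch of records was sent out for this block.
                let block = blocks.entry(block_num).or_insert(BlockTracker {
                    total: 0,
                    remaining: 0,
                    success: 0,
                });
                block.total += records.len();
                block.remaining += records.len();
            },
            OutputEvent::PutRecordSuccess { record_key, .. } => {
                finish_one(&mut blocks, block_num_of(&record_key), true);
            },
            OutputEvent::PutRecordFailed { record_key, .. } => {
                finish_one(&mut blocks, block_num_of(&record_key), false);
            },
            _ => {},
        }
    }
}

/// Accounts for one finished put and logs the rate once the block is settled.
fn finish_one(blocks: &mut HashMap<u32, BlockTracker>, block_num: Option<u32>, success: bool) {
    let Some(block_num) = block_num else { return };
    let Some(block) = blocks.get_mut(&block_num) else { return };
    block.remaining -= 1;
    if success {
        block.success += 1;
    }
    if block.remaining == 0 {
        let rate = block.success as f64 / block.total as f64;
        println!(
            "block {block_num}: {}/{} records stored ({rate:.2})",
            block.success, block.total
        );
    }
}
```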
25 changes: 8 additions & 17 deletions core/src/network/p2p/client.rs
@@ -20,8 +20,8 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::{debug, info, trace};

use super::{
event_loop::{BlockStat, ConnectionEstablishedInfo},
is_global, is_multiaddr_global, Command, EventLoop, MultiAddressInfo, PeerInfo, QueryChannel,
event_loop::ConnectionEstablishedInfo, is_global, is_multiaddr_global, Command, EventLoop,
MultiAddressInfo, OutputEvent, PeerInfo, QueryChannel,
};
use crate::types::MultiaddrConfig;

@@ -203,27 +203,18 @@ impl Client {
) -> Result<()> {
self.command_sender
.send(Box::new(move |context: &mut EventLoop| {
context
.active_blocks
.entry(block_num)
// increase the total cell count we monitor if the block entry already exists
.and_modify(|block| block.increase_block_stat_counters(records.len()))
// initiate counting for the new block if the block doesn't exist
.or_insert(BlockStat {
total_count: records.len(),
remaining_counter: records.len(),
success_counter: 0,
error_counter: 0,
time_stat: 0,
});

for record in records {
for record in records.clone() {
let _ = context
.swarm
.behaviour_mut()
.kademlia
.put_record(record, quorum);
}

context
.event_sender
.send(OutputEvent::PutRecord { block_num, records })?;

Ok(())
}))
.map_err(|_| eyre!("Failed to send the Put Kad Record Command to the EventLoop"))
101 changes: 24 additions & 77 deletions core/src/network/p2p/event_loop.rs
@@ -7,7 +7,7 @@ use libp2p::{
identify::{self, Info},
kad::{
self, store::RecordStore, BootstrapOk, GetRecordOk, InboundRequest, Mode, PutRecordOk,
QueryId, QueryResult, QueryStats, RecordKey,
QueryId, QueryResult, RecordKey,
},
mdns,
multiaddr::Protocol,
@@ -35,22 +35,6 @@ use crate::{
types::TimeToLive,
};

#[derive(Debug)]
pub struct BlockStat {
pub total_count: usize,
pub remaining_counter: usize,
pub success_counter: usize,
pub error_counter: usize,
pub time_stat: u64,
}

impl BlockStat {
pub fn increase_block_stat_counters(&mut self, cell_number: usize) {
self.total_count += cell_number;
self.remaining_counter += cell_number;
}
}

// RelayState keeps track of all things relay related
struct RelayState {
// id of the selected Relay that needs to be connected
@@ -141,15 +125,13 @@ impl EventCounter {
pub struct EventLoop {
pub swarm: Swarm<Behaviour>,
command_receiver: UnboundedReceiver<Command>,
pub event_sender: broadcast::Sender<OutputEvent>,
pub(crate) event_sender: broadcast::Sender<OutputEvent>,
// Tracking Kademlia events
pub pending_kad_queries: HashMap<QueryId, QueryChannel>,
// Tracking swarm events (i.e. peer dialing)
pub pending_swarm_events: HashMap<PeerId, oneshot::Sender<Result<ConnectionEstablishedInfo>>>,
relay: RelayState,
bootstrap: BootstrapState,
/// Blocks we monitor for PUT success rate
pub active_blocks: HashMap<u32, BlockStat>,
shutdown: Controller<String>,
event_loop_config: EventLoopConfig,
pub kad_mode: Mode,
@@ -208,7 +190,6 @@ impl EventLoop {
is_startup_done: false,
timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval),
},
active_blocks: Default::default(),
shutdown,
event_loop_config: EventLoopConfig {
is_fat_client,
@@ -346,22 +327,35 @@ impl EventLoop {
};

match error {
kad::PutRecordError::QuorumFailed { key, .. } => {
self.handle_put_result(key.clone(), stats.clone(), true)
.await;
},
kad::PutRecordError::Timeout { key, .. } => {
self.handle_put_result(key.clone(), stats.clone(), true)
.await;
kad::PutRecordError::QuorumFailed { key, .. }
| kad::PutRecordError::Timeout { key, .. } => {
// Remove local records for fat clients (memory optimization)
if self.event_loop_config.is_fat_client {
debug!("Pruning local records on fat client");
self.swarm.behaviour_mut().kademlia.remove_record(&key);
}

_ = self.event_sender.send(OutputEvent::PutRecordFailed {
record_key: key,
query_stats: stats,
});
},
}
},

QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
_ = self.pending_kad_queries.remove(&id);

self.handle_put_result(key.clone(), stats.clone(), false)
.await;
// Remove local records for fat clients (memory optimization)
if self.event_loop_config.is_fat_client {
debug!("Pruning local records on fat client");
self.swarm.behaviour_mut().kademlia.remove_record(&key);
}

_ = self.event_sender.send(OutputEvent::PutRecordSuccess {
record_key: key,
query_stats: stats,
});
},
QueryResult::Bootstrap(result) => match result {
Ok(BootstrapOk {
Expand Down Expand Up @@ -673,53 +667,6 @@ impl EventLoop {
},
}
}

async fn handle_put_result(&mut self, key: RecordKey, stats: QueryStats, is_error: bool) {
let block_num = match key.clone().try_into() {
Ok(DHTKey::Cell(block_num, _, _)) => block_num,
Ok(DHTKey::Row(block_num, _)) => block_num,
Err(error) => {
warn!("Unable to cast Kademlia key to DHT key: {error}");
return;
},
};
if let Some(block) = self.active_blocks.get_mut(&block_num) {
// Decrement record counter for this block
block.remaining_counter -= 1;
if is_error {
block.error_counter += 1;
} else {
block.success_counter += 1;
}

block.time_stat = stats
.duration()
.as_ref()
.map(Duration::as_secs)
.unwrap_or_default();

if block.remaining_counter == 0 {
let success_rate = block.success_counter as f64 / block.total_count as f64;
info!(
"Cell upload success rate for block {block_num}: {}/{}. Duration: {}",
block.success_counter, block.total_count, block.time_stat
);

_ = self.event_sender.send(OutputEvent::PutRecord {
success_rate,
duration: block.time_stat as f64,
});
}

if self.event_loop_config.is_fat_client {
// Remove local records for fat clients (memory optimization)
debug!("Pruning local records on fat client");
self.swarm.behaviour_mut().kademlia.remove_record(&key);
}
} else {
debug!("Can't find block in the active blocks list")
}
}
}

#[cfg(test)]
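The removed `handle_put_result` recovered the block number from the record key via the crate's `DHTKey` conversion, and a consumer of the new `PutRecordSuccess`/`PutRecordFailed` events needs the same mapping. A possible shape for the `block_num_of` helper used in the earlier sketch, assuming `DHTKey` and its `TryFrom<RecordKey>` impl stay as referenced in the removed code (the import path is an assumption):

```rust
use libp2p::kad::RecordKey;

// Import path assumed; point this at wherever DHTKey actually lives in the crate.
use crate::network::p2p::DHTKey;

/// Maps a Kademlia record key back to the block number it belongs to,
/// mirroring the conversion done by the removed `handle_put_result`.
fn block_num_of(key: &RecordKey) -> Option<u32> {
    match DHTKey::try_from(key.clone()) {
        Ok(DHTKey::Cell(block_num, _, _)) | Ok(DHTKey::Row(block_num, _)) => Some(block_num),
        _ => None,
    }
}
```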
