Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add project name parameterisation #684

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async fn run(
let ot_metrics = Arc::new(
telemetry::otlp::initialize(
metric_attributes,
cfg.project_name.clone(),
&cfg.origin,
&cfg.libp2p.kademlia.operation_mode.into(),
cfg.otel.clone(),
Expand All @@ -89,6 +90,7 @@ async fn run(

let (p2p_client, p2p_event_loop, event_receiver) = p2p::init(
cfg.libp2p.clone(),
cfg.project_name.clone(),
id_keys,
version,
&cfg.genesis_hash,
Expand Down
13 changes: 8 additions & 5 deletions core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use libp2p_allow_block_list as allow_block_list;
const MINIMUM_SUPPORTED_BOOTSTRAP_VERSION: &str = "0.1.1";
const MINIMUM_SUPPORTED_LIGHT_CLIENT_VERSION: &str = "1.9.2";
const IDENTITY_PROTOCOL: &str = "/avail/light/1.0.0";
const IDENTITY_AGENT_BASE: &str = "avail-light-client";
const IDENTITY_AGENT_BASE: &str = "light-client";
const IDENTITY_AGENT_ROLE: &str = "light-client";
const IDENTITY_AGENT_CLIENT_TYPE: &str = "rust-client";

Expand Down Expand Up @@ -115,9 +115,9 @@ impl FromStr for AgentVersion {
}

impl AgentVersion {
fn new(version: &str) -> Self {
fn new(project_name: String, version: &str) -> Self {
Self {
base_version: IDENTITY_AGENT_BASE.to_string(),
base_version: format!("{project_name}-{IDENTITY_AGENT_BASE}"),
role: IDENTITY_AGENT_ROLE.to_string(),
release_version: version.to_string(),
client_type: IDENTITY_AGENT_CLIENT_TYPE.to_string(),
Expand Down Expand Up @@ -197,8 +197,10 @@ fn protocol_name(genesis_hash: &str) -> libp2p::StreamProtocol {
.expect("Invalid Kademlia protocol name")
}

#[allow(clippy::too_many_arguments)]
pub async fn init(
cfg: LibP2PConfig,
project_name: String,
id_keys: Keypair,
version: &str,
genesis_hash: &str,
Expand All @@ -222,7 +224,7 @@ pub async fn init(
db.inner(),
);
// create Swarm
let swarm = build_swarm(&cfg, version, genesis_hash, &id_keys, store)
let swarm = build_swarm(&cfg, project_name, version, genesis_hash, &id_keys, store)
.await
.expect("Unable to build swarm.");
let (event_sender, event_receiver) = broadcast::channel(1000);
Expand All @@ -234,14 +236,15 @@ pub async fn init(

async fn build_swarm(
cfg: &LibP2PConfig,
project_name: String,
version: &str,
genesis_hash: &str,
id_keys: &Keypair,
kad_store: Store,
) -> Result<Swarm<Behaviour>> {
// create Identify Protocol Config
let identify_cfg = identify::Config::new(IDENTITY_PROTOCOL.to_string(), id_keys.public())
.with_agent_version(AgentVersion::new(version).to_string());
.with_agent_version(AgentVersion::new(project_name, version).to_string());

// create AutoNAT Client Config
let autonat_cfg = autonat::Config {
Expand Down
58 changes: 29 additions & 29 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ impl MetricName for MetricCounter {
fn name(&self) -> &'static str {
use MetricCounter::*;
match self {
Starts => "avail.light.starts",
Up => "avail.light.up",
SessionBlocks => "avail.light.session_blocks",
OutgoingConnectionErrors => "avail.light.outgoing_connection_errors",
IncomingConnectionErrors => "avail.light.incoming_connection_errors",
IncomingConnections => "avail.light.incoming_connections",
EstablishedConnections => "avail.light.established_connections",
IncomingPutRecord => "avail.light.incoming_put_record",
IncomingGetRecord => "avail.light.incoming_get_record",
EventLoopEvent => "avail.light.event_loop_event",
Starts => "light.starts",
Up => "light.up",
SessionBlocks => "light.session_blocks",
OutgoingConnectionErrors => "light.outgoing_connection_errors",
IncomingConnectionErrors => "light.incoming_connection_errors",
IncomingConnections => "light.incoming_connections",
EstablishedConnections => "light.established_connections",
IncomingPutRecord => "light.incoming_put_record",
IncomingGetRecord => "light.incoming_get_record",
EventLoopEvent => "light.event_loop_event",
}
}
}
Expand Down Expand Up @@ -91,25 +91,25 @@ impl MetricName for MetricValue {
use MetricValue::*;

match self {
BlockHeight(_) => "avail.light.block.height",
BlockConfidence(_) => "avail.light.block.confidence",
BlockConfidenceThreshold(_) => "avail.light.block.confidence_threshold",
BlockProcessingDelay(_) => "avail.light.block.processing_delay",

DHTReplicationFactor(_) => "avail.light.dht.replication_factor",
DHTFetched(_) => "avail.light.dht.fetched",
DHTFetchedPercentage(_) => "avail.light.dht.fetched_percentage",
DHTFetchDuration(_) => "avail.light.dht.fetch_duration",
DHTPutDuration(_) => "avail.light.dht.put_duration",
DHTPutSuccess(_) => "avail.light.dht.put_success",

DHTConnectedPeers(_) => "avail.light.dht.connected_peers",
DHTQueryTimeout(_) => "avail.light.dht.query_timeout",
DHTPingLatency(_) => "avail.light.dht.ping_latency",

RPCFetched(_) => "avail.light.rpc.fetched",
RPCFetchDuration(_) => "avail.light.rpc.fetch_duration",
RPCCallDuration(_) => "avail.light.rpc.call_duration",
BlockHeight(_) => "light.block.height",
BlockConfidence(_) => "light.block.confidence",
BlockConfidenceThreshold(_) => "light.block.confidence_threshold",
BlockProcessingDelay(_) => "light.block.processing_delay",

DHTReplicationFactor(_) => "light.dht.replication_factor",
DHTFetched(_) => "light.dht.fetched",
DHTFetchedPercentage(_) => "light.dht.fetched_percentage",
DHTFetchDuration(_) => "light.dht.fetch_duration",
DHTPutDuration(_) => "light.dht.put_duration",
DHTPutSuccess(_) => "light.dht.put_success",

DHTConnectedPeers(_) => "light.dht.connected_peers",
DHTQueryTimeout(_) => "light.dht.query_timeout",
DHTPingLatency(_) => "light.dht.ping_latency",

RPCFetched(_) => "light.rpc.fetched",
RPCFetchDuration(_) => "light.rpc.fetch_duration",
RPCCallDuration(_) => "light.rpc.call_duration",
}
}
}
Expand Down
43 changes: 28 additions & 15 deletions core/src/telemetry/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio_stream::wrappers::BroadcastStream;
#[derive(Debug)]
pub struct Metrics {
meter: Meter,
project_name: String,
origin: Origin,
mode: RwLock<Mode>,
multiaddress: RwLock<Multiaddr>,
Expand All @@ -41,7 +42,8 @@ impl Metrics {
}

async fn record_u64(&self, name: &'static str, value: u64) -> Result<()> {
let instrument = self.meter.u64_observable_gauge(name).try_init()?;
let gauge_name = format!("{}.{}", name, self.project_name);
let instrument = self.meter.u64_observable_gauge(gauge_name).try_init()?;
let attributes = self.attributes().await;
self.meter
.register_callback(&[instrument.as_any()], move |observer| {
Expand All @@ -51,7 +53,8 @@ impl Metrics {
}

async fn record_f64(&self, name: &'static str, value: f64) -> Result<()> {
let instrument = self.meter.f64_observable_gauge(name).try_init()?;
let gauge_name = format!("{}.{}", name, self.project_name);
let instrument = self.meter.f64_observable_gauge(gauge_name).try_init()?;
let attributes = self.attributes().await;
self.meter
.register_callback(&[instrument.as_any()], move |observer| {
Expand Down Expand Up @@ -265,7 +268,11 @@ impl super::Metrics for Metrics {
}
}

fn init_counters(meter: Meter, origin: &Origin) -> HashMap<&'static str, Counter<u64>> {
fn init_counters(
meter: Meter,
origin: &Origin,
project_name: String,
) -> HashMap<&'static str, Counter<u64>> {
[
MetricCounter::Starts,
MetricCounter::Up,
Expand All @@ -280,7 +287,11 @@ fn init_counters(meter: Meter, origin: &Origin) -> HashMap<&'static str, Counter
]
.iter()
.filter(|counter| MetricCounter::is_allowed(counter, origin))
.map(|counter| (counter.name(), meter.u64_counter(counter.name()).init()))
.map(|counter| {
let otel_counter_name = format!("{}.{}", project_name, counter.name());
// Keep the `static str as the local bufer map key, but change the OTel counter name`
(counter.name(), meter.u64_counter(otel_counter_name).init())
})
.collect()
}

Expand All @@ -307,6 +318,7 @@ impl Default for OtelConfig {

pub fn initialize(
attributes: Vec<(&str, String)>,
project_name: String,
origin: &Origin,
mode: &Mode,
ot_config: OtelConfig,
Expand Down Expand Up @@ -336,9 +348,10 @@ pub fn initialize(
.collect();

// Initialize counters - they need to persist unlike Gauges that are recreated on every record
let counters = init_counters(meter.clone(), origin);
let counters = init_counters(meter.clone(), origin, project_name.clone());
Ok(Metrics {
meter,
project_name,
origin: origin.clone(),
mode: RwLock::new(*mode),
multiaddress: RwLock::new(Multiaddr::empty()),
Expand Down Expand Up @@ -411,7 +424,7 @@ mod tests {
let (m_u64, m_f64) = flatten_metrics(buffer);
assert!(m_u64.is_empty());
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&90.0));
assert_eq!(m_f64.get("light.block.confidence"), Some(&90.0));

let buffer = vec![
MetricValue::BlockConfidence(90.0),
Expand All @@ -420,9 +433,9 @@ mod tests {
];
let (m_u64, m_f64) = flatten_metrics(buffer);
assert_eq!(m_u64.len(), 1);
assert_eq!(m_u64.get("avail.light.block.height"), Some(&1));
assert_eq!(m_u64.get("light.block.height"), Some(&1));
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&91.5));
assert_eq!(m_f64.get("light.block.confidence"), Some(&91.5));

let buffer = vec![
MetricValue::BlockConfidence(90.0),
Expand All @@ -435,9 +448,9 @@ mod tests {
];
let (m_u64, m_f64) = flatten_metrics(buffer);
assert_eq!(m_u64.len(), 1);
assert_eq!(m_u64.get("avail.light.block.height"), Some(&10));
assert_eq!(m_u64.get("light.block.height"), Some(&10));
assert_eq!(m_f64.len(), 1);
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&93.75));
assert_eq!(m_f64.get("light.block.confidence"), Some(&93.75));

let buffer = vec![
MetricValue::DHTConnectedPeers(90),
Expand All @@ -452,11 +465,11 @@ mod tests {
];
let (m_u64, m_f64) = flatten_metrics(buffer);
assert_eq!(m_u64.len(), 1);
assert_eq!(m_u64.get("avail.light.block.height"), Some(&999));
assert_eq!(m_u64.get("light.block.height"), Some(&999));
assert_eq!(m_f64.len(), 4);
assert_eq!(m_f64.get("avail.light.dht.put_success"), Some(&10.0));
assert_eq!(m_f64.get("avail.light.dht.fetch_duration"), Some(&1.7));
assert_eq!(m_f64.get("avail.light.block.confidence"), Some(&98.5));
assert_eq!(m_f64.get("avail.light.dht.connected_peers"), Some(&85.0));
assert_eq!(m_f64.get("light.dht.put_success"), Some(&10.0));
assert_eq!(m_f64.get("light.dht.fetch_duration"), Some(&1.7));
assert_eq!(m_f64.get("light.block.confidence"), Some(&98.5));
assert_eq!(m_f64.get("light.dht.connected_peers"), Some(&85.0));
}
}
3 changes: 3 additions & 0 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ pub enum SecretKey {
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct RuntimeConfig {
/// Name of the project running the client. (default: "avail")
pub project_name: String,
#[serde(flatten)]
pub api: APIConfig,
#[serde(flatten)]
Expand Down Expand Up @@ -486,6 +488,7 @@ impl From<&RuntimeConfig> for MaintenanceConfig {
impl Default for RuntimeConfig {
fn default() -> Self {
RuntimeConfig {
project_name: "avail".to_string(),
api: Default::default(),
libp2p: Default::default(),
rpc: Default::default(),
Expand Down
2 changes: 2 additions & 0 deletions crawler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()>
let ot_metrics = Arc::new(
otlp::initialize(
metric_attributes,
"avail".to_string(),
&config.origin,
&KademliaMode::Client.into(),
config.otel.clone(),
Expand All @@ -90,6 +91,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()>

let (p2p_client, p2p_event_loop, event_receiver) = p2p::init(
config.libp2p.clone(),
"avail".to_string(),
p2p_keypair,
version,
&config.genesis_hash,
Expand Down
2 changes: 2 additions & 0 deletions fat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()>
let ot_metrics = Arc::new(
telemetry::otlp::initialize(
metric_attributes,
"avail".to_string(),
&Origin::FatClient,
&KademliaMode::Client.into(),
config.otel.clone(),
Expand All @@ -90,6 +91,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()>

let (p2p_client, p2p_event_loop, event_receiver) = p2p::init(
config.libp2p.clone(),
"avail".to_string(),
p2p_keypair,
version,
&config.genesis_hash,
Expand Down
Loading