From f5eb97974f5d827429d0dde015769be7430a6f28 Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Thu, 10 Oct 2024 23:38:24 -0300 Subject: [PATCH 1/7] wip: process collector Signed-off-by: Gustavo Murayama --- Cargo.toml | 17 +++++++-- process-collector/Cargo.toml | 10 +++++ process-collector/src/lib.rs | 72 ++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 process-collector/Cargo.toml create mode 100644 process-collector/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index a0f5e2a2..ed9b201f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,13 @@ authors = ["Max Inden "] edition = "2021" description = "Open Metrics client library allowing users to natively instrument applications." license = "Apache-2.0 OR MIT" -keywords = ["openmetrics", "prometheus", "metrics", "instrumentation", "monitoring"] +keywords = [ + "openmetrics", + "prometheus", + "metrics", + "instrumentation", + "monitoring", +] repository = "https://github.com/prometheus/client_rust" homepage = "https://github.com/prometheus/client_rust" documentation = "https://docs.rs/prometheus-client" @@ -15,7 +21,7 @@ default = [] protobuf = ["dep:prost", "dep:prost-types", "dep:prost-build"] [workspace] -members = ["derive-encode"] +members = ["derive-encode", "process-collector"] [dependencies] dtoa = "1.0" @@ -36,7 +42,12 @@ quickcheck = "1" rand = "0.8.4" tide = "0.16" actix-web = "4" -tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "signal"] } +tokio = { version = "1", features = [ + "rt-multi-thread", + "net", + "macros", + "signal", +] } hyper = { version = "1.3.1", features = ["server", "http1"] } hyper-util = { version = "0.1.3", features = ["tokio"] } http-body-util = "0.1.1" diff --git a/process-collector/Cargo.toml b/process-collector/Cargo.toml new file mode 100644 index 00000000..c90952bb --- /dev/null +++ b/process-collector/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "process-collector" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +procfs = "0.17.0" +prometheus-client = { path = "../" } diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs new file mode 100644 index 00000000..1598609d --- /dev/null +++ b/process-collector/src/lib.rs @@ -0,0 +1,72 @@ +use procfs::process::Process; +use prometheus_client::{ + collector::Collector, + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::counter::ConstCounter, + registry::Unit, +}; + +#[derive(Debug)] +pub struct ProcessCollector { + namespace: String, +} + +impl Collector for ProcessCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let tps = procfs::ticks_per_second(); + // process_cpu_seconds_total Total user and system CPU time spent in seconds. + // process_max_fds Maximum number of open file descriptors. + // process_open_fds Number of open file descriptors. + // process_virtual_memory_bytes Virtual memory size in bytes. + // process_resident_memory_bytes Resident memory size in bytes. + // process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes. + // process_start_time_seconds Start time of the process since unix epoch in seconds. + // process_network_receive_bytes_total Number of bytes received by the process over the network. + // process_network_transmit_bytes_total Number of bytes sent by the process over the network. + + if let Ok(proc) = Process::myself() { + if let Ok(stat) = proc.stat() { + let cpu_time = (stat.stime + stat.utime) / tps as u64; + let counter = ConstCounter::new(cpu_time); + let metric_encoder = encoder.encode_descriptor( + "process_cpu_seconds_total", + "Total user and system CPU time spent in seconds.", + Some(&Unit::Seconds), + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + } + + if let Ok(limits) = proc.limits() { + let max_fds = match limits.max_open_files.soft_limit { + procfs::process::LimitValue::Value(v) => v, + procfs::process::LimitValue::Unlimited => 0, + }; + let counter = ConstCounter::new(max_fds); + let metric_encoder = encoder.encode_descriptor( + "process_max_fds", + "Maximum number of open file descriptors.", + None, + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use prometheus_client::registry::Registry; + + #[test] + fn register_process_collector() { + let mut registry = Registry::default(); + registry.register_collector(Box::new(ProcessCollector { + namespace: String::new(), + })) + } +} From 4c578d86c87d80c8a9c2a5604a5dbaea20439728 Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Tue, 22 Oct 2024 00:02:39 -0300 Subject: [PATCH 2/7] feat: metrics for linux Signed-off-by: Gustavo Murayama --- Cargo.toml | 3 +- process-collector/.gitignore | 2 + process-collector/src/lib.rs | 129 ++++++++++++++++++++++++--------- process-collector/src/linux.rs | 0 4 files changed, 97 insertions(+), 37 deletions(-) create mode 100644 process-collector/.gitignore create mode 100644 process-collector/src/linux.rs diff --git a/Cargo.toml b/Cargo.toml index ed9b201f..cc20d1ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,8 @@ default = [] protobuf = ["dep:prost", "dep:prost-types", "dep:prost-build"] [workspace] -members = ["derive-encode", "process-collector"] +members = ["derive-encode"] +exclude = ["process-collector"] [dependencies] dtoa = "1.0" diff --git a/process-collector/.gitignore b/process-collector/.gitignore new file mode 100644 index 00000000..96ef6c0b --- /dev/null +++ b/process-collector/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs index 1598609d..d60591d3 100644 --- a/process-collector/src/lib.rs +++ b/process-collector/src/lib.rs @@ -1,8 +1,10 @@ -use procfs::process::Process; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use procfs::process::{LimitValue, Process}; use prometheus_client::{ collector::Collector, encoding::{DescriptorEncoder, EncodeMetric}, - metrics::counter::ConstCounter, + metrics::{counter::ConstCounter, gauge::ConstGauge}, registry::Unit, }; @@ -14,45 +16,100 @@ pub struct ProcessCollector { impl Collector for ProcessCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { let tps = procfs::ticks_per_second(); - // process_cpu_seconds_total Total user and system CPU time spent in seconds. - // process_max_fds Maximum number of open file descriptors. - // process_open_fds Number of open file descriptors. - // process_virtual_memory_bytes Virtual memory size in bytes. - // process_resident_memory_bytes Resident memory size in bytes. - // process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes. - // process_start_time_seconds Start time of the process since unix epoch in seconds. - // process_network_receive_bytes_total Number of bytes received by the process over the network. - // process_network_transmit_bytes_total Number of bytes sent by the process over the network. - if let Ok(proc) = Process::myself() { - if let Ok(stat) = proc.stat() { - let cpu_time = (stat.stime + stat.utime) / tps as u64; - let counter = ConstCounter::new(cpu_time); - let metric_encoder = encoder.encode_descriptor( - "process_cpu_seconds_total", - "Total user and system CPU time spent in seconds.", - Some(&Unit::Seconds), - counter.metric_type(), - )?; - counter.encode(metric_encoder)?; + // TODO: handle errors + let proc = match Process::myself() { + Ok(proc) => proc, + Err(_) => { + return Ok(()); } - - if let Ok(limits) = proc.limits() { - let max_fds = match limits.max_open_files.soft_limit { - procfs::process::LimitValue::Value(v) => v, - procfs::process::LimitValue::Unlimited => 0, - }; - let counter = ConstCounter::new(max_fds); - let metric_encoder = encoder.encode_descriptor( - "process_max_fds", - "Maximum number of open file descriptors.", - None, - counter.metric_type(), - )?; - counter.encode(metric_encoder)?; + }; + let stat = match proc.stat() { + Ok(stat) => stat, + Err(_) => { + return Ok(()); } + }; + + let cpu_time = (stat.stime + stat.utime) / tps; + let counter = ConstCounter::new(cpu_time); + let metric_encoder = encoder.encode_descriptor( + "process_cpu_seconds_total", + "Total user and system CPU time spent in seconds.", + Some(&Unit::Seconds), + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + + if let Ok(limits) = proc.limits() { + let max_open_files = limits.max_open_files; + let max_fds = match max_open_files.soft_limit { + LimitValue::Unlimited => match max_open_files.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_fds as i64); + let metric_encoder = encoder.encode_descriptor( + "process_max_fds", + "Maximum number of open file descriptors.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + + let max_address_space = limits.max_address_space; + let max_virtual_memory = match max_address_space.soft_limit { + LimitValue::Unlimited => match max_address_space.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_fds as i64); + let metric_encoder = encoder.encode_descriptor( + "process_virtual_memory_max_bytes", + "Maximum amount of virtual memory available in bytes.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; } + let vm_bytes = ConstGauge::new(stat.vsize as i64); + let vme = encoder.encode_descriptor( + "process_virtual_memory_bytes", + "Virtual memory size in bytes", + Some(&Unit::Bytes), + vm_bytes.metric_type(), + )?; + vm_bytes.encode(vme)?; + + // TODO: add rss_bytes (fix self.page_size) + // + // let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); + // let rsse = encoder.encode_descriptor( + // "process_resident_memory_bytes", + // "Resident memory size in bytes.", + // Some(&Unit::Bytes), + // rss_bytes.metric_type(), + // )?; + // rss_bytes.encode(rsse)?; + + let start_time_from_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + // TODO: remove expect + .expect("process start time"); + let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); + let start_time_metric = encoder.encode_descriptor( + "process_start_time_seconds", + "Start time of the process since unix epoch in seconds.", + Some(&Unit::Seconds), + start_time.metric_type(), + )?; + start_time.encode(start_time_metric)?; + Ok(()) } } diff --git a/process-collector/src/linux.rs b/process-collector/src/linux.rs new file mode 100644 index 00000000..e69de29b From a71845e174bfc5aed889ba68e6e2e7308b47b7a1 Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Tue, 5 Nov 2024 00:49:50 -0300 Subject: [PATCH 3/7] feat: linux mod Signed-off-by: Gustavo Murayama --- process-collector/src/lib.rs | 115 +++++++-------------------------- process-collector/src/linux.rs | 101 +++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 91 deletions(-) diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs index d60591d3..becaf244 100644 --- a/process-collector/src/lib.rs +++ b/process-collector/src/lib.rs @@ -1,106 +1,38 @@ -use std::time::{Instant, SystemTime, UNIX_EPOCH}; - -use procfs::process::{LimitValue, Process}; use prometheus_client::{ collector::Collector, encoding::{DescriptorEncoder, EncodeMetric}, - metrics::{counter::ConstCounter, gauge::ConstGauge}, + metrics::gauge::ConstGauge, registry::Unit, }; +use std::time::{SystemTime, UNIX_EPOCH}; + +mod linux; #[derive(Debug)] pub struct ProcessCollector { - namespace: String, + namespace: Option, + #[cfg(target_os = "linux")] + system: linux::System, } -impl Collector for ProcessCollector { - fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let tps = procfs::ticks_per_second(); - - // TODO: handle errors - let proc = match Process::myself() { - Ok(proc) => proc, - Err(_) => { - return Ok(()); - } - }; - let stat = match proc.stat() { - Ok(stat) => stat, - Err(_) => { - return Ok(()); - } - }; - - let cpu_time = (stat.stime + stat.utime) / tps; - let counter = ConstCounter::new(cpu_time); - let metric_encoder = encoder.encode_descriptor( - "process_cpu_seconds_total", - "Total user and system CPU time spent in seconds.", - Some(&Unit::Seconds), - counter.metric_type(), - )?; - counter.encode(metric_encoder)?; - - if let Ok(limits) = proc.limits() { - let max_open_files = limits.max_open_files; - let max_fds = match max_open_files.soft_limit { - LimitValue::Unlimited => match max_open_files.hard_limit { - LimitValue::Unlimited => 0, - LimitValue::Value(hard) => hard, - }, - LimitValue::Value(soft) => soft, - }; - let gauge = ConstGauge::new(max_fds as i64); - let metric_encoder = encoder.encode_descriptor( - "process_max_fds", - "Maximum number of open file descriptors.", - None, - gauge.metric_type(), - )?; - gauge.encode(metric_encoder)?; +impl ProcessCollector { + pub fn new(namespace: Option) -> Self { + #[cfg(target_os = "linux")] + let system = linux::System {}; - let max_address_space = limits.max_address_space; - let max_virtual_memory = match max_address_space.soft_limit { - LimitValue::Unlimited => match max_address_space.hard_limit { - LimitValue::Unlimited => 0, - LimitValue::Value(hard) => hard, - }, - LimitValue::Value(soft) => soft, - }; - let gauge = ConstGauge::new(max_fds as i64); - let metric_encoder = encoder.encode_descriptor( - "process_virtual_memory_max_bytes", - "Maximum amount of virtual memory available in bytes.", - None, - gauge.metric_type(), - )?; - gauge.encode(metric_encoder)?; + ProcessCollector { + namespace, + #[cfg(target_os = "linux")] + system, } + } +} - let vm_bytes = ConstGauge::new(stat.vsize as i64); - let vme = encoder.encode_descriptor( - "process_virtual_memory_bytes", - "Virtual memory size in bytes", - Some(&Unit::Bytes), - vm_bytes.metric_type(), - )?; - vm_bytes.encode(vme)?; - - // TODO: add rss_bytes (fix self.page_size) - // - // let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); - // let rsse = encoder.encode_descriptor( - // "process_resident_memory_bytes", - // "Resident memory size in bytes.", - // Some(&Unit::Bytes), - // rss_bytes.metric_type(), - // )?; - // rss_bytes.encode(rsse)?; - +impl Collector for ProcessCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { let start_time_from_epoch = SystemTime::now() .duration_since(UNIX_EPOCH) - // TODO: remove expect - .expect("process start time"); + .map_err(|e| std::fmt::Error)?; let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); let start_time_metric = encoder.encode_descriptor( "process_start_time_seconds", @@ -110,6 +42,9 @@ impl Collector for ProcessCollector { )?; start_time.encode(start_time_metric)?; + #[cfg(target_os = "linux")] + self.system.encode(encoder)?; + Ok(()) } } @@ -122,8 +57,6 @@ mod tests { #[test] fn register_process_collector() { let mut registry = Registry::default(); - registry.register_collector(Box::new(ProcessCollector { - namespace: String::new(), - })) + registry.register_collector(Box::new(ProcessCollector::new(None))) } } diff --git a/process-collector/src/linux.rs b/process-collector/src/linux.rs index e69de29b..3d31372f 100644 --- a/process-collector/src/linux.rs +++ b/process-collector/src/linux.rs @@ -0,0 +1,101 @@ +use procfs::process::{LimitValue, Process}; +use prometheus_client::{ + collector::Collector, + encoding::EncodeMetric, + metrics::{counter::ConstCounter, gauge::ConstGauge}, + registry::Unit, +}; + +#[derive(Debug)] +pub(crate) struct System {} + +impl Collector for System { + fn encode( + &self, + mut encoder: prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let tps = procfs::ticks_per_second(); + + // TODO: handle errors + let proc = match Process::myself() { + Ok(proc) => proc, + Err(_) => { + return Ok(()); + } + }; + let stat = match proc.stat() { + Ok(stat) => stat, + Err(_) => { + return Ok(()); + } + }; + + let cpu_time = (stat.stime + stat.utime) / tps; + let counter = ConstCounter::new(cpu_time); + let metric_encoder = encoder.encode_descriptor( + "process_cpu_seconds_total", + "Total user and system CPU time spent in seconds.", + Some(&Unit::Seconds), + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + + if let Ok(limits) = proc.limits() { + let max_open_files = limits.max_open_files; + let max_fds = match max_open_files.soft_limit { + LimitValue::Unlimited => match max_open_files.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_fds as i64); + let metric_encoder = encoder.encode_descriptor( + "process_max_fds", + "Maximum number of open file descriptors.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + + let max_address_space = limits.max_address_space; + let max_virtual_memory = match max_address_space.soft_limit { + LimitValue::Unlimited => match max_address_space.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_virtual_memory as i64); + let metric_encoder = encoder.encode_descriptor( + "process_virtual_memory_max_bytes", + "Maximum amount of virtual memory available in bytes.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + } + + let vm_bytes = ConstGauge::new(stat.vsize as i64); + let vme = encoder.encode_descriptor( + "process_virtual_memory_bytes", + "Virtual memory size in bytes", + Some(&Unit::Bytes), + vm_bytes.metric_type(), + )?; + vm_bytes.encode(vme)?; + + // TODO: add rss_bytes (fix self.page_size) + // + // let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); + // let rsse = encoder.encode_descriptor( + // "process_resident_memory_bytes", + // "Resident memory size in bytes.", + // Some(&Unit::Bytes), + // rss_bytes.metric_type(), + // )?; + // rss_bytes.encode(rsse)?; + + Ok(()) + } +} From baffc486d37d878cfd74e45e028c1dea6f0909db Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Wed, 2 Apr 2025 22:09:15 -0300 Subject: [PATCH 4/7] feat: rss_bytes Signed-off-by: Gustavo Murayama --- process-collector/Cargo.toml | 3 +++ process-collector/src/lib.rs | 10 +++---- process-collector/src/linux.rs | 48 ++++++++++++++++++++++++++-------- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/process-collector/Cargo.toml b/process-collector/Cargo.toml index c90952bb..73ef42d3 100644 --- a/process-collector/Cargo.toml +++ b/process-collector/Cargo.toml @@ -8,3 +8,6 @@ edition = "2021" [dependencies] procfs = "0.17.0" prometheus-client = { path = "../" } + +[target.'cfg(unix)'.dependencies] +libc = "^0.2" diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs index becaf244..0912128a 100644 --- a/process-collector/src/lib.rs +++ b/process-collector/src/lib.rs @@ -16,15 +16,15 @@ pub struct ProcessCollector { } impl ProcessCollector { - pub fn new(namespace: Option) -> Self { + pub fn new(namespace: Option) -> std::io::Result { #[cfg(target_os = "linux")] - let system = linux::System {}; + let system = linux::System::load()?; - ProcessCollector { + Ok(ProcessCollector { namespace, #[cfg(target_os = "linux")] system, - } + }) } } @@ -57,6 +57,6 @@ mod tests { #[test] fn register_process_collector() { let mut registry = Registry::default(); - registry.register_collector(Box::new(ProcessCollector::new(None))) + // registry.register_collector(Box::new(ProcessCollector::new(None))) } } diff --git a/process-collector/src/linux.rs b/process-collector/src/linux.rs index 3d31372f..bc60cdeb 100644 --- a/process-collector/src/linux.rs +++ b/process-collector/src/linux.rs @@ -1,3 +1,6 @@ +use std::io; + +use libc::{self}; use procfs::process::{LimitValue, Process}; use prometheus_client::{ collector::Collector, @@ -7,7 +10,16 @@ use prometheus_client::{ }; #[derive(Debug)] -pub(crate) struct System {} +pub(crate) struct System { + page_size: u64, +} + +impl System { + pub fn load() -> std::io::Result { + let page_size = page_size()?; + Ok(Self { page_size }) + } +} impl Collector for System { fn encode( @@ -16,7 +28,6 @@ impl Collector for System { ) -> Result<(), std::fmt::Error> { let tps = procfs::ticks_per_second(); - // TODO: handle errors let proc = match Process::myself() { Ok(proc) => proc, Err(_) => { @@ -86,16 +97,31 @@ impl Collector for System { vm_bytes.encode(vme)?; // TODO: add rss_bytes (fix self.page_size) - // - // let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); - // let rsse = encoder.encode_descriptor( - // "process_resident_memory_bytes", - // "Resident memory size in bytes.", - // Some(&Unit::Bytes), - // rss_bytes.metric_type(), - // )?; - // rss_bytes.encode(rsse)?; + + let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); + let rsse = encoder.encode_descriptor( + "process_resident_memory_bytes", + "Resident memory size in bytes.", + Some(&Unit::Bytes), + rss_bytes.metric_type(), + )?; + rss_bytes.encode(rsse)?; Ok(()) } } + +fn page_size() -> io::Result { + sysconf(libc::_SC_PAGESIZE) +} + +#[allow(unsafe_code)] +fn sysconf(num: libc::c_int) -> Result { + match unsafe { libc::sysconf(num) } { + e if e <= 0 => { + let error = io::Error::last_os_error(); + Err(error) + } + val => Ok(val as u64), + } +} From 0b04fe59d32f1a0e1ab10f46d503734b91c3f90e Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Thu, 3 Apr 2025 00:43:21 -0300 Subject: [PATCH 5/7] feat(netstat): network in and out Signed-off-by: Gustavo Murayama --- process-collector/Cargo.toml | 3 - process-collector/src/lib.rs | 2 +- process-collector/src/linux.rs | 127 -------------- process-collector/src/linux/mod.rs | 3 + process-collector/src/linux/netstat.rs | 69 ++++++++ process-collector/src/linux/system.rs | 232 +++++++++++++++++++++++++ 6 files changed, 305 insertions(+), 131 deletions(-) delete mode 100644 process-collector/src/linux.rs create mode 100644 process-collector/src/linux/mod.rs create mode 100644 process-collector/src/linux/netstat.rs create mode 100644 process-collector/src/linux/system.rs diff --git a/process-collector/Cargo.toml b/process-collector/Cargo.toml index 73ef42d3..c90952bb 100644 --- a/process-collector/Cargo.toml +++ b/process-collector/Cargo.toml @@ -8,6 +8,3 @@ edition = "2021" [dependencies] procfs = "0.17.0" prometheus-client = { path = "../" } - -[target.'cfg(unix)'.dependencies] -libc = "^0.2" diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs index 0912128a..fddd3a44 100644 --- a/process-collector/src/lib.rs +++ b/process-collector/src/lib.rs @@ -32,7 +32,7 @@ impl Collector for ProcessCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { let start_time_from_epoch = SystemTime::now() .duration_since(UNIX_EPOCH) - .map_err(|e| std::fmt::Error)?; + .map_err(|_| std::fmt::Error)?; let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); let start_time_metric = encoder.encode_descriptor( "process_start_time_seconds", diff --git a/process-collector/src/linux.rs b/process-collector/src/linux.rs deleted file mode 100644 index bc60cdeb..00000000 --- a/process-collector/src/linux.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::io; - -use libc::{self}; -use procfs::process::{LimitValue, Process}; -use prometheus_client::{ - collector::Collector, - encoding::EncodeMetric, - metrics::{counter::ConstCounter, gauge::ConstGauge}, - registry::Unit, -}; - -#[derive(Debug)] -pub(crate) struct System { - page_size: u64, -} - -impl System { - pub fn load() -> std::io::Result { - let page_size = page_size()?; - Ok(Self { page_size }) - } -} - -impl Collector for System { - fn encode( - &self, - mut encoder: prometheus_client::encoding::DescriptorEncoder, - ) -> Result<(), std::fmt::Error> { - let tps = procfs::ticks_per_second(); - - let proc = match Process::myself() { - Ok(proc) => proc, - Err(_) => { - return Ok(()); - } - }; - let stat = match proc.stat() { - Ok(stat) => stat, - Err(_) => { - return Ok(()); - } - }; - - let cpu_time = (stat.stime + stat.utime) / tps; - let counter = ConstCounter::new(cpu_time); - let metric_encoder = encoder.encode_descriptor( - "process_cpu_seconds_total", - "Total user and system CPU time spent in seconds.", - Some(&Unit::Seconds), - counter.metric_type(), - )?; - counter.encode(metric_encoder)?; - - if let Ok(limits) = proc.limits() { - let max_open_files = limits.max_open_files; - let max_fds = match max_open_files.soft_limit { - LimitValue::Unlimited => match max_open_files.hard_limit { - LimitValue::Unlimited => 0, - LimitValue::Value(hard) => hard, - }, - LimitValue::Value(soft) => soft, - }; - let gauge = ConstGauge::new(max_fds as i64); - let metric_encoder = encoder.encode_descriptor( - "process_max_fds", - "Maximum number of open file descriptors.", - None, - gauge.metric_type(), - )?; - gauge.encode(metric_encoder)?; - - let max_address_space = limits.max_address_space; - let max_virtual_memory = match max_address_space.soft_limit { - LimitValue::Unlimited => match max_address_space.hard_limit { - LimitValue::Unlimited => 0, - LimitValue::Value(hard) => hard, - }, - LimitValue::Value(soft) => soft, - }; - let gauge = ConstGauge::new(max_virtual_memory as i64); - let metric_encoder = encoder.encode_descriptor( - "process_virtual_memory_max_bytes", - "Maximum amount of virtual memory available in bytes.", - None, - gauge.metric_type(), - )?; - gauge.encode(metric_encoder)?; - } - - let vm_bytes = ConstGauge::new(stat.vsize as i64); - let vme = encoder.encode_descriptor( - "process_virtual_memory_bytes", - "Virtual memory size in bytes", - Some(&Unit::Bytes), - vm_bytes.metric_type(), - )?; - vm_bytes.encode(vme)?; - - // TODO: add rss_bytes (fix self.page_size) - - let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); - let rsse = encoder.encode_descriptor( - "process_resident_memory_bytes", - "Resident memory size in bytes.", - Some(&Unit::Bytes), - rss_bytes.metric_type(), - )?; - rss_bytes.encode(rsse)?; - - Ok(()) - } -} - -fn page_size() -> io::Result { - sysconf(libc::_SC_PAGESIZE) -} - -#[allow(unsafe_code)] -fn sysconf(num: libc::c_int) -> Result { - match unsafe { libc::sysconf(num) } { - e if e <= 0 => { - let error = io::Error::last_os_error(); - Err(error) - } - val => Ok(val as u64), - } -} diff --git a/process-collector/src/linux/mod.rs b/process-collector/src/linux/mod.rs new file mode 100644 index 00000000..3c76913e --- /dev/null +++ b/process-collector/src/linux/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod netstat; +mod system; +pub(crate) use system::System; diff --git a/process-collector/src/linux/netstat.rs b/process-collector/src/linux/netstat.rs new file mode 100644 index 00000000..0dbc7859 --- /dev/null +++ b/process-collector/src/linux/netstat.rs @@ -0,0 +1,69 @@ +use std::fs; +use std::io::{self, BufRead, BufReader}; + +#[derive(Debug, Default)] +pub struct Netstat { + pub ip_ext: IpExt, +} + +#[derive(Debug, Default)] +pub struct IpExt { + pub in_octets: Option, + pub out_octets: Option, +} + +impl Netstat { + pub fn read(pid: i32) -> io::Result { + let filename = format!("/proc/{}/net/netstat", pid); + let proc_netstat = read_from_file(&filename)?; + Ok(proc_netstat) + } +} +fn read_from_file(path: &str) -> io::Result { + let data = fs::read(path)?; + parse_proc_netstat(&data[..], path) +} + +fn parse_proc_netstat(reader: R, file_name: &str) -> io::Result { + let mut proc_netstat = Netstat::default(); + let reader = BufReader::new(reader); + let mut lines = reader.lines(); + + while let Some(header_line) = lines.next() { + let header = header_line?; + let name_parts: Vec<&str> = header.split_whitespace().collect(); + + let value_line = match lines.next() { + Some(l) => l?, + None => break, + }; + let value_parts: Vec<&str> = value_line.split_whitespace().collect(); + + let protocol = name_parts[0].trim_end_matches(':'); + if name_parts.len() != value_parts.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("mismatch field count in {}: {}", file_name, protocol), + )); + } + + for i in 1..name_parts.len() { + let value: f64 = value_parts[i].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid value in {}: {}", file_name, e), + ) + })?; + let key = name_parts[i]; + match protocol { + "IpExt" => match key { + "InOctets" => proc_netstat.ip_ext.in_octets = Some(value), + "OutOctets" => proc_netstat.ip_ext.out_octets = Some(value), + _ => {} + }, + _ => {} + } + } + } + Ok(proc_netstat) +} diff --git a/process-collector/src/linux/system.rs b/process-collector/src/linux/system.rs new file mode 100644 index 00000000..f256310b --- /dev/null +++ b/process-collector/src/linux/system.rs @@ -0,0 +1,232 @@ +use procfs::process::{LimitValue, Process, Stat}; +use prometheus_client::{ + collector::Collector, + encoding::EncodeMetric, + metrics::{counter::ConstCounter, gauge::ConstGauge}, + registry::Unit, +}; + +use super::netstat::Netstat; + +type SystemResult = Result<(), std::fmt::Error>; + +#[derive(Debug)] +pub struct System { + namespace: String, + page_size: u64, +} + +impl System { + pub fn load(namespace: Option) -> std::io::Result { + let page_size = procfs::page_size(); + let namespace = match namespace { + Some(mut n) => { + n.push('_'); + n + } + None => "".to_string(), + }; + Ok(Self { + page_size, + namespace, + }) + } + + fn open_fds( + &self, + proc: &Process, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + let open_file_descriptors = proc.fd_count().map_err(|_| std::fmt::Error)?; + let counter = ConstCounter::new(open_file_descriptors as u32); + let metric_name = format!("{}process_open_fds", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Number of open file descriptors.", + None, + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + + Ok(()) + } + + fn cpu_seconds_total( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + let tps = procfs::ticks_per_second(); + let cpu_time = (stat.stime + stat.utime) / tps; + let counter = ConstCounter::new(cpu_time); + let metric_name = format!("{}process_cpu_seconds_total", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Total user and system CPU time spent in seconds.", + Some(&Unit::Seconds), + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + + Ok(()) + } + + fn max_fds( + &self, + proc: &Process, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + // TODO: handle error + if let Ok(limits) = proc.limits() { + let max_open_files = limits.max_open_files; + let max_fds = match max_open_files.soft_limit { + LimitValue::Unlimited => match max_open_files.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_fds as i64); + let metric_name = format!("{}process_max_fds", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Maximum number of open file descriptors.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + } + + Ok(()) + } + + fn virtual_memory_max_bytes( + &self, + proc: &Process, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + // TODO: handle error + if let Ok(limits) = proc.limits() { + let max_address_space = limits.max_address_space; + let max_virtual_memory = match max_address_space.soft_limit { + LimitValue::Unlimited => match max_address_space.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_virtual_memory as i64); + let metric_name = format!("{}process_virtual_memory_max_bytes", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Maximum amount of virtual memory available in bytes.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + } + + Ok(()) + } + + fn virtual_memory_bytes( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + let vm_bytes = ConstGauge::new(stat.vsize as i64); + let metric_name = format!("{}process_virtual_memory_bytes", &self.namespace); + let vme = encoder.encode_descriptor( + &metric_name, + "Virtual memory size in bytes", + Some(&Unit::Bytes), + vm_bytes.metric_type(), + )?; + vm_bytes.encode(vme)?; + + Ok(()) + } + + fn resident_memory_bytes( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); + let metric_name = format!("{}process_resident_memory_bytes", &self.namespace); + let rsse = encoder.encode_descriptor( + &metric_name, + "Resident memory size in bytes.", + Some(&Unit::Bytes), + rss_bytes.metric_type(), + )?; + rss_bytes.encode(rsse)?; + + Ok(()) + } + + fn network_in_out( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> SystemResult { + match Netstat::read(stat.pid) { + Ok(Netstat { ip_ext, .. }) => { + let recv_bytes = ConstCounter::new(ip_ext.in_octets.unwrap_or_default()); + let metric_name = format!("{}process_network_receive_bytes_total", &self.namespace); + let rbe = encoder.encode_descriptor( + &metric_name, + "Number of bytes received by the process over the network.", + Some(&Unit::Bytes), + recv_bytes.metric_type(), + )?; + recv_bytes.encode(rbe)?; + + let transmit_bytes = ConstCounter::new(ip_ext.out_octets.unwrap_or_default()); + let metric_name = + format!("{}process_network_transmit_bytes_total", &self.namespace); + let tbe = encoder.encode_descriptor( + &metric_name, + "Number of bytes sent by the process over the network.", + Some(&Unit::Bytes), + transmit_bytes.metric_type(), + )?; + transmit_bytes.encode(tbe)?; + } + // TODO: handle error case + Err(e) => {} + } + + Ok(()) + } +} + +impl Collector for System { + fn encode( + &self, + mut encoder: prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let proc = match Process::myself() { + Ok(proc) => proc, + Err(_) => { + return Ok(()); + } + }; + let stat = match proc.stat() { + Ok(stat) => stat, + Err(_) => { + return Ok(()); + } + }; + + self.resident_memory_bytes(&stat, &mut encoder)?; + self.virtual_memory_bytes(&stat, &mut encoder)?; + self.virtual_memory_max_bytes(&proc, &mut encoder)?; + self.open_fds(&proc, &mut encoder)?; + self.max_fds(&proc, &mut encoder)?; + self.cpu_seconds_total(&stat, &mut encoder)?; + self.network_in_out(&stat, &mut encoder)?; + + Ok(()) + } +} From 7bf72b09e7dfb7f59054895712b059e03c10cd19 Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Fri, 4 Apr 2025 22:01:58 -0300 Subject: [PATCH 6/7] tests: register process collector Signed-off-by: Gustavo Murayama --- process-collector/src/lib.rs | 187 ++++++++++++++++++++++++- process-collector/src/linux/netstat.rs | 7 +- process-collector/src/linux/system.rs | 13 +- 3 files changed, 190 insertions(+), 17 deletions(-) diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs index fddd3a44..94efb339 100644 --- a/process-collector/src/lib.rs +++ b/process-collector/src/lib.rs @@ -10,7 +10,7 @@ mod linux; #[derive(Debug)] pub struct ProcessCollector { - namespace: Option, + namespace: String, #[cfg(target_os = "linux")] system: linux::System, } @@ -18,7 +18,14 @@ pub struct ProcessCollector { impl ProcessCollector { pub fn new(namespace: Option) -> std::io::Result { #[cfg(target_os = "linux")] - let system = linux::System::load()?; + let system = linux::System::load(namespace.clone())?; + let namespace = match namespace { + Some(mut n) => { + n.push('_'); + n + } + None => "".to_string(), + }; Ok(ProcessCollector { namespace, @@ -34,8 +41,9 @@ impl Collector for ProcessCollector { .duration_since(UNIX_EPOCH) .map_err(|_| std::fmt::Error)?; let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); + let metric_name = format!("{}process_start_time", self.namespace); let start_time_metric = encoder.encode_descriptor( - "process_start_time_seconds", + &metric_name, "Start time of the process since unix epoch in seconds.", Some(&Unit::Seconds), start_time.metric_type(), @@ -52,11 +60,178 @@ impl Collector for ProcessCollector { #[cfg(test)] mod tests { use super::*; - use prometheus_client::registry::Registry; + use prometheus_client::{encoding::text::encode, registry::Registry}; #[test] - fn register_process_collector() { + fn register_start_time() { let mut registry = Registry::default(); - // registry.register_collector(Box::new(ProcessCollector::new(None))) + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let start_time = "# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.\n".to_owned() + + "# TYPE process_start_time_seconds gauge\n" + + "# UNIT process_start_time_seconds seconds\n" + + "process_start_time_seconds "; + + assert!( + encoded.contains(&start_time), + "encoded does not contain expected start_time" + ); + } + + #[test] + fn register_resident_memory() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let resident_memory = + "# HELP process_resident_memory_bytes Resident memory size in bytes.\n".to_owned() + + "# TYPE process_resident_memory_bytes gauge\n" + + "# UNIT process_resident_memory_bytes bytes\n" + + "process_resident_memory_bytes "; + + assert!( + encoded.contains(&resident_memory), + "encoded does not contain expected resident_memory" + ); + } + + #[test] + fn register_virtual_memory() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let virtual_memory = "# HELP process_virtual_memory_bytes Virtual memory size in bytes\n" + .to_owned() + + "# TYPE process_virtual_memory_bytes gauge\n" + + "# UNIT process_virtual_memory_bytes bytes\n" + + "process_virtual_memory_bytes "; + + assert!( + encoded.contains(&virtual_memory), + "encoded does not contain expected virtual_memory" + ); + } + + #[test] + fn register_virtual_memory_max() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let virtual_memory_max = "# HELP process_virtual_memory_max Maximum amount of virtual memory available in bytes.\n".to_owned() + + "# TYPE process_virtual_memory_max gauge\n" + + "process_virtual_memory_max "; + + assert!( + encoded.contains(&virtual_memory_max), + "encoded does not contain expected virtual_memory_max" + ); + } + + #[test] + fn register_open_fds() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let open_fds = "# HELP process_open_fds Number of open file descriptors.\n".to_owned() + + "# TYPE process_open_fds counter\n" + + "process_open_fds_total "; + + assert!( + encoded.contains(&open_fds), + "encoded does not contain expected open_fds" + ); + } + + #[test] + fn register_max_fds() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let max_fds = "# HELP process_max_fds Maximum number of open file descriptors.\n" + .to_owned() + + "# TYPE process_max_fds gauge\n" + + "process_max_fds "; + + assert!( + encoded.contains(&max_fds), + "encoded does not contain expected max_fds" + ); + } + + #[test] + fn register_cpu_seconds() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let cpu_seconds = + "# HELP process_cpu_seconds Total user and system CPU time spent in seconds.\n" + .to_owned() + + "# TYPE process_cpu_seconds counter\n" + + "# UNIT process_cpu_seconds seconds\n" + + "process_cpu_seconds_total "; + + assert!( + encoded.contains(&cpu_seconds), + "encoded does not contain expected cpu_seconds" + ); + } + + #[test] + fn register_network_receive() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let network_receive = "# HELP process_network_receive_bytes Number of bytes received by the process over the network.\n".to_owned() + + "# TYPE process_network_receive_bytes counter\n" + + "# UNIT process_network_receive_bytes bytes\n" + + "process_network_receive_bytes_total "; + + assert!( + encoded.contains(&network_receive), + "encoded does not contain expected network_receive" + ); + } + + #[test] + fn register_network_transmit() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(None).unwrap(); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let network_transmit = "# HELP process_network_transmit_bytes Number of bytes sent by the process over the network.\n".to_owned() + + "# TYPE process_network_transmit_bytes counter\n" + + "# UNIT process_network_transmit_bytes bytes\n" + + "process_network_transmit_bytes_total "; + + assert!( + encoded.contains(&network_transmit), + "encoded does not contain expected network_transmit" + ); } } diff --git a/process-collector/src/linux/netstat.rs b/process-collector/src/linux/netstat.rs index 0dbc7859..18de6902 100644 --- a/process-collector/src/linux/netstat.rs +++ b/process-collector/src/linux/netstat.rs @@ -55,13 +55,12 @@ fn parse_proc_netstat(reader: R, file_name: &str) -> io::Result match key { + if protocol == "IpExt" { + match key { "InOctets" => proc_netstat.ip_ext.in_octets = Some(value), "OutOctets" => proc_netstat.ip_ext.out_octets = Some(value), _ => {} - }, - _ => {} + } } } } diff --git a/process-collector/src/linux/system.rs b/process-collector/src/linux/system.rs index f256310b..47285bee 100644 --- a/process-collector/src/linux/system.rs +++ b/process-collector/src/linux/system.rs @@ -59,7 +59,7 @@ impl System { let tps = procfs::ticks_per_second(); let cpu_time = (stat.stime + stat.utime) / tps; let counter = ConstCounter::new(cpu_time); - let metric_name = format!("{}process_cpu_seconds_total", &self.namespace); + let metric_name = format!("{}process_cpu", &self.namespace); let metric_encoder = encoder.encode_descriptor( &metric_name, "Total user and system CPU time spent in seconds.", @@ -116,7 +116,7 @@ impl System { LimitValue::Value(soft) => soft, }; let gauge = ConstGauge::new(max_virtual_memory as i64); - let metric_name = format!("{}process_virtual_memory_max_bytes", &self.namespace); + let metric_name = format!("{}process_virtual_memory_max", &self.namespace); let metric_encoder = encoder.encode_descriptor( &metric_name, "Maximum amount of virtual memory available in bytes.", @@ -135,7 +135,7 @@ impl System { encoder: &mut prometheus_client::encoding::DescriptorEncoder, ) -> SystemResult { let vm_bytes = ConstGauge::new(stat.vsize as i64); - let metric_name = format!("{}process_virtual_memory_bytes", &self.namespace); + let metric_name = format!("{}process_virtual_memory", &self.namespace); let vme = encoder.encode_descriptor( &metric_name, "Virtual memory size in bytes", @@ -153,7 +153,7 @@ impl System { encoder: &mut prometheus_client::encoding::DescriptorEncoder, ) -> SystemResult { let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); - let metric_name = format!("{}process_resident_memory_bytes", &self.namespace); + let metric_name = format!("{}process_resident_memory", &self.namespace); let rsse = encoder.encode_descriptor( &metric_name, "Resident memory size in bytes.", @@ -173,7 +173,7 @@ impl System { match Netstat::read(stat.pid) { Ok(Netstat { ip_ext, .. }) => { let recv_bytes = ConstCounter::new(ip_ext.in_octets.unwrap_or_default()); - let metric_name = format!("{}process_network_receive_bytes_total", &self.namespace); + let metric_name = format!("{}process_network_receive", &self.namespace); let rbe = encoder.encode_descriptor( &metric_name, "Number of bytes received by the process over the network.", @@ -183,8 +183,7 @@ impl System { recv_bytes.encode(rbe)?; let transmit_bytes = ConstCounter::new(ip_ext.out_octets.unwrap_or_default()); - let metric_name = - format!("{}process_network_transmit_bytes_total", &self.namespace); + let metric_name = format!("{}process_network_transmit", &self.namespace); let tbe = encoder.encode_descriptor( &metric_name, "Number of bytes sent by the process over the network.", From 29fa0071ce6443aa64017a3d22f7beeb20615488 Mon Sep 17 00:00:00 2001 From: Gustavo Murayama Date: Fri, 11 Apr 2025 23:33:32 -0300 Subject: [PATCH 7/7] feat: handle error Signed-off-by: Gustavo Murayama --- process-collector/src/lib.rs | 133 +++++++++++++++++-------- process-collector/src/linux/system.rs | 138 +++++++++++++++++--------- 2 files changed, 182 insertions(+), 89 deletions(-) diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs index 94efb339..c8614e06 100644 --- a/process-collector/src/lib.rs +++ b/process-collector/src/lib.rs @@ -1,55 +1,47 @@ -use prometheus_client::{ - collector::Collector, - encoding::{DescriptorEncoder, EncodeMetric}, - metrics::gauge::ConstGauge, - registry::Unit, -}; -use std::time::{SystemTime, UNIX_EPOCH}; +use prometheus_client::{collector::Collector, encoding::DescriptorEncoder}; mod linux; +#[derive(Debug, Default)] +pub struct CollectorConfig { + namespace: Option, + report_error: bool, +} + +impl CollectorConfig { + pub fn with_namespace(mut self, namespace: Option) -> Self { + self.namespace = namespace; + + self + } + + pub fn with_report_error(mut self, report_error: bool) -> Self { + self.report_error = report_error; + + self + } +} + #[derive(Debug)] pub struct ProcessCollector { - namespace: String, #[cfg(target_os = "linux")] system: linux::System, } impl ProcessCollector { - pub fn new(namespace: Option) -> std::io::Result { + pub fn new(config: CollectorConfig) -> Self { #[cfg(target_os = "linux")] - let system = linux::System::load(namespace.clone())?; - let namespace = match namespace { - Some(mut n) => { - n.push('_'); - n - } - None => "".to_string(), - }; - - Ok(ProcessCollector { - namespace, + let system = linux::System::load(config.namespace.clone(), config.report_error); + + ProcessCollector { #[cfg(target_os = "linux")] system, - }) + } } } impl Collector for ProcessCollector { - fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let start_time_from_epoch = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|_| std::fmt::Error)?; - let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); - let metric_name = format!("{}process_start_time", self.namespace); - let start_time_metric = encoder.encode_descriptor( - &metric_name, - "Start time of the process since unix epoch in seconds.", - Some(&Unit::Seconds), - start_time.metric_type(), - )?; - start_time.encode(start_time_metric)?; - + fn encode(&self, encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { #[cfg(target_os = "linux")] self.system.encode(encoder)?; @@ -65,7 +57,7 @@ mod tests { #[test] fn register_start_time() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -84,7 +76,7 @@ mod tests { #[test] fn register_resident_memory() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -104,7 +96,7 @@ mod tests { #[test] fn register_virtual_memory() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -124,7 +116,7 @@ mod tests { #[test] fn register_virtual_memory_max() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -142,7 +134,7 @@ mod tests { #[test] fn register_open_fds() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -160,7 +152,7 @@ mod tests { #[test] fn register_max_fds() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -179,7 +171,7 @@ mod tests { #[test] fn register_cpu_seconds() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -200,7 +192,7 @@ mod tests { #[test] fn register_network_receive() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -219,7 +211,7 @@ mod tests { #[test] fn register_network_transmit() { let mut registry = Registry::default(); - let processor_collector = ProcessCollector::new(None).unwrap(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); registry.register_collector(Box::new(processor_collector)); let mut encoded = String::new(); encode(&mut encoded, ®istry).unwrap(); @@ -234,4 +226,57 @@ mod tests { "encoded does not contain expected network_transmit" ); } + + #[test] + fn include_namespace() { + let mut registry = Registry::default(); + let namespace = "namespace"; + let config = CollectorConfig::default().with_namespace(Some(namespace.to_string())); + let processor_collector = ProcessCollector::new(config); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let network_transmit = format!("{}_process_network_transmit_bytes_total ", namespace); + let network_receive = format!("{}_process_network_receive_bytes_total ", namespace); + let max_fds = format!("{}_process_max_fds ", namespace); + let open_fds = format!("{}_process_open_fds_total ", namespace); + let virtual_memory_max = format!("{}_process_virtual_memory_max ", namespace); + let virtual_memory = format!("{}_process_virtual_memory_bytes ", namespace); + let resident_memory = format!("{}_process_resident_memory_bytes ", namespace); + let start_time = format!("{}_process_start_time_seconds ", namespace); + + assert!( + encoded.contains(&network_transmit), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&network_receive), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&max_fds), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&open_fds), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&virtual_memory_max), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&virtual_memory), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&resident_memory), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&start_time), + "encoded does not contain expected network_transmit with namespace attached" + ); + } } diff --git a/process-collector/src/linux/system.rs b/process-collector/src/linux/system.rs index 47285bee..37bcac1a 100644 --- a/process-collector/src/linux/system.rs +++ b/process-collector/src/linux/system.rs @@ -1,3 +1,5 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + use procfs::process::{LimitValue, Process, Stat}; use prometheus_client::{ collector::Collector, @@ -8,16 +10,15 @@ use prometheus_client::{ use super::netstat::Netstat; -type SystemResult = Result<(), std::fmt::Error>; - #[derive(Debug)] pub struct System { namespace: String, page_size: u64, + report_error: bool, } impl System { - pub fn load(namespace: Option) -> std::io::Result { + pub fn load(namespace: Option, report_error: bool) -> Self { let page_size = procfs::page_size(); let namespace = match namespace { Some(mut n) => { @@ -26,17 +27,39 @@ impl System { } None => "".to_string(), }; - Ok(Self { + + Self { page_size, namespace, - }) + report_error, + } + } + + fn start_time( + &self, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let start_time_from_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|_| std::fmt::Error)?; + let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); + let metric_name = format!("{}process_start_time", self.namespace); + let start_time_metric = encoder.encode_descriptor( + &metric_name, + "Start time of the process since unix epoch in seconds.", + Some(&Unit::Seconds), + start_time.metric_type(), + )?; + start_time.encode(start_time_metric)?; + + Ok(()) } fn open_fds( &self, proc: &Process, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { + ) -> Result<(), std::fmt::Error> { let open_file_descriptors = proc.fd_count().map_err(|_| std::fmt::Error)?; let counter = ConstCounter::new(open_file_descriptors as u32); let metric_name = format!("{}process_open_fds", &self.namespace); @@ -55,7 +78,7 @@ impl System { &self, stat: &Stat, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { + ) -> Result<(), std::fmt::Error> { let tps = procfs::ticks_per_second(); let cpu_time = (stat.stime + stat.utime) / tps; let counter = ConstCounter::new(cpu_time); @@ -75,7 +98,7 @@ impl System { &self, proc: &Process, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { + ) -> Result<(), std::fmt::Error> { // TODO: handle error if let Ok(limits) = proc.limits() { let max_open_files = limits.max_open_files; @@ -104,8 +127,7 @@ impl System { &self, proc: &Process, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { - // TODO: handle error + ) -> Result<(), std::fmt::Error> { if let Ok(limits) = proc.limits() { let max_address_space = limits.max_address_space; let max_virtual_memory = match max_address_space.soft_limit { @@ -133,7 +155,7 @@ impl System { &self, stat: &Stat, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { + ) -> Result<(), std::fmt::Error> { let vm_bytes = ConstGauge::new(stat.vsize as i64); let metric_name = format!("{}process_virtual_memory", &self.namespace); let vme = encoder.encode_descriptor( @@ -151,7 +173,7 @@ impl System { &self, stat: &Stat, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { + ) -> Result<(), std::fmt::Error> { let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); let metric_name = format!("{}process_resident_memory", &self.namespace); let rsse = encoder.encode_descriptor( @@ -169,35 +191,41 @@ impl System { &self, stat: &Stat, encoder: &mut prometheus_client::encoding::DescriptorEncoder, - ) -> SystemResult { - match Netstat::read(stat.pid) { - Ok(Netstat { ip_ext, .. }) => { - let recv_bytes = ConstCounter::new(ip_ext.in_octets.unwrap_or_default()); - let metric_name = format!("{}process_network_receive", &self.namespace); - let rbe = encoder.encode_descriptor( - &metric_name, - "Number of bytes received by the process over the network.", - Some(&Unit::Bytes), - recv_bytes.metric_type(), - )?; - recv_bytes.encode(rbe)?; - - let transmit_bytes = ConstCounter::new(ip_ext.out_octets.unwrap_or_default()); - let metric_name = format!("{}process_network_transmit", &self.namespace); - let tbe = encoder.encode_descriptor( - &metric_name, - "Number of bytes sent by the process over the network.", - Some(&Unit::Bytes), - transmit_bytes.metric_type(), - )?; - transmit_bytes.encode(tbe)?; - } - // TODO: handle error case - Err(e) => {} - } + ) -> Result<(), std::fmt::Error> { + let Netstat { ip_ext, .. } = Netstat::read(stat.pid).map_err(|_| std::fmt::Error)?; + let recv_bytes = ConstCounter::new(ip_ext.in_octets.unwrap_or_default()); + let metric_name = format!("{}process_network_receive", &self.namespace); + let rbe = encoder.encode_descriptor( + &metric_name, + "Number of bytes received by the process over the network.", + Some(&Unit::Bytes), + recv_bytes.metric_type(), + )?; + recv_bytes.encode(rbe)?; + + let transmit_bytes = ConstCounter::new(ip_ext.out_octets.unwrap_or_default()); + let metric_name = format!("{}process_network_transmit", &self.namespace); + let tbe = encoder.encode_descriptor( + &metric_name, + "Number of bytes sent by the process over the network.", + Some(&Unit::Bytes), + transmit_bytes.metric_type(), + )?; + transmit_bytes.encode(tbe)?; Ok(()) } + + fn handle_error_report( + &self, + result: Result<(), std::fmt::Error>, + ) -> Result<(), std::fmt::Error> { + if !self.report_error { + return Ok(()); + } + + result + } } impl Collector for System { @@ -218,14 +246,34 @@ impl Collector for System { } }; - self.resident_memory_bytes(&stat, &mut encoder)?; - self.virtual_memory_bytes(&stat, &mut encoder)?; - self.virtual_memory_max_bytes(&proc, &mut encoder)?; - self.open_fds(&proc, &mut encoder)?; - self.max_fds(&proc, &mut encoder)?; - self.cpu_seconds_total(&stat, &mut encoder)?; - self.network_in_out(&stat, &mut encoder)?; + self.handle_error_report(self.start_time(&mut encoder))?; + self.handle_error_report(self.resident_memory_bytes(&stat, &mut encoder))?; + self.handle_error_report(self.virtual_memory_bytes(&stat, &mut encoder))?; + self.handle_error_report(self.virtual_memory_max_bytes(&proc, &mut encoder))?; + self.handle_error_report(self.open_fds(&proc, &mut encoder))?; + self.handle_error_report(self.max_fds(&proc, &mut encoder))?; + self.handle_error_report(self.cpu_seconds_total(&stat, &mut encoder))?; + self.handle_error_report(self.network_in_out(&stat, &mut encoder))?; Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ignore_error_report() { + let system = System::load(None, false); + let result = system.handle_error_report(Err(std::fmt::Error)); + assert!(result.is_ok(), "handle_error_report did not ignore error"); + } + + #[test] + fn return_error() { + let system = System::load(None, true); + let result = system.handle_error_report(Err(std::fmt::Error)); + assert!(result.is_err(), "handle_error_report ignored error"); + } +}