diff --git a/Cargo.toml b/Cargo.toml index a0f5e2a..cc20d1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,13 @@ authors = ["Max Inden "] edition = "2021" description = "Open Metrics client library allowing users to natively instrument applications." license = "Apache-2.0 OR MIT" -keywords = ["openmetrics", "prometheus", "metrics", "instrumentation", "monitoring"] +keywords = [ + "openmetrics", + "prometheus", + "metrics", + "instrumentation", + "monitoring", +] repository = "https://github.com/prometheus/client_rust" homepage = "https://github.com/prometheus/client_rust" documentation = "https://docs.rs/prometheus-client" @@ -16,6 +22,7 @@ protobuf = ["dep:prost", "dep:prost-types", "dep:prost-build"] [workspace] members = ["derive-encode"] +exclude = ["process-collector"] [dependencies] dtoa = "1.0" @@ -36,7 +43,12 @@ quickcheck = "1" rand = "0.8.4" tide = "0.16" actix-web = "4" -tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "signal"] } +tokio = { version = "1", features = [ + "rt-multi-thread", + "net", + "macros", + "signal", +] } hyper = { version = "1.3.1", features = ["server", "http1"] } hyper-util = { version = "0.1.3", features = ["tokio"] } http-body-util = "0.1.1" diff --git a/process-collector/.gitignore b/process-collector/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/process-collector/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/process-collector/Cargo.toml b/process-collector/Cargo.toml new file mode 100644 index 0000000..c90952b --- /dev/null +++ b/process-collector/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "process-collector" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +procfs = "0.17.0" +prometheus-client = { path = "../" } diff --git a/process-collector/src/lib.rs b/process-collector/src/lib.rs new file mode 100644 index 0000000..c8614e0 --- /dev/null +++ b/process-collector/src/lib.rs @@ -0,0 +1,282 @@ +use prometheus_client::{collector::Collector, encoding::DescriptorEncoder}; + +mod linux; + +#[derive(Debug, Default)] +pub struct CollectorConfig { + namespace: Option, + report_error: bool, +} + +impl CollectorConfig { + pub fn with_namespace(mut self, namespace: Option) -> Self { + self.namespace = namespace; + + self + } + + pub fn with_report_error(mut self, report_error: bool) -> Self { + self.report_error = report_error; + + self + } +} + +#[derive(Debug)] +pub struct ProcessCollector { + #[cfg(target_os = "linux")] + system: linux::System, +} + +impl ProcessCollector { + pub fn new(config: CollectorConfig) -> Self { + #[cfg(target_os = "linux")] + let system = linux::System::load(config.namespace.clone(), config.report_error); + + ProcessCollector { + #[cfg(target_os = "linux")] + system, + } + } +} + +impl Collector for ProcessCollector { + fn encode(&self, encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + #[cfg(target_os = "linux")] + self.system.encode(encoder)?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use prometheus_client::{encoding::text::encode, registry::Registry}; + + #[test] + fn register_start_time() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let start_time = "# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.\n".to_owned() + + "# TYPE process_start_time_seconds gauge\n" + + "# UNIT process_start_time_seconds seconds\n" + + "process_start_time_seconds "; + + assert!( + encoded.contains(&start_time), + "encoded does not contain expected start_time" + ); + } + + #[test] + fn register_resident_memory() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let resident_memory = + "# HELP process_resident_memory_bytes Resident memory size in bytes.\n".to_owned() + + "# TYPE process_resident_memory_bytes gauge\n" + + "# UNIT process_resident_memory_bytes bytes\n" + + "process_resident_memory_bytes "; + + assert!( + encoded.contains(&resident_memory), + "encoded does not contain expected resident_memory" + ); + } + + #[test] + fn register_virtual_memory() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let virtual_memory = "# HELP process_virtual_memory_bytes Virtual memory size in bytes\n" + .to_owned() + + "# TYPE process_virtual_memory_bytes gauge\n" + + "# UNIT process_virtual_memory_bytes bytes\n" + + "process_virtual_memory_bytes "; + + assert!( + encoded.contains(&virtual_memory), + "encoded does not contain expected virtual_memory" + ); + } + + #[test] + fn register_virtual_memory_max() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let virtual_memory_max = "# HELP process_virtual_memory_max Maximum amount of virtual memory available in bytes.\n".to_owned() + + "# TYPE process_virtual_memory_max gauge\n" + + "process_virtual_memory_max "; + + assert!( + encoded.contains(&virtual_memory_max), + "encoded does not contain expected virtual_memory_max" + ); + } + + #[test] + fn register_open_fds() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let open_fds = "# HELP process_open_fds Number of open file descriptors.\n".to_owned() + + "# TYPE process_open_fds counter\n" + + "process_open_fds_total "; + + assert!( + encoded.contains(&open_fds), + "encoded does not contain expected open_fds" + ); + } + + #[test] + fn register_max_fds() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let max_fds = "# HELP process_max_fds Maximum number of open file descriptors.\n" + .to_owned() + + "# TYPE process_max_fds gauge\n" + + "process_max_fds "; + + assert!( + encoded.contains(&max_fds), + "encoded does not contain expected max_fds" + ); + } + + #[test] + fn register_cpu_seconds() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let cpu_seconds = + "# HELP process_cpu_seconds Total user and system CPU time spent in seconds.\n" + .to_owned() + + "# TYPE process_cpu_seconds counter\n" + + "# UNIT process_cpu_seconds seconds\n" + + "process_cpu_seconds_total "; + + assert!( + encoded.contains(&cpu_seconds), + "encoded does not contain expected cpu_seconds" + ); + } + + #[test] + fn register_network_receive() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let network_receive = "# HELP process_network_receive_bytes Number of bytes received by the process over the network.\n".to_owned() + + "# TYPE process_network_receive_bytes counter\n" + + "# UNIT process_network_receive_bytes bytes\n" + + "process_network_receive_bytes_total "; + + assert!( + encoded.contains(&network_receive), + "encoded does not contain expected network_receive" + ); + } + + #[test] + fn register_network_transmit() { + let mut registry = Registry::default(); + let processor_collector = ProcessCollector::new(CollectorConfig::default()); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let network_transmit = "# HELP process_network_transmit_bytes Number of bytes sent by the process over the network.\n".to_owned() + + "# TYPE process_network_transmit_bytes counter\n" + + "# UNIT process_network_transmit_bytes bytes\n" + + "process_network_transmit_bytes_total "; + + assert!( + encoded.contains(&network_transmit), + "encoded does not contain expected network_transmit" + ); + } + + #[test] + fn include_namespace() { + let mut registry = Registry::default(); + let namespace = "namespace"; + let config = CollectorConfig::default().with_namespace(Some(namespace.to_string())); + let processor_collector = ProcessCollector::new(config); + registry.register_collector(Box::new(processor_collector)); + let mut encoded = String::new(); + encode(&mut encoded, ®istry).unwrap(); + + let network_transmit = format!("{}_process_network_transmit_bytes_total ", namespace); + let network_receive = format!("{}_process_network_receive_bytes_total ", namespace); + let max_fds = format!("{}_process_max_fds ", namespace); + let open_fds = format!("{}_process_open_fds_total ", namespace); + let virtual_memory_max = format!("{}_process_virtual_memory_max ", namespace); + let virtual_memory = format!("{}_process_virtual_memory_bytes ", namespace); + let resident_memory = format!("{}_process_resident_memory_bytes ", namespace); + let start_time = format!("{}_process_start_time_seconds ", namespace); + + assert!( + encoded.contains(&network_transmit), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&network_receive), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&max_fds), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&open_fds), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&virtual_memory_max), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&virtual_memory), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&resident_memory), + "encoded does not contain expected network_transmit with namespace attached" + ); + assert!( + encoded.contains(&start_time), + "encoded does not contain expected network_transmit with namespace attached" + ); + } +} diff --git a/process-collector/src/linux/mod.rs b/process-collector/src/linux/mod.rs new file mode 100644 index 0000000..3c76913 --- /dev/null +++ b/process-collector/src/linux/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod netstat; +mod system; +pub(crate) use system::System; diff --git a/process-collector/src/linux/netstat.rs b/process-collector/src/linux/netstat.rs new file mode 100644 index 0000000..18de690 --- /dev/null +++ b/process-collector/src/linux/netstat.rs @@ -0,0 +1,68 @@ +use std::fs; +use std::io::{self, BufRead, BufReader}; + +#[derive(Debug, Default)] +pub struct Netstat { + pub ip_ext: IpExt, +} + +#[derive(Debug, Default)] +pub struct IpExt { + pub in_octets: Option, + pub out_octets: Option, +} + +impl Netstat { + pub fn read(pid: i32) -> io::Result { + let filename = format!("/proc/{}/net/netstat", pid); + let proc_netstat = read_from_file(&filename)?; + Ok(proc_netstat) + } +} +fn read_from_file(path: &str) -> io::Result { + let data = fs::read(path)?; + parse_proc_netstat(&data[..], path) +} + +fn parse_proc_netstat(reader: R, file_name: &str) -> io::Result { + let mut proc_netstat = Netstat::default(); + let reader = BufReader::new(reader); + let mut lines = reader.lines(); + + while let Some(header_line) = lines.next() { + let header = header_line?; + let name_parts: Vec<&str> = header.split_whitespace().collect(); + + let value_line = match lines.next() { + Some(l) => l?, + None => break, + }; + let value_parts: Vec<&str> = value_line.split_whitespace().collect(); + + let protocol = name_parts[0].trim_end_matches(':'); + if name_parts.len() != value_parts.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("mismatch field count in {}: {}", file_name, protocol), + )); + } + + for i in 1..name_parts.len() { + let value: f64 = value_parts[i].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid value in {}: {}", file_name, e), + ) + })?; + let key = name_parts[i]; + if protocol == "IpExt" { + match key { + "InOctets" => proc_netstat.ip_ext.in_octets = Some(value), + "OutOctets" => proc_netstat.ip_ext.out_octets = Some(value), + _ => {} + } + } + } + } + Ok(proc_netstat) +} diff --git a/process-collector/src/linux/system.rs b/process-collector/src/linux/system.rs new file mode 100644 index 0000000..37bcac1 --- /dev/null +++ b/process-collector/src/linux/system.rs @@ -0,0 +1,279 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use procfs::process::{LimitValue, Process, Stat}; +use prometheus_client::{ + collector::Collector, + encoding::EncodeMetric, + metrics::{counter::ConstCounter, gauge::ConstGauge}, + registry::Unit, +}; + +use super::netstat::Netstat; + +#[derive(Debug)] +pub struct System { + namespace: String, + page_size: u64, + report_error: bool, +} + +impl System { + pub fn load(namespace: Option, report_error: bool) -> Self { + let page_size = procfs::page_size(); + let namespace = match namespace { + Some(mut n) => { + n.push('_'); + n + } + None => "".to_string(), + }; + + Self { + page_size, + namespace, + report_error, + } + } + + fn start_time( + &self, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let start_time_from_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|_| std::fmt::Error)?; + let start_time = ConstGauge::new(start_time_from_epoch.as_secs_f64()); + let metric_name = format!("{}process_start_time", self.namespace); + let start_time_metric = encoder.encode_descriptor( + &metric_name, + "Start time of the process since unix epoch in seconds.", + Some(&Unit::Seconds), + start_time.metric_type(), + )?; + start_time.encode(start_time_metric)?; + + Ok(()) + } + + fn open_fds( + &self, + proc: &Process, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let open_file_descriptors = proc.fd_count().map_err(|_| std::fmt::Error)?; + let counter = ConstCounter::new(open_file_descriptors as u32); + let metric_name = format!("{}process_open_fds", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Number of open file descriptors.", + None, + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + + Ok(()) + } + + fn cpu_seconds_total( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let tps = procfs::ticks_per_second(); + let cpu_time = (stat.stime + stat.utime) / tps; + let counter = ConstCounter::new(cpu_time); + let metric_name = format!("{}process_cpu", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Total user and system CPU time spent in seconds.", + Some(&Unit::Seconds), + counter.metric_type(), + )?; + counter.encode(metric_encoder)?; + + Ok(()) + } + + fn max_fds( + &self, + proc: &Process, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + // TODO: handle error + if let Ok(limits) = proc.limits() { + let max_open_files = limits.max_open_files; + let max_fds = match max_open_files.soft_limit { + LimitValue::Unlimited => match max_open_files.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_fds as i64); + let metric_name = format!("{}process_max_fds", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Maximum number of open file descriptors.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + } + + Ok(()) + } + + fn virtual_memory_max_bytes( + &self, + proc: &Process, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + if let Ok(limits) = proc.limits() { + let max_address_space = limits.max_address_space; + let max_virtual_memory = match max_address_space.soft_limit { + LimitValue::Unlimited => match max_address_space.hard_limit { + LimitValue::Unlimited => 0, + LimitValue::Value(hard) => hard, + }, + LimitValue::Value(soft) => soft, + }; + let gauge = ConstGauge::new(max_virtual_memory as i64); + let metric_name = format!("{}process_virtual_memory_max", &self.namespace); + let metric_encoder = encoder.encode_descriptor( + &metric_name, + "Maximum amount of virtual memory available in bytes.", + None, + gauge.metric_type(), + )?; + gauge.encode(metric_encoder)?; + } + + Ok(()) + } + + fn virtual_memory_bytes( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let vm_bytes = ConstGauge::new(stat.vsize as i64); + let metric_name = format!("{}process_virtual_memory", &self.namespace); + let vme = encoder.encode_descriptor( + &metric_name, + "Virtual memory size in bytes", + Some(&Unit::Bytes), + vm_bytes.metric_type(), + )?; + vm_bytes.encode(vme)?; + + Ok(()) + } + + fn resident_memory_bytes( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64); + let metric_name = format!("{}process_resident_memory", &self.namespace); + let rsse = encoder.encode_descriptor( + &metric_name, + "Resident memory size in bytes.", + Some(&Unit::Bytes), + rss_bytes.metric_type(), + )?; + rss_bytes.encode(rsse)?; + + Ok(()) + } + + fn network_in_out( + &self, + stat: &Stat, + encoder: &mut prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let Netstat { ip_ext, .. } = Netstat::read(stat.pid).map_err(|_| std::fmt::Error)?; + let recv_bytes = ConstCounter::new(ip_ext.in_octets.unwrap_or_default()); + let metric_name = format!("{}process_network_receive", &self.namespace); + let rbe = encoder.encode_descriptor( + &metric_name, + "Number of bytes received by the process over the network.", + Some(&Unit::Bytes), + recv_bytes.metric_type(), + )?; + recv_bytes.encode(rbe)?; + + let transmit_bytes = ConstCounter::new(ip_ext.out_octets.unwrap_or_default()); + let metric_name = format!("{}process_network_transmit", &self.namespace); + let tbe = encoder.encode_descriptor( + &metric_name, + "Number of bytes sent by the process over the network.", + Some(&Unit::Bytes), + transmit_bytes.metric_type(), + )?; + transmit_bytes.encode(tbe)?; + + Ok(()) + } + + fn handle_error_report( + &self, + result: Result<(), std::fmt::Error>, + ) -> Result<(), std::fmt::Error> { + if !self.report_error { + return Ok(()); + } + + result + } +} + +impl Collector for System { + fn encode( + &self, + mut encoder: prometheus_client::encoding::DescriptorEncoder, + ) -> Result<(), std::fmt::Error> { + let proc = match Process::myself() { + Ok(proc) => proc, + Err(_) => { + return Ok(()); + } + }; + let stat = match proc.stat() { + Ok(stat) => stat, + Err(_) => { + return Ok(()); + } + }; + + self.handle_error_report(self.start_time(&mut encoder))?; + self.handle_error_report(self.resident_memory_bytes(&stat, &mut encoder))?; + self.handle_error_report(self.virtual_memory_bytes(&stat, &mut encoder))?; + self.handle_error_report(self.virtual_memory_max_bytes(&proc, &mut encoder))?; + self.handle_error_report(self.open_fds(&proc, &mut encoder))?; + self.handle_error_report(self.max_fds(&proc, &mut encoder))?; + self.handle_error_report(self.cpu_seconds_total(&stat, &mut encoder))?; + self.handle_error_report(self.network_in_out(&stat, &mut encoder))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ignore_error_report() { + let system = System::load(None, false); + let result = system.handle_error_report(Err(std::fmt::Error)); + assert!(result.is_ok(), "handle_error_report did not ignore error"); + } + + #[test] + fn return_error() { + let system = System::load(None, true); + let result = system.handle_error_report(Err(std::fmt::Error)); + assert!(result.is_err(), "handle_error_report ignored error"); + } +}