Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update sysinfo and fix fd leak on linux #4163

Merged
merged 2 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ sha2 = { version = "0.10" }
siphasher = "1.0.0"
static_assertions = "1.1.0"
syn = "2.0"
sysinfo = { version = "0.30.13", default-features = false }
sysinfo = { version = "0.31.4", default-features = false, features = ["system"] }
tar = { version = "0.4.38", default-features = false }
tempfile = "3.4.0"
thiserror = "1.0.30"
Expand Down
5 changes: 3 additions & 2 deletions crates/cdk/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,9 @@ mod local_index {
table.load_preset(comfy_table::presets::NOTHING);
table.set_header(LIST_TABLE_HEADERS);

sysinfo::set_open_files_limit(0);
let mut system = sysinfo::System::new();
system.refresh_processes();
system.refresh_processes(sysinfo::ProcessesToUpdate::All);
for connector in self.entries {
let status = self.operator.status(&connector)?;
let Entry::Local {
Expand Down Expand Up @@ -524,7 +525,7 @@ mod local_index {
impl Default for LocalProcesses {
fn default() -> Self {
let mut system: sysinfo::System = Default::default();
system.refresh_processes();
system.refresh_processes(sysinfo::ProcessesToUpdate::All);
Self { system }
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ duct = { workspace = true, optional = true }
comfy-table = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
tar = { workspace = true , optional = true }
sysinfo = { workspace = true, default-features = false }
sysinfo = { workspace = true, default-features = false, features = ["system", "network", "disk"] }


# External Fluvio dependencies
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-cluster/src/check/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,11 @@ struct LocalClusterCheck;
#[async_trait]
impl ClusterCheck for LocalClusterCheck {
async fn perform_check(&self, _pb: &ProgressRenderer) -> CheckResult {
sysinfo::set_open_files_limit(0);
let mut sys = System::new();
sys.refresh_processes(); // Only load what we need.
sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need.
let proc_count = sys
.processes_by_exact_name("fluvio-run")
.processes_by_exact_name("fluvio-run".as_ref())
.map(|x| println!(" found existing fluvio-run process. pid: {}", x.pid()))
.count();
if proc_count > 0 {
Expand Down
25 changes: 16 additions & 9 deletions crates/fluvio-cluster/src/cli/diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::cli::ClusterCliError;
use crate::cli::start::default_log_directory;
use crate::start::local::{DEFAULT_DATA_DIR as DEFAULT_LOCAL_DIR, DEFAULT_METADATA_SUB_DIR};

const FLUVIO_PROCESS_NAME: &str = "fluvio";

#[derive(Parser, Debug)]
pub struct DiagnosticsOpt {
#[arg(long)]
Expand Down Expand Up @@ -155,7 +157,7 @@ impl DiagnosticsOpt {
// Filter for only Fluvio pods
let pods = pods
.split(' ')
.filter(|pod| pod.contains("fluvio"))
.filter(|pod| pod.contains(FLUVIO_PROCESS_NAME))
.collect::<Vec<_>>();

for &pod in &pods {
Expand Down Expand Up @@ -196,7 +198,7 @@ impl DiagnosticsOpt {
// Filter for only Fluvio services
let objects = objects
.split(' ')
.filter(|obj| !filter_fluvio || obj.contains("fluvio"))
.filter(|obj| !filter_fluvio || obj.contains(FLUVIO_PROCESS_NAME))
.map(|name| name.trim())
.collect::<Vec<_>>();

Expand Down Expand Up @@ -262,6 +264,7 @@ impl DiagnosticsOpt {
Ok(())
};

sysinfo::set_open_files_limit(0);
let mut sys = System::new_all();
let mut net = Networks::new();

Expand Down Expand Up @@ -452,13 +455,17 @@ impl ProcessInfo {
let mut processes = Vec::new();

for (pid, process) in sys.processes() {
if process.name().contains("fluvio") {
processes.push(ProcessInfo {
pid: pid.as_u32(),
name: process.name().to_string(),
disk_usage: format!("{:?}", process.disk_usage()),
cmd: format!("{:?}", process.cmd()),
});
let process_name = process.name().to_str();

if let Some(process_name) = process_name {
if process_name.contains(FLUVIO_PROCESS_NAME) {
processes.push(ProcessInfo {
pid: pid.as_u32(),
name: process_name.to_string(),
disk_usage: format!("{:?}", process.disk_usage()),
cmd: format!("{:?}", process.cmd()),
});
}
}
}

Expand Down
58 changes: 10 additions & 48 deletions crates/fluvio-cluster/src/cli/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::fs::remove_file;
use std::process::Command;

use anyhow::bail;
use anyhow::Result;
use clap::Parser;
use tracing::debug;
use sysinfo::System;

use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET;
use fluvio_command::CommandExt;

use crate::process;
use crate::render::ProgressRenderer;
use crate::cli::ClusterCliError;
use crate::progress::ProgressBarFactory;
Expand All @@ -35,7 +34,7 @@ impl ShutdownOpt {

match installation_type {
InstallationType::Local | InstallationType::LocalK8 | InstallationType::ReadOnly => {
Self::kill_local_processes(&installation_type, &pb).await?;
Self::shutdown_local(&installation_type, &pb).await?;
}
InstallationType::Cloud => {
let profile = config.config().current_profile_name().unwrap_or("none");
Expand All @@ -49,60 +48,23 @@ impl ShutdownOpt {
Ok(())
}

async fn kill_local_processes(
async fn shutdown_local(
installation_type: &InstallationType,
pb: &ProgressRenderer,
) -> Result<()> {
pb.set_message("Uninstalling fluvio local components");

let kill_proc = |name: &str, command_args: Option<&[String]>| {
let mut sys = System::new();
sys.refresh_processes(); // Only load what we need.
for process in sys.processes_by_exact_name(name) {
if let Some(cmd_args) = command_args {
let proc_cmds = process.cmd();
if cmd_args.len() > proc_cmds.len() {
continue; // Ignore procs with less command_args than the target.
}
if cmd_args.iter().ne(proc_cmds[..cmd_args.len()].iter()) {
continue; // Ignore procs which don't match.
}
}
if !process.kill() {
// This will fail if called on a proc running as root, so only log failure.
debug!(
"Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}",
process.pid(),
process.name(),
process.user_id(),
);
}
}
};
kill_proc("fluvio", Some(&["cluster".into(), "run".into()]));
kill_proc("fluvio", Some(&["run".into()]));
kill_proc("fluvio-run", None);
process::kill_local_processes(pb).await?;

if let InstallationType::LocalK8 = installation_type {
let _ = Self::remove_custom_objects("spus", true);
}

// remove monitoring socket
match remove_file(SPU_MONITORING_UNIX_SOCKET) {
Ok(_) => {
pb.println(format!(
"Removed spu monitoring socket: {SPU_MONITORING_UNIX_SOCKET}"
));
}
Err(io_err) if io_err.kind() == std::io::ErrorKind::NotFound => {
debug!("SPU monitoring socket not found: {SPU_MONITORING_UNIX_SOCKET}");
}
Err(err) => {
pb.println(format!(
"SPU monitoring socket {SPU_MONITORING_UNIX_SOCKET}, can't be removed: {err}"
));
}
}
process::delete_fs(
Some(SPU_MONITORING_UNIX_SOCKET),
"SPU monitoring socket",
true,
Some(pb),
);

pb.println("Uninstalled fluvio local components");
pb.finish_and_clear();
Expand Down
Loading
Loading