Skip to content

Commit

Permalink
chore: unify kill local process into a process mod
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Sep 3, 2024
1 parent c29b008 commit 7367ddf
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 150 deletions.
8 changes: 5 additions & 3 deletions crates/fluvio-cluster/src/cli/diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::cli::ClusterCliError;
use crate::cli::start::default_log_directory;
use crate::start::local::{DEFAULT_DATA_DIR as DEFAULT_LOCAL_DIR, DEFAULT_METADATA_SUB_DIR};

const FLUVIO_PROCESS_NAME: &str = "fluvio";

#[derive(Parser, Debug)]
pub struct DiagnosticsOpt {
#[arg(long)]
Expand Down Expand Up @@ -155,7 +157,7 @@ impl DiagnosticsOpt {
// Filter for only Fluvio pods
let pods = pods
.split(' ')
.filter(|pod| pod.contains("fluvio"))
.filter(|pod| pod.contains(FLUVIO_PROCESS_NAME))
.collect::<Vec<_>>();

for &pod in &pods {
Expand Down Expand Up @@ -196,7 +198,7 @@ impl DiagnosticsOpt {
// Filter for only Fluvio services
let objects = objects
.split(' ')
.filter(|obj| !filter_fluvio || obj.contains("fluvio"))
.filter(|obj| !filter_fluvio || obj.contains(FLUVIO_PROCESS_NAME))
.map(|name| name.trim())
.collect::<Vec<_>>();

Expand Down Expand Up @@ -456,7 +458,7 @@ impl ProcessInfo {
let process_name = process.name().to_str();

if let Some(process_name) = process_name {
if process_name.contains("fluvio") {
if process_name.contains(FLUVIO_PROCESS_NAME) {
processes.push(ProcessInfo {
pid: pid.as_u32(),
name: process_name.to_string(),
Expand Down
66 changes: 10 additions & 56 deletions crates/fluvio-cluster/src/cli/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use std::ffi::OsString;
use std::fs::remove_file;
use std::process::Command;

use anyhow::bail;
use anyhow::Result;
use clap::Parser;
use tracing::debug;
use sysinfo::System;

use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET;
use fluvio_command::CommandExt;

use crate::process;
use crate::render::ProgressRenderer;
use crate::cli::ClusterCliError;
use crate::progress::ProgressBarFactory;
Expand All @@ -36,7 +34,7 @@ impl ShutdownOpt {

match installation_type {
InstallationType::Local | InstallationType::LocalK8 | InstallationType::ReadOnly => {
Self::kill_local_processes(&installation_type, &pb).await?;
Self::shutdown_local(&installation_type, &pb).await?;
}
InstallationType::Cloud => {
let profile = config.config().current_profile_name().unwrap_or("none");
Expand All @@ -50,67 +48,23 @@ impl ShutdownOpt {
Ok(())
}

async fn kill_local_processes(
async fn shutdown_local(
installation_type: &InstallationType,
pb: &ProgressRenderer,
) -> Result<()> {
pb.set_message("Uninstalling fluvio local components");

let kill_proc = |name: &str, command_args: Option<&[String]>| {
sysinfo::set_open_files_limit(0);
let mut sys = System::new();
sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need.
for process in sys.processes_by_exact_name(name.as_ref()) {
if let Some(cmd_args) = command_args {
let proc_cmds = process.cmd();
if cmd_args.len() > proc_cmds.len() {
continue; // Ignore procs with less command_args than the target.
}
if cmd_args
.iter()
.map(OsString::from)
.collect::<Vec<_>>()
.iter()
.ne(proc_cmds[..cmd_args.len()].iter())
{
continue; // Ignore procs which don't match.
}
}
if !process.kill() {
// This will fail if called on a proc running as root, so only log failure.
debug!(
"Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}",
process.pid(),
process.name().to_str().unwrap_or("unknown"),
process.user_id(),
);
}
}
};
kill_proc("fluvio", Some(&["cluster".into(), "run".into()]));
kill_proc("fluvio", Some(&["run".into()]));
kill_proc("fluvio-run", None);
process::kill_local_processes(pb).await?;

if let InstallationType::LocalK8 = installation_type {
let _ = Self::remove_custom_objects("spus", true);
}

// remove monitoring socket
match remove_file(SPU_MONITORING_UNIX_SOCKET) {
Ok(_) => {
pb.println(format!(
"Removed spu monitoring socket: {SPU_MONITORING_UNIX_SOCKET}"
));
}
Err(io_err) if io_err.kind() == std::io::ErrorKind::NotFound => {
debug!("SPU monitoring socket not found: {SPU_MONITORING_UNIX_SOCKET}");
}
Err(err) => {
pb.println(format!(
"SPU monitoring socket {SPU_MONITORING_UNIX_SOCKET}, can't be removed: {err}"
));
}
}
process::delete_fs(
Some(SPU_MONITORING_UNIX_SOCKET),
"SPU monitoring socket",
true,
Some(pb),
);

pb.println("Uninstalled fluvio local components");
pb.finish_and_clear();
Expand Down
96 changes: 5 additions & 91 deletions crates/fluvio-cluster/src/delete.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
use std::ffi::OsString;
use std::path::Path;
use std::process::Command;
use std::fs::{remove_dir_all, remove_file};

use derive_builder::Builder;
use k8_client::meta_client::MetadataClient;
use tracing::{info, warn, debug, instrument};
use sysinfo::System;

use fluvio_command::CommandExt;
use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET;

use crate::helm::HelmClient;
use crate::charts::{APP_CHART_NAME, SYS_CHART_NAME};
use crate::progress::ProgressBarFactory;
use crate::render::ProgressRenderer;
use crate::DEFAULT_NAMESPACE;
use crate::{process, DEFAULT_NAMESPACE};
use crate::error::UninstallError;
use crate::start::local::{DEFAULT_DATA_DIR, LOCAL_CONFIG_PATH};
use anyhow::Result;

/// Uninstalls different flavors of fluvio
Expand Down Expand Up @@ -147,97 +141,17 @@ impl ClusterUninstaller {

async fn uninstall_local(&self) -> Result<()> {
let pb = self.pb_factory.create()?;
pb.set_message("Uninstalling fluvio local components");

let kill_proc = |name: &str, command_args: Option<&[String]>| {
sysinfo::set_open_files_limit(0);
let mut sys = System::new();
sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need.
for process in sys.processes_by_exact_name(name.as_ref()) {
if let Some(cmd_args) = command_args {
// First command is the executable so cut that out.
let proc_cmds = process.cmd();
if cmd_args.len() > proc_cmds.len() {
continue; // Ignore procs with less command_args than the target.
}
if cmd_args
.iter()
.map(OsString::from)
.collect::<Vec<_>>()
.iter()
.ne(proc_cmds[..cmd_args.len()].iter())
{
continue; // Ignore procs which don't match.
}
}
if !process.kill() {
// This will fail if called on a proc running as root, so only log failure.
debug!(
"Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}",
process.pid(),
process.name().to_str().unwrap_or("unknown"),
process.user_id(),
);
}
}
};
kill_proc("fluvio", Some(&["cluster".into(), "run".into()]));
kill_proc("fluvio", Some(&["run".into()]));
kill_proc("fluvio-run", None);

fn delete_fs<T: AsRef<Path>>(
path: Option<T>,
tag: &'static str,
is_file: bool,
pb: Option<&ProgressRenderer>,
) {
match path {
Some(path) => {
let path_ref = path.as_ref();
match if is_file {
remove_file(path_ref)
} else {
remove_dir_all(path_ref)
} {
Ok(_) => {
debug!("Removed {}: {}", tag, path_ref.display());
if let Some(pb) = pb {
pb.println(format!("Removed {}", tag))
}
}
Err(err) => {
warn!("{} can't be removed: {}", tag, err);
if let Some(pb) = pb {
pb.println(format!("{tag}, can't be removed: {err}"))
}
}
}
}
None => {
warn!("Unable to find {}, cannot remove", tag);
}
}
}
process::kill_local_processes(&pb).await?;

// delete fluvio file
debug!("Removing fluvio directory");
delete_fs(DEFAULT_DATA_DIR.as_ref(), "data dir", false, None);
process::delete_data_dir();

// delete local cluster config file
delete_fs(
LOCAL_CONFIG_PATH.as_ref(),
"local cluster config",
true,
None,
);
process::delete_local_config();

// remove monitoring socket
delete_fs(
Some(SPU_MONITORING_UNIX_SOCKET),
"SPU monitoring socket",
true,
Some(&pb),
);
process::delete_spu_socket();

pb.println("Uninstalled fluvio local components");
pb.finish_and_clear();
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod delete;
mod error;
mod progress;
pub mod runtime;
mod process;

/// extensions
#[cfg(feature = "cli")]
Expand Down
112 changes: 112 additions & 0 deletions crates/fluvio-cluster/src/process/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::ffi::OsString;
use std::fs::{remove_dir_all, remove_file};
use std::path::Path;

use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET;
use sysinfo::System;
use anyhow::Result;

use tracing::{debug, warn};

use crate::render::ProgressRenderer;
use crate::start::local::{DEFAULT_DATA_DIR, LOCAL_CONFIG_PATH};

pub async fn kill_local_processes(pb: &ProgressRenderer) -> Result<()> {
pb.set_message("Uninstalling fluvio local components");

let kill_proc = |name: &str, command_args: Option<&[String]>| {
sysinfo::set_open_files_limit(0);
let mut sys = System::new();
sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need.
for process in sys.processes_by_exact_name(name.as_ref()) {
if let Some(cmd_args) = command_args {
let proc_cmds = process.cmd();
if cmd_args.len() > proc_cmds.len() {
continue; // Ignore procs with less command_args than the target.
}
if cmd_args
.iter()
.map(OsString::from)
.collect::<Vec<_>>()
.iter()
.ne(proc_cmds[..cmd_args.len()].iter())
{
continue; // Ignore procs which don't match.
}
}
if !process.kill() {
// This will fail if called on a proc running as root, so only log failure.
debug!(
"Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}",
process.pid(),
process.name().to_str().unwrap_or("unknown"),
process.user_id(),
);
}
}
};
kill_proc("fluvio", Some(&["cluster".into(), "run".into()]));
kill_proc("fluvio", Some(&["run".into()]));
kill_proc("fluvio-run", None);

Ok(())
}

pub fn delete_fs<T: AsRef<Path>>(
path: Option<T>,
tag: &'static str,
is_file: bool,
pb: Option<&ProgressRenderer>,
) {
match path {
Some(path) => {
let path_ref = path.as_ref();
match if is_file {
remove_file(path_ref)
} else {
remove_dir_all(path_ref)
} {
Ok(_) => {
debug!("Removed {}: {}", tag, path_ref.display());
if let Some(pb) = pb {
pb.println(format!("Removed {}", tag))
}
}
Err(io_err) if io_err.kind() == std::io::ErrorKind::NotFound => {
debug!("{} not found: {}", tag, path_ref.display());
}
Err(err) => {
warn!("{} can't be removed: {}", tag, err);
if let Some(pb) = pb {
pb.println(format!("{tag}, can't be removed: {err}"))
}
}
}
}
None => {
warn!("Unable to find {}, cannot remove", tag);
}
}
}

pub fn delete_spu_socket() {
delete_fs(
Some(SPU_MONITORING_UNIX_SOCKET),
"SPU monitoring socket",
true,
None,
);
}

pub fn delete_local_config() {
delete_fs(
LOCAL_CONFIG_PATH.as_ref(),
"local cluster config",
true,
None,
);
}

pub fn delete_data_dir() {
delete_fs(DEFAULT_DATA_DIR.as_ref(), "data dir", false, None);
}

0 comments on commit 7367ddf

Please sign in to comment.