Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3810] forbid start when should be resumed #3965

Merged
merged 5 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions crates/fluvio-cluster/src/check/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ pub enum UnrecoverableCheckStatus {
#[error("Local Fluvio cluster still running")]
ExistingLocalCluster,

#[error("Local Fluvio cluster wasn't deleted. Use 'resume' to resume created cluster or 'delete' before starting a new one")]
CreateLocalConfigError,

#[error("Helm client error")]
HelmClientError,

Expand Down Expand Up @@ -691,6 +694,35 @@ impl ClusterCheck for LocalClusterCheck {
}
}

/// Preflight check that rejects a start when a previous local cluster was not deleted.
///
/// The check looks at the local configuration file location (`LOCAL_CONFIG_PATH`):
/// if a file already exists there, or if the location is not available at all,
/// the check reports an unrecoverable status.
#[derive(Debug)]
struct CleanLocalClusterCheck;

#[async_trait]
impl ClusterCheck for CleanLocalClusterCheck {
    /// Returns a passing status only when the local config path is known and no
    /// file exists at it; otherwise returns
    /// `UnrecoverableCheckStatus::CreateLocalConfigError`.
    async fn perform_check(&self, _pb: &ProgressRenderer) -> CheckResult {
        use crate::start::local::LOCAL_CONFIG_PATH;

        // A missing path (`None`) is deliberately handled like an existing file:
        // in both cases a fresh config cannot be created.
        let config_blocked = match LOCAL_CONFIG_PATH.as_ref() {
            Some(path) => path.is_file(),
            None => true,
        };

        let status = if config_blocked {
            CheckStatus::Unrecoverable(UnrecoverableCheckStatus::CreateLocalConfigError)
        } else {
            CheckStatus::pass("Previous local fluvio installation not found")
        };
        Ok(status)
    }

    /// Human-readable name of this check.
    fn label(&self) -> &str {
        "Clean Fluvio Local Installation"
    }
}

/// Manages all cluster check operations
///
/// A `ClusterChecker` can be configured with different sets of checks to run.
Expand Down Expand Up @@ -745,10 +777,17 @@ impl ClusterChecker {
self
}

pub fn with_no_k8_checks(mut self) -> Self {
let checks: Vec<Box<(dyn ClusterCheck)>> = vec![Box::new(LocalClusterCheck)];
self.checks.extend(checks);
self
pub fn with_no_k8_checks(self) -> Self {
self.without_installed_local_cluster()
.with_clean_local_cluster()
}

/// Registers `LocalClusterCheck` on this checker via `with_check` and returns
/// the checker so further checks can be chained.
// NOTE(review): presumably `LocalClusterCheck` is what detects an already
// running local cluster — confirm against its `perform_check`.
pub fn without_installed_local_cluster(self) -> Self {
self.with_check(LocalClusterCheck)
}

/// Registers `CleanLocalClusterCheck` on this checker via `with_check` and
/// returns the checker so further checks can be chained.
///
/// That check fails when a local cluster config file from an earlier run is
/// still present (i.e. the cluster was not deleted).
pub fn with_clean_local_cluster(self) -> Self {
self.with_check(CleanLocalClusterCheck)
}

/// Adds all checks required for starting a cluster on minikube.
Expand Down
9 changes: 4 additions & 5 deletions crates/fluvio-cluster/src/cli/resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,29 @@ struct LocalResume {
impl LocalResume {
/// Resumes a local cluster: runs the preflight check first, then resumes
/// from the previously saved configuration.
///
/// # Errors
/// Propagates any error from the preflight check (in which case the resume
/// step is not attempted) or from `resume_previous_config`.
pub async fn resume(&self) -> Result<()> {
self.preflight_check().await?;

self.resume_previous_config().await
}

async fn resume_previous_config(&self) -> Result<()> {
let local_conf = match LOCAL_CONFIG_PATH.as_ref() {
None => {
return Err(ClusterCliError::Other(
"Can't find config from a previous run".to_string(),
"Configuration file for local cluster not found from previous run".to_string(),
)
.into())
}
Some(local_config_path) => LocalConfig::load_from(local_config_path),
}
.with_context(|| "Couldn't load local counfig file")?;
.with_context(|| "Unable to load configuration file for local cluster")?;

let installer = LocalInstaller::from_config(local_conf);
_ = installer.install().await?;
_ = installer.install_only().await?;
Ok(())
}

async fn preflight_check(&self) -> Result<()> {
ClusterChecker::empty()
.with_no_k8_checks()
.without_installed_local_cluster()
.run(&self.pb_factory, false)
.await?;
Ok(())
Expand Down
12 changes: 5 additions & 7 deletions crates/fluvio-cluster/src/cli/start/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ use crate::{LocalInstaller, LocalConfig};

use super::StartOpt;

/// Attempts to start a local Fluvio cluster
///
/// Returns `Ok(true)` on success, `Ok(false)` if pre-checks failed and are
/// reported, or `Err(e)` if something unexpected occurred.
/// Attempts to either start a local Fluvio cluster or only run (and fix) the preliminary preflight checks.
/// Pass `opt.setup = true` to only run the checks.
pub async fn process_local(opt: StartOpt, platform_version: Version) -> Result<()> {
let mut builder = LocalConfig::builder(platform_version);
builder
Expand Down Expand Up @@ -44,20 +42,20 @@ pub async fn process_local(opt: StartOpt, platform_version: Version) -> Result<(
let config = builder.build()?;
let installer = LocalInstaller::from_config(config);
if opt.setup {
setup_local(&installer).await?;
preflight_check(&installer).await?;
} else {
install_local(&installer).await?;
}

Ok(())
}

pub async fn install_local(installer: &LocalInstaller) -> Result<()> {
async fn install_local(installer: &LocalInstaller) -> Result<()> {
installer.install().await?;
Ok(())
}

pub async fn setup_local(installer: &LocalInstaller) -> Result<()> {
async fn preflight_check(installer: &LocalInstaller) -> Result<()> {
installer.preflight_check(false).await?;

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions crates/fluvio-cluster/src/start/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ impl LocalInstaller {
self.preflight_check(true).await?;
};

self.install_only().await
}

#[instrument(skip(self))]
pub async fn install_only(&self) -> Result<StartStatus> {
let pb = self.pb_factory.create()?;

debug!("using log dir: {}", self.config.log_dir.display());
Expand Down
60 changes: 46 additions & 14 deletions tests/cli/fluvio_smoke_tests/non-concurrent/local-resume.bats
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,29 @@ run_resume() {
run bash -c "${close_fd_cmd} && ${run_resume_cmd} &" 3>&-
}

# Polls `run_list_spus` up to 10 times, pausing 3 seconds after each failed
# attempt, and stops at the first successful listing.
# On return, `$status` / `$output` hold the result of the last `run_list_spus`
# call, so callers can read the SPU count from `$output` or assert on `$status`.
wait_for_spus() {
    for retry in {1..10}; do
        run_list_spus

        # Success: report what was found and stop polling.
        if [ "$status" -eq 0 ]; then
            echo "Got $output SPUs after ${retry} retries. It is expected to be the same as $CLUSTER_SPUS" >&3
            break
        fi

        # Failure: log to the Bats diagnostic fd and wait before the next attempt.
        echo "retry listing SPUs..." >&3
        sleep 3
    done
}

setup_file() {
run_list_spus
# Expecting to have a running cluster
run_list_spus
assert_success

CLUSTER_SPUS=$output
export CLUSTER_SPUS
}

@test "Resume cluster maintains SPU replica number" {
skip "until 'resume' is released into a stable version; Otherwise, the CLI can't separate between stable and dev clusters, and the former is yet to support the new command"

run timeout 15s "$FLUVIO_BIN" cluster shutdown
assert_success

Expand All @@ -52,19 +64,39 @@ setup_file() {
resume_output=$output
run_resume $resume_output

for retry in $(seq 1 10); do
run_list_spus
if [ "$status" -ne 0 ]; then
echo "retry listing SPUs..." >&3
sleep 3
else
echo "Got $output SPUs after ${retry} retries. It is expected to be the same as $CLUSTER_SPUS" >&3
current_spus=$output
break;
fi;
done;
wait_for_spus
current_spus=$output

echo "Resume output ($resume_output):" >&3
cat $resume_output >&3
assert_equal $CLUSTER_SPUS $current_spus
}

# `cluster start` must be rejected while a cluster is already up.
@test "Can not start a running cluster" {
# Precondition: listing SPUs succeeds, i.e. a cluster is expected to be running
run_list_spus
assert_success

# Starting on top of the live cluster is expected to exit non-zero.
# NOTE(review): hitting the 15s timeout also yields a non-zero exit, so a hang
# would satisfy assert_failure as well.
run timeout 15s "$FLUVIO_BIN" cluster start
assert_failure
}

# `cluster start` must be rejected for a cluster that was shut down but not deleted.
@test "Can not start a shutdown cluster" {
# Ensure cluster is running
run_list_spus
assert_success

# Shut the cluster down without deleting it
run timeout 15s "$FLUVIO_BIN" cluster shutdown
assert_success

# Start should fail, since cluster wasn't deleted
run timeout 15s "$FLUVIO_BIN" cluster start
assert_failure

# Resume the cluster to restore the running state this test started from;
# the resume output is written to a temp file created by mktemp
run mktemp
resume_output=$output
run_resume $resume_output

# wait_for_spus leaves $status from its last SPU listing, which is what
# assert_success checks here
wait_for_spus
assert_success
}
Loading