From c2f09239951a91a0ffa64c82998823afc116d0e0 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Thu, 4 Jul 2024 16:43:08 -0300 Subject: [PATCH] fix: delete topic partitions --- .../src/controllers/topics/policy.rs | 8 +-- crates/fluvio-sc/src/stores/topic/store.rs | 54 ++++++++++++++++--- .../fluvio_smoke_tests/add-partitions.bats | 16 ++++++ 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/crates/fluvio-sc/src/controllers/topics/policy.rs b/crates/fluvio-sc/src/controllers/topics/policy.rs index 5c1b84e8aa..f54b1206a3 100644 --- a/crates/fluvio-sc/src/controllers/topics/policy.rs +++ b/crates/fluvio-sc/src/controllers/topics/policy.rs @@ -222,7 +222,7 @@ impl TopicNextState { if next_state.resolution == TopicResolution::Provisioned { debug!("creating new partitions"); next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + topic.partitions_from_replicas(scheduler.partitions()).await; } next_state } @@ -239,7 +239,7 @@ impl TopicNextState { .await; if next_state.resolution == TopicResolution::Provisioned { next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + topic.partitions_from_replicas(scheduler.partitions()).await; } next_state } @@ -251,7 +251,7 @@ impl TopicNextState { let mut next_state = TopicNextState::same_next_state(topic); if next_state.resolution == TopicResolution::Provisioned { next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + topic.partitions_from_replicas(scheduler.partitions()).await; } next_state } @@ -335,7 +335,7 @@ impl TopicNextState { if next_state.resolution == TopicResolution::Provisioned { debug!("creating new partitions"); next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + topic.partitions_from_replicas(scheduler.partitions()).await; } next_state } diff --git a/crates/fluvio-sc/src/stores/topic/store.rs b/crates/fluvio-sc/src/stores/topic/store.rs index fa1e0a683c..30ef23c177 100644 --- a/crates/fluvio-sc/src/stores/topic/store.rs +++ b/crates/fluvio-sc/src/stores/topic/store.rs @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore; #[async_trait] pub trait TopicMd { - async fn create_new_partitions( + async fn partitions_from_replicas( &self, partition_store: &PartitionLocalStore, ) -> Vec>; @@ -30,8 +30,8 @@ impl TopicMd for TopicMetadata where C: MetadataItem + Send + Sync, { - /// create new partitions from my replica map if it doesn't exists - async fn create_new_partitions( + /// get partitions from replica map + async fn partitions_from_replicas( &self, partition_store: &PartitionLocalStore, ) -> Vec> { @@ -44,14 +44,16 @@ where let replica_key = ReplicaKey::new(self.key(), *idx); let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror); - if !partition_store.contains_key(&replica_key).await { + let store = partition_store.read().await; + let partition = store.get(&replica_key); + if let Some(p) = partition { + partitions.push(p.inner().clone()); + } else { debug!(?replica_key, ?partition_spec, "creating new partition"); partitions.push( MetadataStoreObject::with_spec(replica_key, partition_spec) .with_context(self.ctx.create_child()), ) - } else { - debug!(?replica_key, "partition already exists"); } } partitions @@ -109,8 +111,17 @@ where #[cfg(test)] mod test { use fluvio_controlplane_metadata::topic::{TopicStatus, TopicResolution}; + use fluvio_protocol::record::ReplicaKey; + use fluvio_sc_schema::{ + partition::{PartitionSpec, PartitionStatus}, + store::MetadataStoreObject, + topic::TopicSpec, + }; - use crate::stores::topic::{DefaultTopicMd, DefaultTopicLocalStore}; + use crate::stores::{ + partition::DefaultPartitionStore, + topic::{DefaultTopicLocalStore, DefaultTopicMd, TopicMd}, + }; #[test] fn test_topic_replica_map() { @@ -218,4 +229,33 @@ mod test { assert_eq!(pending_state_names, expected); } + + #[fluvio_future::test] + async fn test_partitions_from_replicas() { + // already exist a partition with spu leader 0 and replica 1 + let partition_stored = MetadataStoreObject::::new( + ReplicaKey::new("topic-1", 0_u32), + PartitionSpec::new(0, vec![0, 1]), + PartitionStatus::default(), + ); + + // create topic with 2 partitions (but already exist one) + let spec: TopicSpec = (2, 2, false).into(); + let key = "topic-1"; + let status = TopicStatus::new( + TopicResolution::Provisioned, + vec![vec![0, 1], vec![1, 2]], + "".to_owned(), + ); + let topic = MetadataStoreObject::::new(key, spec, status); + let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]); + + let partitions = topic.partitions_from_replicas(&partition_store).await; + + assert_eq!(partitions.len(), 2); + assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32)); + assert_eq!(partitions[0].spec.leader, 0); + assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32)); + assert_eq!(partitions[1].spec.leader, 1); + } } diff --git a/tests/cli/fluvio_smoke_tests/add-partitions.bats b/tests/cli/fluvio_smoke_tests/add-partitions.bats index 200afd0195..af9a23b1e1 100644 --- a/tests/cli/fluvio_smoke_tests/add-partitions.bats +++ b/tests/cli/fluvio_smoke_tests/add-partitions.bats @@ -86,3 +86,19 @@ setup_file() { assert_failure } +@test "Delete topic" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + debug_msg "Delete topic" + run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME" + assert_success + + sleep 1 + debug_msg "Check if the new partition received the message" + run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"' + assert [ ${#lines[@]} -eq 0 ] +}