Skip to content

Commit

Permalink
fix: delete topic partitions (#4094)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jul 4, 2024
1 parent 04ddf2a commit a47bd8f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 11 deletions.
8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -239,7 +239,7 @@ impl<C: MetadataItem> TopicNextState<C> {
.await;
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -251,7 +251,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand Down
54 changes: 47 additions & 7 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore<u32>;

#[async_trait]
pub trait TopicMd<C: MetadataItem> {
async fn create_new_partitions(
async fn partitions_from_replicas(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>>;
Expand All @@ -30,8 +30,8 @@ impl<C: MetadataItem> TopicMd<C> for TopicMetadata<C>
where
C: MetadataItem + Send + Sync,
{
/// create new partitions from my replica map if it doesn't exists
async fn create_new_partitions(
/// get partitions from replica map
async fn partitions_from_replicas(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
Expand All @@ -44,14 +44,16 @@ where
let replica_key = ReplicaKey::new(self.key(), *idx);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
if !partition_store.contains_key(&replica_key).await {
let store = partition_store.read().await;
let partition = store.get(&replica_key);
if let Some(p) = partition {
partitions.push(p.inner().clone());
} else {
debug!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
partitions
Expand Down Expand Up @@ -109,8 +111,17 @@ where
#[cfg(test)]
mod test {
use fluvio_controlplane_metadata::topic::{TopicStatus, TopicResolution};
use fluvio_protocol::record::ReplicaKey;
use fluvio_sc_schema::{
partition::{PartitionSpec, PartitionStatus},
store::MetadataStoreObject,
topic::TopicSpec,
};

use crate::stores::topic::{DefaultTopicMd, DefaultTopicLocalStore};
use crate::stores::{
partition::DefaultPartitionStore,
topic::{DefaultTopicLocalStore, DefaultTopicMd, TopicMd},
};

#[test]
fn test_topic_replica_map() {
Expand Down Expand Up @@ -218,4 +229,33 @@ mod test {

assert_eq!(pending_state_names, expected);
}

#[fluvio_future::test]
async fn test_partitions_from_replicas() {
// already exist a partition with spu leader 0 and replica 1
let partition_stored = MetadataStoreObject::<PartitionSpec, u32>::new(
ReplicaKey::new("topic-1", 0_u32),
PartitionSpec::new(0, vec![0, 1]),
PartitionStatus::default(),
);

// create topic with 2 partitions (but already exist one)
let spec: TopicSpec = (2, 2, false).into();
let key = "topic-1";
let status = TopicStatus::new(
TopicResolution::Provisioned,
vec![vec![0, 1], vec![1, 2]],
"".to_owned(),
);
let topic = MetadataStoreObject::<TopicSpec, u32>::new(key, spec, status);
let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]);

let partitions = topic.partitions_from_replicas(&partition_store).await;

assert_eq!(partitions.len(), 2);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32));
assert_eq!(partitions[0].spec.leader, 0);
assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[1].spec.leader, 1);
}
}
16 changes: 16 additions & 0 deletions tests/cli/fluvio_smoke_tests/add-partitions.bats
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,19 @@ setup_file() {
assert_failure
}

@test "Delete topic" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
debug_msg "Delete topic"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
assert_success

sleep 1
debug_msg "Check if the new partition received the message"
run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"'
assert [ ${#lines[@]} -eq 0 ]
}

0 comments on commit a47bd8f

Please sign in to comment.