Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix delete topic partitions #4094

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -239,7 +239,7 @@ impl<C: MetadataItem> TopicNextState<C> {
.await;
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -251,7 +251,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
topic.partitions_from_replicas(scheduler.partitions()).await;
}
next_state
}
Expand Down
54 changes: 47 additions & 7 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore<u32>;

#[async_trait]
pub trait TopicMd<C: MetadataItem> {
async fn create_new_partitions(
async fn partitions_from_replicas(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>>;
Expand All @@ -30,8 +30,8 @@ impl<C: MetadataItem> TopicMd<C> for TopicMetadata<C>
where
C: MetadataItem + Send + Sync,
{
/// create new partitions from my replica map if it doesn't exists
async fn create_new_partitions(
/// get partitions from replica map
async fn partitions_from_replicas(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
Expand All @@ -44,14 +44,16 @@ where
let replica_key = ReplicaKey::new(self.key(), *idx);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
if !partition_store.contains_key(&replica_key).await {
let store = partition_store.read().await;
let partition = store.get(&replica_key);
if let Some(p) = partition {
partitions.push(p.inner().clone());
} else {
debug!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
partitions
Expand Down Expand Up @@ -109,8 +111,17 @@ where
#[cfg(test)]
mod test {
use fluvio_controlplane_metadata::topic::{TopicStatus, TopicResolution};
use fluvio_protocol::record::ReplicaKey;
use fluvio_sc_schema::{
partition::{PartitionSpec, PartitionStatus},
store::MetadataStoreObject,
topic::TopicSpec,
};

use crate::stores::topic::{DefaultTopicMd, DefaultTopicLocalStore};
use crate::stores::{
partition::DefaultPartitionStore,
topic::{DefaultTopicLocalStore, DefaultTopicMd, TopicMd},
};

#[test]
fn test_topic_replica_map() {
Expand Down Expand Up @@ -218,4 +229,33 @@ mod test {

assert_eq!(pending_state_names, expected);
}

#[fluvio_future::test]
async fn test_partitions_from_replicas() {
// already exist a partition with spu leader 0 and replica 1
let partition_stored = MetadataStoreObject::<PartitionSpec, u32>::new(
ReplicaKey::new("topic-1", 0_u32),
PartitionSpec::new(0, vec![0, 1]),
PartitionStatus::default(),
);

// create topic with 2 partitions (but already exist one)
let spec: TopicSpec = (2, 2, false).into();
let key = "topic-1";
let status = TopicStatus::new(
TopicResolution::Provisioned,
vec![vec![0, 1], vec![1, 2]],
"".to_owned(),
);
let topic = MetadataStoreObject::<TopicSpec, u32>::new(key, spec, status);
let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]);

let partitions = topic.partitions_from_replicas(&partition_store).await;

assert_eq!(partitions.len(), 2);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32));
assert_eq!(partitions[0].spec.leader, 0);
assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[1].spec.leader, 1);
}
}
16 changes: 16 additions & 0 deletions tests/cli/fluvio_smoke_tests/add-partitions.bats
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,19 @@ setup_file() {
assert_failure
}

@test "Delete topic" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
debug_msg "Delete topic"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
assert_success

sleep 1
debug_msg "Check if the new partition received the message"
run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"'
assert [ ${#lines[@]} -eq 0 ]
}
Loading