Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added hidden topics and partitions #3930

Merged
merged 3 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ fluvio-cli-common = { path = "crates/fluvio-cli-common"}
fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" }
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
fluvio-controlplane-metadata = { version = "0.25.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-controlplane-metadata = { version = "0.26.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false }
fluvio-hub-util = { path = "crates/fluvio-hub-util" }
fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false }
fluvio-protocol = { version = "0.10.6", path = "crates/fluvio-protocol" }
fluvio-sc-schema = { version = "0.21.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-sc-schema = { version = "0.22.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false }
fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", default-features = false }
Expand Down
8 changes: 7 additions & 1 deletion crates/fluvio-cli/src/client/partition/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use anyhow::Result;

use fluvio::Fluvio;
use fluvio::metadata::partition::*;
use fluvio_sc_schema::objects::ListRequest;

use crate::common::output::Terminal;
use crate::common::OutputFormat;
Expand All @@ -18,6 +19,9 @@ use crate::common::OutputFormat;
pub struct ListPartitionOpt {
#[clap(flatten)]
output: OutputFormat,
/// Show system partitions only
#[arg(long, short, required = false)]
system: bool,
}

impl ListPartitionOpt {
Expand All @@ -29,7 +33,9 @@ impl ListPartitionOpt {
let output = self.output.format;
let admin = fluvio.admin().await;

let partitions = admin.all::<PartitionSpec>().await?;
let partitions = admin
.list_with_config::<PartitionSpec, String>(ListRequest::default().system(self.system))
.await?;

// format and dump to screen
display::format_partition_response_output(out, partitions, output)?;
Expand Down
10 changes: 8 additions & 2 deletions crates/fluvio-cli/src/client/topic/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::sync::Arc;

use clap::Parser;
use fluvio_sc_schema::objects::ListRequest;
use tracing::debug;
use anyhow::Result;

Expand All @@ -25,6 +26,9 @@ pub struct ListTopicsOpt {
/// Output
#[clap(flatten)]
output: OutputFormat,
/// Show system topics only
#[arg(long, short, required = false)]
system: bool,
}

impl ListTopicsOpt {
Expand All @@ -33,7 +37,9 @@ impl ListTopicsOpt {
debug!("list topics {:#?} ", output_type);
let admin = fluvio.admin().await;

let topics = admin.all::<TopicSpec>().await?;
let topics = admin
.list_with_config::<TopicSpec, String>(ListRequest::default().system(self.system))
.await?;
display::format_response_output(out, topics, output_type)?;
Ok(())
}
Expand All @@ -43,7 +49,7 @@ mod display {

use std::time::Duration;

use humantime::{format_duration};
use humantime::format_duration;
use comfy_table::{Row, Cell, CellAlignment};
use serde::Serialize;

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.25.1"
version = "0.26.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct PartitionSpec {
pub compression_type: CompressionAlgorithm,
#[fluvio(min_version = 12)]
pub deduplication: Option<Deduplication>,
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 13)]
pub system: bool,
}

impl PartitionSpec {
Expand All @@ -52,6 +55,7 @@ impl PartitionSpec {
storage: topic.get_storage().cloned(),
compression_type: topic.get_compression_type().clone(),
deduplication: topic.get_deduplication().cloned(),
system: topic.is_system(),
}
}

Expand Down
11 changes: 11 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct TopicSpec {
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 12)]
deduplication: Option<Deduplication>,
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 13)]
system: bool,
}

impl From<ReplicaSpec> for TopicSpec {
Expand Down Expand Up @@ -109,6 +112,14 @@ impl TopicSpec {
self.deduplication = deduplication;
}

pub fn is_system(&self) -> bool {
self.system
}

pub fn set_system(&mut self, system: bool) {
self.system = system;
}

/// get retention secs that can be displayed
pub fn retention_secs(&self) -> u32 {
self.get_clean_policy()
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-sc-schema"
version = "0.21.2"
version = "0.22.0"
edition = "2021"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Fluvio API for SC"
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio-sc-schema/src/objects/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct ListRequest<S> {
pub name_filters: ListFilters,
#[fluvio(min_version = 10)]
pub summary: bool, // if true, only return summary
#[fluvio(min_version = 13)]
pub system: bool, // if true, only return system specs
data: PhantomData<S>, // satisfy generic
}

Expand All @@ -76,9 +78,15 @@ impl<S> ListRequest<S> {
Self {
name_filters: name_filters.into(),
summary,
system: false,
data: PhantomData,
}
}

pub fn system(mut self, system: bool) -> Self {
self.system = system;
self
}
}

#[derive(Debug, Default, Encoder)]
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/src/objects/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use list::*;
pub use watch::*;
pub use metadata::*;

pub(crate) const COMMON_VERSION: i16 = 12; // from now, we use a single version for all objects
pub(crate) const COMMON_VERSION: i16 = 13; // from now, we use a single version for all objects
pub(crate) const DYN_OBJ: i16 = 11; // version indicate dynamic object

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-sc/src/services/public_api/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub async fn handle_list_request<AC: AuthContext, C: MetadataItem>(

let response = if let Some(req) = req.downcast()? as Option<ListRequest<TopicSpec>> {
ObjectApiListResponse::try_encode_from(
super::topic::handle_fetch_topics_request(req.name_filters, auth_ctx).await?,
super::topic::handle_fetch_topics_request(req.name_filters, req.system, auth_ctx)
.await?,
header.api_version(),
)?
} else if let Some(req) = req.downcast()? as Option<ListRequest<SpuSpec>> {
Expand All @@ -50,7 +51,7 @@ pub async fn handle_list_request<AC: AuthContext, C: MetadataItem>(
)?
} else if let Some(req) = req.downcast()? as Option<ListRequest<PartitionSpec>> {
ObjectApiListResponse::try_encode_from(
super::partition::handle_fetch_request(req.name_filters, auth_ctx).await?,
super::partition::handle_fetch_request(req.name_filters, req.system, auth_ctx).await?,
header.api_version(),
)?
} else if let Some(req) = req.downcast()? as Option<ListRequest<SmartModuleSpec>> {
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-sc/src/services/public_api/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::services::auth::AuthServiceContext;
#[instrument(skip(_filters, auth_ctx))]
pub async fn handle_fetch_request<AC: AuthContext, C: MetadataItem>(
_filters: ListFilters,
system: bool,
auth_ctx: &AuthServiceContext<AC, C>,
) -> Result<ListResponse<PartitionSpec>> {
debug!("fetching custom spu list");
Expand All @@ -38,6 +39,7 @@ pub async fn handle_fetch_request<AC: AuthContext, C: MetadataItem>(
.read()
.await
.values()
.filter(|value| value.inner().spec().system == system)
.map(|value| value.inner().clone().into())
.collect();

Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-sc/src/services/public_api/topic/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::services::auth::AuthServiceContext;
#[instrument(skip(filters, auth_ctx))]
pub async fn handle_fetch_topics_request<AC: AuthContext, C: MetadataItem>(
filters: ListFilters,
system: bool,
auth_ctx: &AuthServiceContext<AC, C>,
) -> Result<ListResponse<TopicSpec>> {
debug!("retrieving topic list: {:#?}", filters);
Expand All @@ -37,6 +38,7 @@ pub async fn handle_fetch_topics_request<AC: AuthContext, C: MetadataItem>(
.read()
.await
.values()
.filter(|value| value.inner().spec().is_system() == system)
.filter_map(|value| {
if filters.filter(value.key()) {
Some(value.inner().clone().into())
Expand Down
12 changes: 11 additions & 1 deletion crates/fluvio/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,18 @@ impl FluvioAdmin {
let filter_list: Vec<ListFilter> = filters.into_iter().map(Into::into).collect();
let list_request: ListRequest<S> = ListRequest::new(filter_list, summary);

self.list_with_config(list_request).await
}

#[instrument(skip(self, config))]
pub async fn list_with_config<S, F>(&self, config: ListRequest<S>) -> Result<Vec<Metadata<S>>>
where
S: AdminSpec,
ListFilter: From<F>,
S::Status: Encoder + Decoder + Debug,
{
let response = self
.send_receive_admin::<ObjectApiListRequest, _>(list_request)
.send_receive_admin::<ObjectApiListRequest, _>(config)
.await?;
trace!("list response: {:#?}", response);
response
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ where
};
}
Err(err) => {
error!("stream to server channel broken: {err:?}");
debug!("stream to server channel closed: {err:?}");
break;
}
}
Expand Down
2 changes: 2 additions & 0 deletions k8-util/helm/fluvio-sys/templates/crd_partition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ spec:
age:
type: string
nullable: true
system:
type: boolean
status:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
4 changes: 3 additions & 1 deletion k8-util/helm/fluvio-sys/templates/crd_topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ spec:
minimum: 0
age:
type: string
nullable: true
nullable: true
system:
type: boolean
subresources:
status: {}
additionalPrinterColumns:
Expand Down
Loading