Skip to content

Commit

Permalink
feat: add consumer storage to spu (#3915)
Browse files Browse the repository at this point in the history
* feat: add consumer storage to spu

* rename consumer to consumer offset

* fix review comments

* fix review comments

* removed ttl from consumer offset requests

* removed ttl from consumer offsets

* fix after rebase from master
  • Loading branch information
galibey committed Mar 29, 2024
1 parent 4062e05 commit cc6b3a2
Show file tree
Hide file tree
Showing 27 changed files with 1,451 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ fluvio-storage = { path = "crates/fluvio-storage" }
fluvio-stream-dispatcher = { version = "0.13.0", path = "crates/fluvio-stream-dispatcher" }
fluvio-stream-model = { version = "0.11.0", path = "crates/fluvio-stream-model", default-features = false }
fluvio-types = { version = "0.4.4", path = "crates/fluvio-types", default-features = false }
fluvio-kv-storage = { path = "crates/fluvio-kv-storage", default-features = false }

# Used to make eyre faster on debug builds
# See https://github.com/yaahc/color-eyre#improving-perf-on-debug-builds
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-controlplane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ mod alias {

pub type PartitionMetadata<C> = MetadataStoreObject<PartitionSpec, C>;
}

pub const CONSUMER_STORAGE_TOPIC: &str = "consumer-offset";
4 changes: 4 additions & 0 deletions crates/fluvio-protocol/src/record/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ impl RecordData {
pub fn as_utf8_lossy_string(&self) -> Cow<'_, str> {
String::from_utf8_lossy(self.as_ref())
}

pub fn into_vec(self) -> Vec<u8> {
self.0.into()
}
}

impl<V: Into<Vec<u8>>> From<V> for RecordData {
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ macro_rules! api_loop {
/// wait for a single request
#[macro_export]
macro_rules! wait_for_request {
( $api_stream:ident, $matcher:pat => $result:expr) => {{
( $api_stream:ident, $($matcher:pat => $result:expr),+) => {{
use futures_util::stream::StreamExt;

if let Some(msg) = $api_stream.next().await {
if let Ok(req_message) = msg {
tracing::trace!("received request: {:#?}", req_message);
match req_message {
$matcher => $result,
$($matcher => $result,)+
_ => {
tracing::error!("unexpected request: {:#?}", req_message);
return Ok(());
Expand Down
85 changes: 85 additions & 0 deletions crates/fluvio-spu-schema/src/server/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use fluvio_protocol::api::Request;
use fluvio_protocol::record::{Offset, ReplicaKey};
use fluvio_protocol::{Encoder, Decoder};
use fluvio_types::PartitionId;

use crate::COMMON_VERSION;
use crate::errors::ErrorCode;
use super::SpuServerApiKey;

#[derive(Decoder, Encoder, Default, Debug)]
pub struct UpdateConsumerOffsetRequest {
pub offset: Offset,
pub session_id: u32,
}

impl Request for UpdateConsumerOffsetRequest {
const API_KEY: u16 = SpuServerApiKey::UpdateConsumerOffset as u16;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
type Response = UpdateConsumerOffsetResponse;
}

impl UpdateConsumerOffsetRequest {
pub fn new(offset: Offset, session_id: u32) -> Self {
Self { offset, session_id }
}
}

#[derive(Encoder, Decoder, Default, Debug)]
pub struct UpdateConsumerOffsetResponse {
pub offset: Offset,
pub error_code: ErrorCode,
}

#[derive(Decoder, Encoder, Default, Debug)]
pub struct DeleteConsumerOffsetRequest {
pub replica_id: ReplicaKey,
pub consumer_id: String,
}

impl DeleteConsumerOffsetRequest {
pub fn new(
topic: impl Into<String>,
partition: PartitionId,
consumer_id: impl Into<String>,
) -> Self {
let replica_id = ReplicaKey::new(topic, partition);
Self {
replica_id,
consumer_id: consumer_id.into(),
}
}
}

impl Request for DeleteConsumerOffsetRequest {
const API_KEY: u16 = SpuServerApiKey::DeleteConsumerOffset as u16;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
type Response = DeleteConsumerOffsetResponse;
}

#[derive(Encoder, Decoder, Default, Debug)]
pub struct DeleteConsumerOffsetResponse {
pub error_code: ErrorCode,
}

#[derive(Decoder, Encoder, Default, Debug)]
pub struct FetchConsumerOffsetsRequest;

impl Request for FetchConsumerOffsetsRequest {
const API_KEY: u16 = SpuServerApiKey::FetchConsumerOffsets as u16;
const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
type Response = FetchConsumerOffsetsResponse;
}

#[derive(Encoder, Decoder, Default, Debug)]
pub struct FetchConsumerOffsetsResponse {
pub error_code: ErrorCode,
pub consumers: Vec<ConsumerOffset>,
}

#[derive(Encoder, Decoder, Default, Debug)]
pub struct ConsumerOffset {
pub id: String,
pub offset: Offset,
pub modified_time: u64,
}
6 changes: 1 addition & 5 deletions crates/fluvio-spu-schema/src/server/stream_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//!
use std::fmt::Debug;
use std::marker::PhantomData;
use std::time::Duration;

use educe::Educe;
use derive_builder::Builder;
Expand Down Expand Up @@ -89,9 +88,6 @@ pub struct StreamFetchRequest<R> {
#[builder(default)]
#[fluvio(min_version = 23)]
pub consumer_id: Option<String>,
#[builder(default)]
#[fluvio(min_version = 23)]
pub consumer_ttl: Option<Duration>,
#[builder(setter(skip))]
data: PhantomData<R>,
}
Expand Down Expand Up @@ -225,7 +221,7 @@ mod tests {
0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
];
assert_eq!(dest, expected);
}
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-spu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ fluvio-future = { workspace = true,features = [
] }
fluvio-smartengine = { workspace = true, optional = true }
fluvio-smartmodule = { workspace = true}
fluvio-kv-storage = { workspace = true}

[dev-dependencies]
once_cell = { workspace = true }
Expand Down
9 changes: 8 additions & 1 deletion crates/fluvio-spu/src/core/global_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use fluvio_types::SpuId;
use fluvio_storage::ReplicaStorage;

use crate::config::SpuConfig;
use crate::kv::consumer::SharedConsumerOffsetStorages;
use crate::replication::follower::FollowersState;
use crate::replication::follower::SharedFollowersState;
use crate::replication::leader::{
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct GlobalContext<S> {
sm_engine: SmartEngine,
leaders: Arc<LeaderConnections>,
metrics: Arc<SpuMetrics>,
consumer_offset: SharedConsumerOffsetStorages,
}

// -----------------------------------
Expand Down Expand Up @@ -76,6 +78,7 @@ where
sm_engine: SmartEngine::new(),
leaders: LeaderConnections::shared(spus, replicas),
metrics,
consumer_offset: SharedConsumerOffsetStorages::default(),
}
}

Expand Down Expand Up @@ -120,7 +123,7 @@ where
self.config.clone()
}

pub fn follower_notifier(&self) -> &FollowerNotifier {
pub fn follower_notifier(&self) -> &Arc<FollowerNotifier> {
&self.spu_followers
}

Expand Down Expand Up @@ -153,6 +156,10 @@ where
pub(crate) fn metrics(&self) -> Arc<SpuMetrics> {
self.metrics.clone()
}

pub(crate) fn consumer_offset(&self) -> &SharedConsumerOffsetStorages {
&self.consumer_offset
}
}

mod file_replica {
Expand Down
Loading

0 comments on commit cc6b3a2

Please sign in to comment.