diff --git a/Cargo.lock b/Cargo.lock index 7486ec25b0..ff8dd75473 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2746,7 +2746,7 @@ dependencies = [ [[package]] name = "fluvio-protocol" -version = "0.10.13" +version = "0.10.14" dependencies = [ "bytes", "content_inspector", diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 6d55dcebca..c424f02606 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -148,6 +148,10 @@ impl PartitionMirrorConfig { Self::Home(h) => h.remote_cluster.clone(), } } + + pub fn is_home_mirror(&self) -> bool { + matches!(self, Self::Home(_)) + } } impl std::fmt::Display for PartitionMirrorConfig { diff --git a/crates/fluvio-protocol/Cargo.toml b/crates/fluvio-protocol/Cargo.toml index d9f97e125d..f6cf551b75 100644 --- a/crates/fluvio-protocol/Cargo.toml +++ b/crates/fluvio-protocol/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-protocol" edition = "2021" -version = "0.10.13" +version = "0.10.14" authors = ["Fluvio Contributors "] description = "Fluvio streaming protocol" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-protocol/src/link/error_code.rs b/crates/fluvio-protocol/src/link/error_code.rs index e9fa523dbf..a155c0654b 100644 --- a/crates/fluvio-protocol/src/link/error_code.rs +++ b/crates/fluvio-protocol/src/link/error_code.rs @@ -215,6 +215,9 @@ pub enum ErrorCode { #[fluvio(tag = 11002)] #[error("the mirror already exists")] MirrorAlreadyExists, + #[fluvio(tag = 11003)] + #[error("produce from home is not allowed")] + MirrorProduceFromHome, // Specs #[fluvio(tag = 12001)] diff --git a/crates/fluvio-spu/src/services/public/produce_handler.rs b/crates/fluvio-spu/src/services/public/produce_handler.rs index 1d4ea5ce14..c12507ad1a 100644 --- a/crates/fluvio-spu/src/services/public/produce_handler.rs +++ b/crates/fluvio-spu/src/services/public/produce_handler.rs @@ -114,6 +114,17 @@ async fn handle_produce_topic( } }; + if let Some(mirror) = &leader_state.get_replica().mirror { + if mirror.is_home_mirror() { + debug!(%replica_id, "Mirror replica is not supported for produce"); + topic_result.partitions.push(PartitionWriteResult::error( + replica_id, + ErrorCode::MirrorProduceFromHome, + )); + continue; + } + } + if let Err(err) = apply_smartmodules( &mut partition_request, smartmodules, diff --git a/tests/cli/mirroring_smoke_tests/export.bats b/tests/cli/mirroring_smoke_tests/export.bats index 4a6b27b5bb..0690310f3b 100644 --- a/tests/cli/mirroring_smoke_tests/export.bats +++ b/tests/cli/mirroring_smoke_tests/export.bats @@ -14,7 +14,7 @@ setup_file() { REMOTE_NAME=remote-test-1 export REMOTE_NAME - debug_msg "Topic name: $REMOTE_NAME" + debug_msg "Remote name: $REMOTE_NAME" MESSAGE="$(random_string 7)" export MESSAGE diff --git a/tests/cli/mirroring_smoke_tests/mirror-topic.bats b/tests/cli/mirroring_smoke_tests/mirror-topic.bats new file mode 100644 index 0000000000..4fed1b3352 --- /dev/null +++ b/tests/cli/mirroring_smoke_tests/mirror-topic.bats @@ -0,0 +1,49 @@ +#!/usr/bin/env bats + +TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper" +export TEST_HELPER_DIR + +load "$TEST_HELPER_DIR"/tools_check.bash +load "$TEST_HELPER_DIR"/fluvio_dev.bash +load "$TEST_HELPER_DIR"/bats-support/load.bash +load "$TEST_HELPER_DIR"/bats-assert/load.bash + +setup_file() { + CURRENT_DATE=$(date +%Y-%m) + export CURRENT_DATE + + REMOTE_NAME="$(random_string 7)" + export REMOTE_NAME + debug_msg "Remote name: $REMOTE_NAME" + + MESSAGE="$(random_string 7)" + export MESSAGE + debug_msg "$MESSAGE" + + TOPIC_NAME="$(random_string 7)" + export TOPIC_NAME + debug_msg "Topic name: $TOPIC_NAME" +} + +@test "Can register an remote cluster" { + run timeout 15s "$FLUVIO_BIN" remote register "$REMOTE_NAME" + + assert_output "remote cluster \"$REMOTE_NAME\" was registered" + assert_success +} + +@test "Can create a mirror topic" { + echo "[\"$REMOTE_NAME\"]" > remotes.json + run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME" --mirror-apply remotes.json + + assert_output "topic \"$TOPIC_NAME\" created" + assert_success +} + +@test "Can't produce to a mirror topic from home" { + MESSAGE="$(random_string 7)" + run bash -c 'echo "$MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME"' + + assert_output "Producer error: Producer received an error code: produce from home is not allowed" + assert_failure +} diff --git a/tests/cli/mirroring_smoke_tests/remote.bats b/tests/cli/mirroring_smoke_tests/remote.bats index 3f7f9801cf..390a75da55 100644 --- a/tests/cli/mirroring_smoke_tests/remote.bats +++ b/tests/cli/mirroring_smoke_tests/remote.bats @@ -14,7 +14,7 @@ setup_file() { REMOTE_NAME=remote-test-1 export REMOTE_NAME - debug_msg "Topic name: $REMOTE_NAME" + debug_msg "Remote name: $REMOTE_NAME" MESSAGE="$(random_string 7)" export MESSAGE