Skip to content

Commit

Permalink
Merge pull request #23227 from WillemKauf/backport-pr-23191-v23.3.x-797
Browse files Browse the repository at this point in the history
[v23.3.x] `cluster`: don't apply invalid property change in `topic_table` (Manual backport)
  • Loading branch information
piyushredpanda authored Sep 9, 2024
2 parents c03072f + 8b7dad1 commit 4b6e255
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 29 deletions.
28 changes: 28 additions & 0 deletions src/v/cluster/tests/topic_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,34 @@ FIXTURE_TEST(test_topic_with_schema_id_validation_ops, topic_table_fixture) {
cfg = topics.get_topic_cfg(tp_ns);
BOOST_REQUIRE(cfg.has_value());
BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value());

// Ensure that an invalid update cmd does not get persisted in the topic
// table.
// Sanity check before starting.
BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value());
BOOST_REQUIRE(
!cfg->properties.record_key_schema_id_validation_compat.has_value());

update.record_key_schema_id_validation.op
= cluster::incremental_update_operation::set;
update.record_key_schema_id_validation.value.emplace(true);

update.record_key_schema_id_validation_compat.op
= cluster::incremental_update_operation::set;
update.record_key_schema_id_validation_compat.value.emplace(false);
ec = topics
.apply(
cluster::update_topic_properties_cmd{tp_ns, update},
model::offset{11})
.get();
BOOST_REQUIRE_EQUAL(ec, cluster::errc::topic_invalid_config);
cfg = topics.get_topic_cfg(tp_ns);
BOOST_REQUIRE(cfg.has_value());

// Properties from invalid configuration should not have been persisted.
BOOST_REQUIRE(!cfg->properties.record_key_schema_id_validation.has_value());
BOOST_REQUIRE(
!cfg->properties.record_key_schema_id_validation_compat.has_value());
}

FIXTURE_TEST(test_topic_table_iterator_basic, topic_table_fixture) {
Expand Down
66 changes: 37 additions & 29 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -833,76 +833,84 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
if (tp == _topics.end()) {
co_return make_error_code(errc::topic_not_exists);
}
auto& properties = tp->second.get_configuration().properties;
auto properties_snapshot = properties;
auto updated_properties = tp->second.get_configuration().properties;
auto& overrides = cmd.value;
/**
* Update topic properties
*/
incremental_update(
properties.cleanup_policy_bitflags, overrides.cleanup_policy_bitflags);
updated_properties.cleanup_policy_bitflags,
overrides.cleanup_policy_bitflags);
incremental_update(
properties.compaction_strategy, overrides.compaction_strategy);
incremental_update(properties.compression, overrides.compression);
incremental_update(properties.retention_bytes, overrides.retention_bytes);
updated_properties.compaction_strategy, overrides.compaction_strategy);
incremental_update(updated_properties.compression, overrides.compression);
incremental_update(
properties.retention_duration, overrides.retention_duration);
incremental_update(properties.segment_size, overrides.segment_size);
incremental_update(properties.timestamp_type, overrides.timestamp_type);

incremental_update(properties.shadow_indexing, overrides.shadow_indexing);
incremental_update(properties.batch_max_bytes, overrides.batch_max_bytes);

updated_properties.retention_bytes, overrides.retention_bytes);
incremental_update(
updated_properties.retention_duration, overrides.retention_duration);
incremental_update(updated_properties.segment_size, overrides.segment_size);
incremental_update(
updated_properties.timestamp_type, overrides.timestamp_type);
incremental_update(
updated_properties.shadow_indexing, overrides.shadow_indexing);
incremental_update(
properties.retention_local_target_bytes,
updated_properties.batch_max_bytes, overrides.batch_max_bytes);
incremental_update(
updated_properties.retention_local_target_bytes,
overrides.retention_local_target_bytes);
incremental_update(
properties.retention_local_target_ms,
updated_properties.retention_local_target_ms,
overrides.retention_local_target_ms);
incremental_update(
properties.remote_delete,
updated_properties.remote_delete,
overrides.remote_delete,
storage::ntp_config::default_remote_delete);
incremental_update(properties.segment_ms, overrides.segment_ms);
incremental_update(updated_properties.segment_ms, overrides.segment_ms);
incremental_update(
properties.record_key_schema_id_validation,
updated_properties.record_key_schema_id_validation,
overrides.record_key_schema_id_validation);
incremental_update(
properties.record_key_schema_id_validation_compat,
updated_properties.record_key_schema_id_validation_compat,
overrides.record_key_schema_id_validation_compat);
incremental_update(
properties.record_key_subject_name_strategy,
updated_properties.record_key_subject_name_strategy,
overrides.record_key_subject_name_strategy);
incremental_update(
properties.record_key_subject_name_strategy_compat,
updated_properties.record_key_subject_name_strategy_compat,
overrides.record_key_subject_name_strategy_compat);
incremental_update(
properties.record_value_schema_id_validation,
updated_properties.record_value_schema_id_validation,
overrides.record_value_schema_id_validation);
incremental_update(
properties.record_value_schema_id_validation_compat,
updated_properties.record_value_schema_id_validation_compat,
overrides.record_value_schema_id_validation_compat);
incremental_update(
properties.record_value_subject_name_strategy,
updated_properties.record_value_subject_name_strategy,
overrides.record_value_subject_name_strategy);
incremental_update(
properties.record_value_subject_name_strategy_compat,
updated_properties.record_value_subject_name_strategy_compat,
overrides.record_value_subject_name_strategy_compat);
incremental_update(
properties.initial_retention_local_target_bytes,
updated_properties.initial_retention_local_target_bytes,
overrides.initial_retention_local_target_bytes);
incremental_update(
properties.initial_retention_local_target_ms,
updated_properties.initial_retention_local_target_ms,
overrides.initial_retention_local_target_ms);

auto& properties = tp->second.get_configuration().properties;

// no configuration change, no need to generate delta
if (properties == properties_snapshot) {
if (updated_properties == properties) {
co_return errc::success;
}

if (!schema_id_validation_validator::is_valid(properties)) {
if (!schema_id_validation_validator::is_valid(updated_properties)) {
co_return schema_id_validation_validator::ec;
}

// Apply the changes
properties = std::move(updated_properties);

// generate deltas for controller backend
const auto& assignments = tp->second.get_assignments();
for (auto& p_as : assignments) {
Expand Down

0 comments on commit 4b6e255

Please sign in to comment.