From f26be3e2a6c0901e361ea604e97849ae981db74a Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 28 Mar 2023 11:48:17 -0700 Subject: [PATCH] archival: don't instantiate merger on read replicas Read replicas should perform no mutations on storage state. We previously instantiated a segment merger and registered housekeeping jobs for them. This adds safeguards at a few levels to avoid this happening: - returning early on read replicas instead of constructing a segment merger - an assert in the constructor of the segment merger - explicitly not registering housekeeping jobs on read replicas (cherry picked from commit f4737f5c9c590d16067d2470f934fec8fe176488) --- src/v/archival/adjacent_segment_merger.cc | 9 +++++++-- src/v/archival/ntp_archiver_service.cc | 8 ++++---- src/v/archival/ntp_archiver_service.h | 4 ++++ src/v/cluster/partition.cc | 11 +++++++---- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/v/archival/adjacent_segment_merger.cc b/src/v/archival/adjacent_segment_merger.cc index 441531fdf176..def6a547b3ec 100644 --- a/src/v/archival/adjacent_segment_merger.cc +++ b/src/v/archival/adjacent_segment_merger.cc @@ -43,7 +43,12 @@ adjacent_segment_merger::adjacent_segment_merger( , _target_segment_size( config::shard_local_cfg().cloud_storage_segment_size_target.bind()) , _min_segment_size( - config::shard_local_cfg().cloud_storage_segment_size_min.bind()) {} + config::shard_local_cfg().cloud_storage_segment_size_min.bind()) { + vassert( + !_archiver.ntp_config().is_read_replica_mode_enabled(), + "Constructed adjacent segment merger on read replica {}", + _archiver.get_ntp()); +} ss::future<> adjacent_segment_merger::stop() { return _gate.close(); } @@ -183,4 +188,4 @@ bool adjacent_segment_merger::interrupted() const { return _as.abort_requested(); } -} // namespace archival \ No newline at end of file +} // namespace archival diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 17666eae5e5c..8eb2523636bb 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -56,7 +56,9 @@ static std::unique_ptr maybe_make_adjacent_segment_merger( ntp_archiver& self, retry_chain_logger& log, const storage::ntp_config& cfg) { std::unique_ptr result = nullptr; - if (cfg.is_archival_enabled() && !cfg.is_compacted()) { + if ( + cfg.is_archival_enabled() && !cfg.is_compacted() + && !cfg.is_read_replica_mode_enabled()) { result = std::make_unique(self, log, true); result->set_enabled(config::shard_local_cfg() .cloud_storage_enable_segment_merging.value()); @@ -100,9 +102,7 @@ ntp_archiver::ntp_archiver( // Override bucket for read-replica if (_parent.is_read_replica_mode_enabled()) { _bucket_override = _parent.get_read_replica_bucket(); - } - - if ( + } else if ( parent.log().config().is_archival_enabled() && !parent.log().config().is_compacted()) { _segment_merging_enabled.watch([this] { diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h index 6d6c7ca22cc6..c057a5c405f4 100644 --- a/src/v/archival/ntp_archiver_service.h +++ b/src/v/archival/ntp_archiver_service.h @@ -234,6 +234,10 @@ class ntp_archiver { /// it will resume uploads. void complete_transfer_leadership(); + const storage::ntp_config& ntp_config() const { + return _parent.log().config(); + } + private: ss::future do_upload_local( upload_candidate_with_locks candidate, diff --git a/src/v/cluster/partition.cc b/src/v/cluster/partition.cc index 010260529abf..dbb5c829270a 100644 --- a/src/v/cluster/partition.cc +++ b/src/v/cluster/partition.cc @@ -494,15 +494,18 @@ void partition::maybe_construct_archiver() { // NOTE: construct and archiver even if shadow indexing isn't enabled, e.g. // in the case of read replicas -- we still need the archiver to drive // manifest updates, etc. + auto& ntp_config = _raft->log().config(); if ( config::shard_local_cfg().cloud_storage_enabled() && _cloud_storage_api.local_is_initialized() && _raft->ntp().ns == model::kafka_namespace - && (_raft->log().config().is_archival_enabled() || _raft->log().config().is_read_replica_mode_enabled())) { + && (ntp_config.is_archival_enabled() || ntp_config.is_read_replica_mode_enabled())) { _archiver = std::make_unique( - log().config(), _archival_conf, _cloud_storage_api.local(), *this); - _upload_housekeeping.local().register_jobs( - _archiver->get_housekeeping_jobs()); + ntp_config, _archival_conf, _cloud_storage_api.local(), *this); + if (!ntp_config.is_read_replica_mode_enabled()) { + _upload_housekeeping.local().register_jobs( + _archiver->get_housekeeping_jobs()); + } } }