From 84023b0675accd4543d8e6506d06a6271e23d4b0 Mon Sep 17 00:00:00 2001 From: ZeDRoman Date: Mon, 16 Jan 2023 17:01:46 +0300 Subject: [PATCH 1/2] controller: clean up topic orphan files When redpanda is restarted while delete operation is not finish Partition files might be left on disk. We need to cleanup orphan partition files --- src/v/cluster/controller_backend.cc | 11 +++++ src/v/cluster/controller_backend.h | 2 + src/v/storage/fs_utils.h | 21 +++++++++ src/v/storage/log_manager.cc | 70 +++++++++++++++++++++++++++++ src/v/storage/log_manager.h | 3 ++ 5 files changed, 107 insertions(+) diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 9e51e22d7ce4..687ade70802b 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -794,6 +794,8 @@ controller_backend::execute_partition_op(const topic_table::delta& delta) { "be handled by coproc::reconciliation_backend"); case op_t::del: return delete_partition(delta.ntp, rev, partition_removal_mode::global) + .then( + [this, &delta, rev]() { return cleanup_orphan_files(delta, rev); }) .then([] { return std::error_code(errc::success); }); case op_t::update: case op_t::force_abort_update: @@ -1551,6 +1553,15 @@ ss::future controller_backend::shutdown_on_current_shard( } } +ss::future<> controller_backend::cleanup_orphan_files( + const topic_table::delta& delta, model::revision_id rev) { + if (!has_local_replicas(_self, delta.new_assignment.replicas)) { + return ss::now(); + } + return _storage.local().log_mgr().remove_orphan( + _data_directory, delta.ntp, rev); +} + ss::future<> controller_backend::delete_partition( model::ntp ntp, model::revision_id rev, partition_removal_mode mode) { auto part = _partition_manager.local().get(ntp); diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index badc948e85b8..47d5e0f83a5a 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -294,6 +294,8 @@ class controller_backend model::ntp, raft::group_id, ss::shard_id, model::revision_id); ss::future<> remove_from_shard_table(model::ntp, raft::group_id, model::revision_id); + ss::future<> + cleanup_orphan_files(const topic_table::delta&, model::revision_id); ss::future<> delete_partition( model::ntp, model::revision_id, partition_removal_mode mode); template diff --git a/src/v/storage/fs_utils.h b/src/v/storage/fs_utils.h index fb5a75fa65c7..bb9bddb19cc7 100644 --- a/src/v/storage/fs_utils.h +++ b/src/v/storage/fs_utils.h @@ -62,5 +62,26 @@ struct segment_path { }; } }; +struct ntp_directory_path { + struct metadata { + model::partition_id partition_id; + model::revision_id revision_id; + }; + + /// Parse ntp directory name + static std::optional + parse_partition_directory(const ss::sstring& name) { + const std::regex re(R"(^(\d+)_(\d+)$)"); + std::cmatch match; + if (!std::regex_match(name.c_str(), match, re)) { + return std::nullopt; + } + return metadata{ + .partition_id = model::partition_id( + boost::lexical_cast(match[1].str())), + .revision_id = model::revision_id( + boost::lexical_cast(match[2].str()))}; + } +}; } // namespace storage diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index 302ea77bfbd8..946e00b826c1 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -45,6 +45,7 @@ #include #include #include +#include #include @@ -401,6 +402,75 @@ ss::future<> log_manager::remove(model::ntp ntp) { }); } +ss::future<> log_manager::remove_orphan( + ss::sstring data_directory_path, model::ntp ntp, model::revision_id rev) { + vlog(stlog.info, "Asked to remove orphan for: {} revision: {}", ntp, rev); + if (_logs.contains(ntp)) { + co_return; + } + + const auto topic_directory_path + = (std::filesystem::path(data_directory_path) / ntp.topic_path()) + .string(); + + auto topic_directory_exist = co_await ss::file_exists(topic_directory_path); + if (!topic_directory_exist) { + co_return; + } + + std::exception_ptr eptr; + try { + co_await directory_walker::walk( + topic_directory_path, + [&ntp, &topic_directory_path, &rev](ss::directory_entry entry) { + auto ntp_directory_data + = ntp_directory_path::parse_partition_directory(entry.name); + if (!ntp_directory_data) { + return ss::now(); + } + if ( + ntp_directory_data->partition_id == ntp.tp.partition + && ntp_directory_data->revision_id <= rev) { + auto ntp_directory = std::filesystem::path( + topic_directory_path) + / std::filesystem::path(entry.name); + vlog( + stlog.info, + "Cleaning up ntp [{}] rev {} directory {} ", + ntp, + ntp_directory_data->revision_id, + ntp_directory); + return ss::recursive_remove_directory(ntp_directory); + } + return ss::now(); + }); + } catch (std::filesystem::filesystem_error const&) { + eptr = std::current_exception(); + } catch (ss::broken_promise const&) { + // List directory can throw ss::broken_promise exception when directory + // was deleted while list directory is processing + eptr = std::current_exception(); + } + if (eptr) { + topic_directory_exist = co_await ss::file_exists(topic_directory_path); + if (topic_directory_exist) { + std::rethrow_exception(eptr); + } else { + vlog( + stlog.debug, + "Cleaning orphan. Topic directory was deleted: {}", + topic_directory_path); + co_return; + } + } + vlog( + stlog.info, + "Trying to clean up orphan topic directory: {}", + topic_directory_path); + co_await dispatch_topic_dir_deletion(std::move(topic_directory_path)); + co_return; +} + ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) { return ss::smp::submit_to( 0, diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index b405753f9c6f..2071f61c0c09 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -172,6 +172,9 @@ class log_manager { */ ss::future<> remove(model::ntp); + ss::future<> remove_orphan( + ss::sstring data_directory_path, model::ntp, model::revision_id); + ss::future<> stop(); ss::future> make_log_segment( From 79f90d3c9bcb0c1ee036d1ff5e1b33339ac4f50f Mon Sep 17 00:00:00 2001 From: ZeDRoman Date: Mon, 16 Jan 2023 17:01:58 +0300 Subject: [PATCH 2/2] ducktape: delete orphan partition files test --- tests/rptest/tests/topic_delete_test.py | 87 +++++++++++++++++++++---- 1 file changed, 75 insertions(+), 12 deletions(-) diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index b5222639d380..862e16f9c4c4 100644 --- a/tests/rptest/tests/topic_delete_test.py +++ b/tests/rptest/tests/topic_delete_test.py @@ -127,16 +127,23 @@ def __init__(self, test_context): self.kafka_tools = KafkaCliTools(self.redpanda) + def produce_until_partitions(self): + self.kafka_tools.produce(self.topic, 1024, 1024) + storage = self.redpanda.storage() + return len(list(storage.partitions("kafka", self.topic))) == 9 + + def dump_storage_listing(self): + for node in self.redpanda.nodes: + self.logger.error(f"Storage listing on {node.name}:") + for line in node.account.ssh_capture( + f"find {self.redpanda.DATA_DIR}"): + self.logger.error(line.strip()) + @cluster(num_nodes=3) @parametrize(with_restart=False) @parametrize(with_restart=True) def topic_delete_test(self, with_restart): - def produce_until_partitions(): - self.kafka_tools.produce(self.topic, 1024, 1024) - storage = self.redpanda.storage() - return len(list(storage.partitions("kafka", self.topic))) == 9 - - wait_until(lambda: produce_until_partitions(), + wait_until(lambda: self.produce_until_partitions(), timeout_sec=30, backoff_sec=2, err_msg="Expected partition did not materialize") @@ -160,13 +167,69 @@ def produce_until_partitions(): err_msg="Topic storage was not removed") except: - # On errors, dump listing of the storage location - for node in self.redpanda.nodes: - self.logger.error(f"Storage listing on {node.name}:") - for line in node.account.ssh_capture( - f"find {self.redpanda.DATA_DIR}"): - self.logger.error(line.strip()) + self.dump_storage_listing() + raise + + @cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed']) + def topic_delete_orphan_files_test(self): + wait_until(lambda: self.produce_until_partitions(), + timeout_sec=30, + backoff_sec=2, + err_msg="Expected partition did not materialize") + + # Sanity check the kvstore checks: there should be at least one kvstore entry + # per partition while the topic exists. + assert sum(get_kvstore_topic_key_counts( + self.redpanda).values()) >= self.topics[0].partition_count + + down_node = self.redpanda.nodes[-1] + try: + # Make topic directory immutable to prevent deleting + down_node.account.ssh( + f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.kafka_tools.delete_topic(self.topic) + + def topic_deleted_on_all_nodes_except_one(redpanda, down_node, + topic_name): + storage = redpanda.storage() + log_not_removed_on_down = topic_name in next( + filter(lambda x: x.name == down_node.name, + storage.nodes)).ns["kafka"].topics + logs_removed_on_others = all( + map( + lambda n: topic_name not in n.ns["kafka"].topics, + filter(lambda x: x.name != down_node.name, + storage.nodes))) + return log_not_removed_on_down and logs_removed_on_others + try: + wait_until( + lambda: topic_deleted_on_all_nodes_except_one( + self.redpanda, down_node, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg= + "Topic storage was not removed from running nodes or removed from down node" + ) + except: + self.dump_storage_listing() + raise + + self.redpanda.stop_node(down_node) + finally: + down_node.account.ssh( + f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}") + + self.redpanda.start_node(down_node) + + try: + wait_until(lambda: topic_storage_purged(self.redpanda, self.topic), + timeout_sec=30, + backoff_sec=2, + err_msg="Topic storage was not removed") + except: + self.dump_storage_listing() raise