Delete orphan files for topics. #8185

Merged (2 commits) on Jan 20, 2023

11 changes: 11 additions & 0 deletions src/v/cluster/controller_backend.cc
@@ -794,6 +794,8 @@ controller_backend::execute_partition_op(const topic_table::delta& delta) {
"be handled by coproc::reconciliation_backend");
case op_t::del:
return delete_partition(delta.ntp, rev, partition_removal_mode::global)
.then(
[this, &delta, rev]() { return cleanup_orphan_files(delta, rev); })
.then([] { return std::error_code(errc::success); });
case op_t::update:
case op_t::force_abort_update:
@@ -1551,6 +1553,15 @@ ss::future<std::error_code> controller_backend::shutdown_on_current_shard(
}
}

ss::future<> controller_backend::cleanup_orphan_files(
const topic_table::delta& delta, model::revision_id rev) {
if (!has_local_replicas(_self, delta.new_assignment.replicas)) {
return ss::now();
}
return _storage.local().log_mgr().remove_orphan(
_data_directory, delta.ntp, rev);
}

ss::future<> controller_backend::delete_partition(
model::ntp ntp, model::revision_id rev, partition_removal_mode mode) {
auto part = _partition_manager.local().get(ntp);
2 changes: 2 additions & 0 deletions src/v/cluster/controller_backend.h
@@ -294,6 +294,8 @@ class controller_backend
model::ntp, raft::group_id, ss::shard_id, model::revision_id);
ss::future<>
remove_from_shard_table(model::ntp, raft::group_id, model::revision_id);
ss::future<>
cleanup_orphan_files(const topic_table::delta&, model::revision_id);
ss::future<> delete_partition(
model::ntp, model::revision_id, partition_removal_mode mode);
template<typename Func>
21 changes: 21 additions & 0 deletions src/v/storage/fs_utils.h
@@ -62,5 +62,26 @@ struct segment_path {
};
}
};
struct ntp_directory_path {
struct metadata {
model::partition_id partition_id;
model::revision_id revision_id;
};

/// Parse ntp directory name
static std::optional<metadata>
parse_partition_directory(const ss::sstring& name) {
const std::regex re(R"(^(\d+)_(\d+)$)");
std::cmatch match;
if (!std::regex_match(name.c_str(), match, re)) {
return std::nullopt;
}
return metadata{
.partition_id = model::partition_id(
boost::lexical_cast<uint64_t>(match[1].str())),
.revision_id = model::revision_id(
boost::lexical_cast<uint64_t>(match[2].str()))};
}
};

} // namespace storage
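
For reference, partition directories on disk are named "<partition_id>_<revision_id>" (for example "0_21"), which is the pattern parse_partition_directory matches above. Below is a minimal standalone sketch of that parsing, added here for illustration only: it uses just the standard library, and the plain uint64_t fields and main() driver are hypothetical stand-ins for the model::partition_id / model::revision_id strong types used in the real code.

// Illustrative sketch (not part of this PR) of the "<partition_id>_<revision_id>"
// directory-name parsing shown above; plain uint64_t stands in for the
// model:: strong types.
#include <cstdint>
#include <iostream>
#include <optional>
#include <regex>
#include <string>

struct partition_dir_metadata {
    uint64_t partition_id;
    uint64_t revision_id;
};

std::optional<partition_dir_metadata>
parse_partition_directory(const std::string& name) {
    static const std::regex re(R"(^(\d+)_(\d+)$)");
    std::smatch match;
    if (!std::regex_match(name, match, re)) {
        return std::nullopt; // e.g. a stray file or non-partition directory
    }
    return partition_dir_metadata{
      std::stoull(match[1].str()), std::stoull(match[2].str())};
}

int main() {
    for (const std::string name : {"0_21", "12_5", "snapshot", "0_21.tmp"}) {
        if (auto md = parse_partition_directory(name)) {
            std::cout << name << " -> partition " << md->partition_id
                      << ", revision " << md->revision_id << '\n';
        } else {
            std::cout << name << " -> not a partition directory\n";
        }
    }
    return 0;
}
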
70 changes: 70 additions & 0 deletions src/v/storage/log_manager.cc
@@ -45,6 +45,7 @@
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>

#include <fmt/format.h>

@@ -401,6 +402,75 @@ ss::future<> log_manager::remove(model::ntp ntp) {
});
}

ss::future<> log_manager::remove_orphan(
Review comment (Member): coroutine?

ss::sstring data_directory_path, model::ntp ntp, model::revision_id rev) {
vlog(stlog.info, "Asked to remove orphan for: {} revision: {}", ntp, rev);
Review comment (Member): nit: can we be more specific here, i.e. "Asked to remove orphaned partition directory"?
Reply (Contributor, author): I think not, because we remove all directories with a revision lower than the one provided. We will log the exact directories later.

if (_logs.contains(ntp)) {
co_return;
}
Review comment on lines +408 to +410 (Member): This check can be done before creating a directory path.

const auto topic_directory_path
= (std::filesystem::path(data_directory_path) / ntp.topic_path())
.string();

auto topic_directory_exist = co_await ss::file_exists(topic_directory_path);
if (!topic_directory_exist) {
co_return;
}

std::exception_ptr eptr;
try {
co_await directory_walker::walk(
topic_directory_path,
[&ntp, &topic_directory_path, &rev](ss::directory_entry entry) {
auto ntp_directory_data
= ntp_directory_path::parse_partition_directory(entry.name);
if (!ntp_directory_data) {
return ss::now();
}
if (
ntp_directory_data->partition_id == ntp.tp.partition
&& ntp_directory_data->revision_id <= rev) {
auto ntp_directory = std::filesystem::path(
topic_directory_path)
/ std::filesystem::path(entry.name);
vlog(
stlog.info,
"Cleaning up ntp [{}] rev {} directory {} ",
ntp,
ntp_directory_data->revision_id,
ntp_directory);
return ss::recursive_remove_directory(ntp_directory);
}
return ss::now();
});
} catch (std::filesystem::filesystem_error const&) {
eptr = std::current_exception();
} catch (ss::broken_promise const&) {
// Listing the directory can throw ss::broken_promise when the directory
// is deleted while the listing is still in progress
eptr = std::current_exception();
}
if (eptr) {
topic_directory_exist = co_await ss::file_exists(topic_directory_path);
if (topic_directory_exist) {
std::rethrow_exception(eptr);
} else {
vlog(
stlog.debug,
"Cleaning orphan. Topic directory was deleted: {}",
topic_directory_path);
co_return;
}
}
vlog(
stlog.info,
Review comment (Contributor): nit: I think we can lower this to debug (it is logged for every orphan cleanup). Also, regarding the message "Trying to clean up orphan topic directory:", we don't know that the topic directory is orphaned at this point; there may be other partitions, and we are only attempting to schedule a deletion in case it is empty.
"Trying to clean up orphan topic directory: {}",
topic_directory_path);
co_await dispatch_topic_dir_deletion(std::move(topic_directory_path));
co_return;
}

ss::future<> log_manager::dispatch_topic_dir_deletion(ss::sstring dir) {
return ss::smp::submit_to(
0,
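
To make the selection rule from the review discussion above concrete: remove_orphan only deletes a directory whose partition id matches the removed ntp and whose revision id does not exceed the revision of the delete operation, so a directory recreated under a newer revision is left alone. The snippet below is an illustrative standalone sketch of that predicate with hypothetical values; it is not part of the change.

// Illustrative sketch of remove_orphan's selection rule: a directory under the
// topic directory is an orphan candidate when it belongs to the removed
// partition and its revision is <= the revision of the delete operation.
#include <cstdint>
#include <iostream>

struct dir_meta {
    uint64_t partition_id;
    uint64_t revision_id;
};

bool is_orphan_candidate(
  const dir_meta& dir, uint64_t removed_partition, uint64_t removal_revision) {
    return dir.partition_id == removed_partition
           && dir.revision_id <= removal_revision;
}

int main() {
    // Removing partition 0 at revision 21: "0_5" and "0_21" qualify,
    // while "0_30" (newer revision) and "1_5" (other partition) are kept.
    const dir_meta dirs[] = {{0, 5}, {0, 21}, {0, 30}, {1, 5}};
    for (const auto& d : dirs) {
        std::cout << d.partition_id << '_' << d.revision_id << " -> "
                  << (is_orphan_candidate(d, 0, 21) ? "removed" : "kept")
                  << '\n';
    }
    return 0;
}
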
3 changes: 3 additions & 0 deletions src/v/storage/log_manager.h
@@ -172,6 +172,9 @@ class log_manager {
*/
ss::future<> remove(model::ntp);

ss::future<> remove_orphan(
ss::sstring data_directory_path, model::ntp, model::revision_id);

ss::future<> stop();

ss::future<ss::lw_shared_ptr<segment>> make_log_segment(
87 changes: 75 additions & 12 deletions tests/rptest/tests/topic_delete_test.py
@@ -127,16 +127,23 @@ def __init__(self, test_context):

self.kafka_tools = KafkaCliTools(self.redpanda)

def produce_until_partitions(self):
self.kafka_tools.produce(self.topic, 1024, 1024)
storage = self.redpanda.storage()
return len(list(storage.partitions("kafka", self.topic))) == 9

def dump_storage_listing(self):
for node in self.redpanda.nodes:
self.logger.error(f"Storage listing on {node.name}:")
for line in node.account.ssh_capture(
f"find {self.redpanda.DATA_DIR}"):
self.logger.error(line.strip())

@cluster(num_nodes=3)
@parametrize(with_restart=False)
@parametrize(with_restart=True)
def topic_delete_test(self, with_restart):
def produce_until_partitions():
self.kafka_tools.produce(self.topic, 1024, 1024)
storage = self.redpanda.storage()
return len(list(storage.partitions("kafka", self.topic))) == 9

wait_until(lambda: produce_until_partitions(),
wait_until(lambda: self.produce_until_partitions(),
timeout_sec=30,
backoff_sec=2,
err_msg="Expected partition did not materialize")
@@ -160,13 +167,69 @@ def produce_until_partitions():
err_msg="Topic storage was not removed")

except:
# On errors, dump listing of the storage location
for node in self.redpanda.nodes:
self.logger.error(f"Storage listing on {node.name}:")
for line in node.account.ssh_capture(
f"find {self.redpanda.DATA_DIR}"):
self.logger.error(line.strip())
self.dump_storage_listing()
raise

@cluster(num_nodes=3, log_allow_list=[r'filesystem error: remove failed'])
def topic_delete_orphan_files_test(self):
wait_until(lambda: self.produce_until_partitions(),
timeout_sec=30,
backoff_sec=2,
err_msg="Expected partition did not materialize")

# Sanity check the kvstore checks: there should be at least one kvstore entry
# per partition while the topic exists.
assert sum(get_kvstore_topic_key_counts(
self.redpanda).values()) >= self.topics[0].partition_count

down_node = self.redpanda.nodes[-1]
try:
# Make topic directory immutable to prevent deleting
down_node.account.ssh(
Review comment (Contributor): nice trick 👍
f"chattr +i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

self.kafka_tools.delete_topic(self.topic)

def topic_deleted_on_all_nodes_except_one(redpanda, down_node,
topic_name):
storage = redpanda.storage()
log_not_removed_on_down = topic_name in next(
filter(lambda x: x.name == down_node.name,
storage.nodes)).ns["kafka"].topics
logs_removed_on_others = all(
map(
lambda n: topic_name not in n.ns["kafka"].topics,
filter(lambda x: x.name != down_node.name,
storage.nodes)))
return log_not_removed_on_down and logs_removed_on_others

try:
wait_until(
lambda: topic_deleted_on_all_nodes_except_one(
self.redpanda, down_node, self.topic),
timeout_sec=30,
backoff_sec=2,
err_msg=
"Topic storage was not removed from running nodes or removed from down node"
)
except:
self.dump_storage_listing()
raise

self.redpanda.stop_node(down_node)
finally:
down_node.account.ssh(
f"chattr -i {self.redpanda.DATA_DIR}/kafka/{self.topic}")

self.redpanda.start_node(down_node)

try:
wait_until(lambda: topic_storage_purged(self.redpanda, self.topic),
timeout_sec=30,
backoff_sec=2,
err_msg="Topic storage was not removed")
except:
self.dump_storage_listing()
raise

