Skip to content

Commit

Permalink
cloud_storage: refactor lazy_abort_source
Browse files Browse the repository at this point in the history
It was kind of odd that the predicate function got called
with a reference to the abort source to set the reason on it,
rather than just having the predicate return the reason.

No functional change, just a refactor.
  • Loading branch information
jcsp committed Jan 9, 2023
1 parent 7ee45fd commit 7491eec
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 29 deletions.
22 changes: 10 additions & 12 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,11 +627,7 @@ ntp_archiver::upload_segment(upload_candidate candidate) {
vlog(ctxlog.debug, "Uploading segment {} to {}", candidate, path);

auto lazy_abort_source = cloud_storage::lazy_abort_source{
"lost leadership or term changed during upload, "
"current leadership status: {}, "
"current term: {}, "
"original term: {}",
[this](auto& s) { return archiver_lost_leadership(s); },
[this]() { return upload_should_abort(); },
};

auto reset_func =
Expand All @@ -655,20 +651,22 @@ ntp_archiver::upload_segment(upload_candidate candidate) {
_segment_tags);
}

bool ntp_archiver::archiver_lost_leadership(
cloud_storage::lazy_abort_source& las) {
std::optional<ss::sstring> ntp_archiver::upload_should_abort() {
auto original_term = _parent.term();
auto lost_leadership = !_parent.is_elected_leader()
|| _parent.term() != original_term;
if (unlikely(lost_leadership)) {
std::string reason{las.abort_reason()};
las.abort_reason(fmt::format(
fmt::runtime(reason),
return fmt::format(
"lost leadership or term changed during upload, "
"current leadership status: {}, "
"current term: {}, "
"original term: {}",
_parent.is_elected_leader(),
_parent.term(),
original_term));
original_term);
} else {
return std::nullopt;
}
return lost_leadership;
}

ss::future<cloud_storage::upload_result>
Expand Down
2 changes: 1 addition & 1 deletion src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class ntp_archiver {
segment_path_for_candidate(const upload_candidate& candidate);

/// Method to use with lazy_abort_source
bool archiver_lost_leadership(cloud_storage::lazy_abort_source& las);
std::optional<ss::sstring> upload_should_abort();

const cloud_storage_clients::bucket_name& get_bucket_name() const;

Expand Down
12 changes: 8 additions & 4 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ ss::future<upload_result> remote::upload_segment(

auto reader_handle = co_await reset_str();
auto path = cloud_storage_clients::object_key(segment_path());
vlog(ctxlog.debug, "Uploading segment to path {}", segment_path);
// Segment upload attempt
auto res = co_await lease.client->put_object(
bucket,
Expand Down Expand Up @@ -626,9 +625,14 @@ ss::future<upload_result> remote::delete_object(

ss::sstring lazy_abort_source::abort_reason() const { return _abort_reason; }

bool lazy_abort_source::abort_requested() { return _predicate(*this); }
void lazy_abort_source::abort_reason(ss::sstring reason) {
_abort_reason = std::move(reason);
bool lazy_abort_source::abort_requested() {
auto maybe_abort = _predicate();
if (maybe_abort.has_value()) {
_abort_reason = *maybe_abort;
return true;
} else {
return false;
}
}

ss::future<>
Expand Down
10 changes: 3 additions & 7 deletions src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,17 @@ class materialized_segments;
struct lazy_abort_source {
/// Predicate to be evaluated before an operation. Evaluates to true when
/// the operation should be aborted, false otherwise.
using predicate_t = ss::noncopyable_function<bool(lazy_abort_source&)>;
using predicate_t = ss::noncopyable_function<std::optional<ss::sstring>()>;

lazy_abort_source(ss::sstring abort_reason, predicate_t predicate)
: _abort_reason{std::move(abort_reason)}
, _predicate{std::move(predicate)} {}
lazy_abort_source(predicate_t predicate)
: _predicate{std::move(predicate)} {}

bool abort_requested();

ss::sstring abort_reason() const;

void abort_reason(ss::sstring reason);

private:
ss::sstring _abort_reason;

predicate_t _predicate;
};

Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_storage/tests/remote_partition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ using namespace cloud_storage;
inline ss::logger test_log("test"); // NOLINT

static cloud_storage::lazy_abort_source always_continue{
"no-op", [](auto&) { return false; }};
[]() { return std::nullopt; }};

static constexpr model::cloud_credentials_source config_file{
model::cloud_credentials_source::config_file};
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/remote_segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ using namespace cloud_storage;

inline ss::logger test_log("test"); // NOLINT

static cloud_storage::lazy_abort_source always_continue("no-op", [](auto&) {
return false;
static cloud_storage::lazy_abort_source always_continue([]() {
return std::nullopt;
});

/**
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static constexpr std::string_view manifest_payload = R"json({
})json";

static cloud_storage::lazy_abort_source always_continue{
"no-op", [](auto&) { return false; }};
[]() { return std::nullopt; }};

static constexpr model::cloud_credentials_source config_file{
model::cloud_credentials_source::config_file};
Expand Down Expand Up @@ -164,7 +164,7 @@ FIXTURE_TEST(
};
retry_chain_node fib(100ms, 20ms);
auto lost_leadership = lazy_abort_source{
"lost leadership", [](auto&) { return true; }};
[]() { return "lost leadership"; }};
auto res = remote
.upload_segment(
cloud_storage_clients::bucket_name("bucket"),
Expand Down

0 comments on commit 7491eec

Please sign in to comment.