Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archival: keep locks until outcome is known #21348

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions src/v/archival/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
#include "serde/envelope.h"
#include "serde/serde.h"
#include "ssx/future-util.h"
#include "ssx/semaphore.h"
#include "storage/ntp_config.h"
#include "storage/record_batch_builder.h"
#include "storage/record_batch_utils.h"
#include "utils/named_type.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
Expand Down Expand Up @@ -456,19 +458,34 @@ command_batch_builder& command_batch_builder::update_highest_producer_id(

ss::future<std::error_code> command_batch_builder::replicate() {
_as.check();
return _stm.get()._lock.with([this]() {
vlog(
_stm.get()._logger.debug, "command_batch_builder::replicate called");
auto now = ss::lowres_clock::now();
auto timeout = now < _deadline ? _deadline - now : 0ms;
return _stm.get().do_sync(timeout, &_as).then([this](bool success) {
if (!success) {
return ss::make_ready_future<std::error_code>(errc::not_leader);
}
auto batch = std::move(_builder).build();
return _stm.get().do_replicate_commands(std::move(batch), _as);
});
});

auto units = co_await _stm.get()._lock.get_units(_as);

vlog(_stm.get()._logger.debug, "command_batch_builder::replicate called");
auto now = ss::lowres_clock::now();
auto timeout = now < _deadline ? _deadline - now : 0ms;

// Block on syncing the STM.
auto did_sync = co_await _stm.get().do_sync(timeout, &_as);
if (!did_sync) {
co_return errc::not_leader;
}

auto batch = std::move(_builder).build();
auto f = _stm.get()
.do_replicate_commands(std::move(batch), _as)
.finally([u = std::move(units), h = _stm.get()._gate.hold()] {});

// The above do_replicate_commands call is not cancellable at every point
// due to the guarantees we need from the operation for linearizability. To
// respect callers' cancellation requests, we wrap the future in a
// cancellable future but leave the operation running.
//
// The operation can continue safely in background because it holds the
// lock and the gate. The lock also ensures that no concurrent replicate
// calls can be made and we won't leak continuations.
co_return co_await ssx::with_timeout_abortable(
std::move(f), model::no_timeout, _as);
}

command_batch_builder archival_metadata_stm::batch_start(
Expand Down Expand Up @@ -799,6 +816,16 @@ ss::future<bool> archival_metadata_stm::do_sync(

ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
model::record_batch batch, ss::abort_source& as) {
// It is critical that this method does not return except in the following
// cases:
// 1. The batch was successfully replicated with required consistency
// level.
// 2. The batch failed to replicate but the leader stepped down.
//
// Otherwise, it will lead to _lock and _active_operation_res being reset
// early allowing for concurrent sync and replicate calls which will lead
// to race conditions/corruption/undefined behavior.

vassert(
nvartolomei marked this conversation as resolved.
Show resolved Hide resolved
!_lock.try_get_units().has_value(),
"Attempt to replicate STM command while not under lock");
Expand Down Expand Up @@ -838,17 +865,11 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto fut = _raft->replicate(

auto result = co_await _raft->replicate(
current_term,
model::make_memory_record_batch_reader(std::move(batch)),
opts);

// Raft's replicate() doesn't take an external abort source, and
// archiver is shut down before consensus, so we must wrap this
// with our abort source
fut = ssx::with_timeout_abortable(std::move(fut), model::no_timeout, as);

auto result = co_await std::move(fut);
if (!result) {
vlog(
_logger.warn,
Expand All @@ -866,7 +887,7 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
}

auto applied = co_await wait_no_throw(
result.value().last_offset, model::no_timeout, as);
result.value().last_offset, model::no_timeout);
if (!applied) {
if (as.abort_requested()) {
co_return errc::shutting_down;
Expand All @@ -879,9 +900,6 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
co_return errc::replication_error;
}

// We are under lock so it's guaranteed that the command that will
// trigger this is replicated by this call.
op_state_reset.cancel();
co_return co_await std::move(apply_result);
}

Expand Down
171 changes: 163 additions & 8 deletions src/v/archival/tests/archival_metadata_stm_gtest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/later.hh>

using cloud_storage::segment_name;
Expand Down Expand Up @@ -226,20 +227,29 @@ TEST_F_CORO(

ASSERT_TRUE_CORO(!res);

ss::shared_promise<> may_resume_append;
available_promise<bool> reached_dispatch_append;
// Allocated on the heap as the lambda will outlive the scope of the test.
auto may_resume_append = ss::make_shared<ss::shared_promise<>>();
auto reached_dispatch_append = ss::make_shared<available_promise<bool>>();

// Ensure that we always resume the append operation to allow the test to
// exit in case of failure.
auto deferred_fix_nodes = ss::defer([may_resume_append] {
if (!may_resume_append->available()) {
may_resume_append->set_value();
}
});

auto plagued_node = co_await with_leader(
10s,
[&reached_dispatch_append,
&may_resume_append](raft::raft_node_instance& node) {
node.on_dispatch([&reached_dispatch_append, &may_resume_append](
node.on_dispatch([reached_dispatch_append, may_resume_append](
model::node_id, raft::msg_type t) {
if (t == raft::msg_type::append_entries) {
if (!reached_dispatch_append.available()) {
reached_dispatch_append.set_value(true);
if (!reached_dispatch_append->available()) {
reached_dispatch_append->set_value(true);
}
return may_resume_append.get_shared_future();
return may_resume_append->get_shared_future();
}

return ss::now();
Expand Down Expand Up @@ -271,7 +281,152 @@ TEST_F_CORO(
cluster::segment_validated::yes);
});

co_await reached_dispatch_append.get_future();
co_await reached_dispatch_append->get_future();

// Expecting this to fail as we have the replication blocked.
auto sync_result_before_replication = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}
return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(sync_result_before_replication);

// Subsequent calls to sync should fail too.
auto second_sync_result_before_replication = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}
return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(second_sync_result_before_replication);

// Allow replication to progress.
may_resume_append->set_value();

// This sync will succeed and will wait for replication to progress.
auto synced = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}
return get_leader_stm().sync(10s);
});

ASSERT_TRUE_CORO(synced);

auto slow_replication_res = co_await std::move(slow_replication_fut);
ASSERT_TRUE_CORO(!slow_replication_res);

auto [committed_offset, term] = co_await with_leader(
10s, [](raft::raft_node_instance& node) mutable {
return std::make_tuple(
node.raft()->committed_offset(), node.raft()->term());
});

ASSERT_EQ_CORO(committed_offset, model::offset{2});
ASSERT_EQ_CORO(term, model::term_id{1});

co_await wait_for_apply();
}

TEST_F_CORO(
archival_metadata_stm_gtest_fixture, test_same_term_sync_abort_source) {
/*
* Test that archival_metadata_stm::sync is able to sync
* within the same term and that it will wait for on-going
* replication futures to complete before doing so. To simulate
* this scenario we introduce a small delay on append entries
* response processing.
*
* Like the test above, but this time we break out of the replicate call
* by using an abort source.
*/

ss::abort_source never_abort;

std::vector<cloud_storage::segment_meta> m;
m.push_back(segment_meta{
.base_offset = model::offset(0),
.committed_offset = model::offset(99),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

co_await start();

auto res = co_await with_leader(
10s, [this, &m, &never_abort](raft::raft_node_instance&) {
return get_leader_stm().add_segments(
m,
std::nullopt,
model::producer_id{},
ss::lowres_clock::now() + 10s,
never_abort,
cluster::segment_validated::yes);
});

ASSERT_TRUE_CORO(!res);

// Allocated on the heap as the lambda will outlive the scope of the test.
auto may_resume_append = ss::make_shared<ss::shared_promise<>>();
auto reached_dispatch_append = ss::make_shared<available_promise<bool>>();

// Ensure that we always resume the append operation to allow the test to
// exit in case of failure.
auto deferred_fix_nodes = ss::defer([may_resume_append] {
if (!may_resume_append->available()) {
may_resume_append->set_value();
}
});

auto plagued_node = co_await with_leader(
10s,
[&reached_dispatch_append,
&may_resume_append](raft::raft_node_instance& node) {
node.on_dispatch([reached_dispatch_append, may_resume_append](
model::node_id, raft::msg_type t) {
if (t == raft::msg_type::append_entries) {
if (!reached_dispatch_append->available()) {
reached_dispatch_append->set_value(true);
}
return may_resume_append->get_shared_future();
}

return ss::now();
});

return node.get_vnode();
});

m.clear();
m.push_back(segment_meta{
.base_offset = model::offset(100),
.committed_offset = model::offset(199),
.archiver_term = model::term_id(2),
.segment_term = model::term_id(1)});

ss::abort_source replication_abort_source;
auto slow_replication_fut = with_leader(
10s,
[this, &m, &replication_abort_source, &plagued_node](
raft::raft_node_instance& node) {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}

return get_leader_stm().add_segments(
m,
std::nullopt,
model::producer_id{},
ss::lowres_clock::now() + 10s,
replication_abort_source,
cluster::segment_validated::yes);
});

co_await reached_dispatch_append->get_future();
replication_abort_source.request_abort();

// Expecting this to fail as we have the replication blocked.
auto sync_result_before_replication = co_await with_leader(
Expand All @@ -294,7 +449,7 @@ TEST_F_CORO(
ASSERT_FALSE_CORO(second_sync_result_before_replication);

// Allow replication to progress.
may_resume_append.set_value();
may_resume_append->set_value();

// This sync will succeed and will wait for replication to progress.
auto synced = co_await with_leader(
Expand Down