Merge pull request #21521 from vbotbuildovich/backport-pr-21348-v24.1.x-487

[v24.1.x] archival: keep locks until outcome is known
piyushredpanda authored Jul 21, 2024
2 parents e2b904d + 8738d48 commit 2006379
Showing 2 changed files with 206 additions and 33 deletions.
68 changes: 43 additions & 25 deletions src/v/archival/archival_metadata_stm.cc
@@ -35,12 +35,14 @@
#include "serde/envelope.h"
#include "serde/serde.h"
#include "ssx/future-util.h"
#include "ssx/semaphore.h"
#include "storage/ntp_config.h"
#include "storage/record_batch_builder.h"
#include "storage/record_batch_utils.h"
#include "utils/named_type.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
@@ -454,19 +456,34 @@ command_batch_builder& command_batch_builder::update_highest_producer_id(

ss::future<std::error_code> command_batch_builder::replicate() {
_as.check();
return _stm.get()._lock.with([this]() {
vlog(
_stm.get()._logger.debug, "command_batch_builder::replicate called");
auto now = ss::lowres_clock::now();
auto timeout = now < _deadline ? _deadline - now : 0ms;
return _stm.get().do_sync(timeout, &_as).then([this](bool success) {
if (!success) {
return ss::make_ready_future<std::error_code>(errc::not_leader);
}
auto batch = std::move(_builder).build();
return _stm.get().do_replicate_commands(std::move(batch), _as);
});
});

auto units = co_await _stm.get()._lock.get_units(_as);

vlog(_stm.get()._logger.debug, "command_batch_builder::replicate called");
auto now = ss::lowres_clock::now();
auto timeout = now < _deadline ? _deadline - now : 0ms;

// Block on syncing the STM.
auto did_sync = co_await _stm.get().do_sync(timeout, &_as);
if (!did_sync) {
co_return errc::not_leader;
}

auto batch = std::move(_builder).build();
auto f = _stm.get()
.do_replicate_commands(std::move(batch), _as)
.finally([u = std::move(units), h = _stm.get()._gate.hold()] {});

// The above do_replicate_commands call is not cancellable at every point
// due to the guarantees we need from the operation for linearizability. To
// respect callers' cancellation requests, we wrap the future in a
// cancellable future but leave the operation running.
//
// The operation can continue safely in the background because it holds the
// lock and the gate. The lock also ensures that no concurrent replicate
// calls can be made and that we won't leak continuations.
co_return co_await ssx::with_timeout_abortable(
std::move(f), model::no_timeout, _as);
}
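
A minimal Seastar-flavoured sketch of the wrapping idea described in the comment above: the caller receives a future it can stop waiting on when its abort source fires, while the inner replication future keeps running in the background together with whatever it captured in finally() (lock units, gate hold). This is an illustration only, not the actual ssx::with_timeout_abortable implementation, and abortable_view is a hypothetical helper name.

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>

#include <optional>
#include <system_error>
#include <utility>

namespace ss = seastar;

// Hypothetical helper, for illustration only. The returned future resolves
// when either `inner` completes or `as` fires, whichever happens first.
// `inner` itself is never cancelled: it keeps running in the background with
// whatever it captured via finally() (lock units, gate hold).
ss::future<std::error_code>
abortable_view(ss::future<std::error_code> inner, ss::abort_source& as) {
    struct state {
        ss::promise<std::error_code> caller;
        bool resolved = false;
        std::optional<ss::abort_source::subscription> sub;
    };
    auto st = ss::make_lw_shared<state>();

    st->sub = as.subscribe([st]() noexcept {
        if (!std::exchange(st->resolved, true)) {
            st->caller.set_exception(ss::abort_requested_exception{});
        }
    });
    if (!st->sub) {
        // Abort was requested before we even started waiting.
        return ss::make_exception_future<std::error_code>(
          ss::abort_requested_exception{});
    }

    // Background continuation: forward the result if the caller is still
    // waiting, otherwise drop it. The inner operation owns its own cleanup.
    (void)std::move(inner).then_wrapped(
      [st](ss::future<std::error_code> f) {
          if (std::exchange(st->resolved, true)) {
              f.ignore_ready_future();
          } else if (f.failed()) {
              st->caller.set_exception(f.get_exception());
          } else {
              st->caller.set_value(f.get());
          }
          // Drop the abort subscription to break the st -> sub -> st cycle.
          st->sub.reset();
      });

    return st->caller.get_future();
}

In replicate() above the same effect is obtained with ssx::with_timeout_abortable(std::move(f), model::no_timeout, _as): the lock units and gate hold are attached to f via finally(), so they are released only once the replication outcome is known, even if the caller aborts its wait earlier.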

command_batch_builder archival_metadata_stm::batch_start(
@@ -795,6 +812,16 @@ ss::future<bool> archival_metadata_stm::do_sync(

ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
model::record_batch batch, ss::abort_source& as) {
// It is critical that this method does not return except in the following
// cases:
// 1. The batch was successfully replicated with required consistency
// level.
// 2. The batch failed to replicate because the leader stepped down.
//
// Otherwise, _lock and _active_operation_res would be reset early,
// allowing concurrent sync and replicate calls, which would lead to race
// conditions, corruption, or undefined behavior.

vassert(
!_lock.try_get_units().has_value(),
"Attempt to replicate STM command while not under lock");
@@ -834,17 +861,11 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto fut = _raft->replicate(

auto result = co_await _raft->replicate(
current_term,
model::make_memory_record_batch_reader(std::move(batch)),
opts);

// Raft's replicate() doesn't take an external abort source, and
// archiver is shut down before consensus, so we must wrap this
// with our abort source
fut = ssx::with_timeout_abortable(std::move(fut), model::no_timeout, as);

auto result = co_await std::move(fut);
if (!result) {
vlog(
_logger.warn,
@@ -862,7 +883,7 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
}

auto applied = co_await wait_no_throw(
result.value().last_offset, model::no_timeout, as);
result.value().last_offset, model::no_timeout);
if (!applied) {
if (as.abort_requested()) {
co_return errc::shutting_down;
@@ -875,9 +896,6 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
co_return errc::replication_error;
}

// We are under lock so it's guaranteed that the command that will
// trigger this is replicated by this call.
op_state_reset.cancel();
co_return co_await std::move(apply_result);
}
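
The op_state_reset.cancel() call above relies on a scope-guard pattern: per-operation state is reset on every exit path unless the guard is explicitly cancelled once the outcome is known and recorded. A small self-contained sketch of that pattern follows; the names (active_operation_res, replicate_once) are illustrative stand-ins, not the STM's actual members.

#include <seastar/util/defer.hh>

#include <optional>
#include <system_error>

namespace ss = seastar;

// Illustrative stand-in for per-operation state such as _active_operation_res.
std::optional<std::error_code> active_operation_res;

void replicate_once(bool outcome_known) {
    active_operation_res.emplace();
    // Reset the recorded state on every exit path (early return, exception)...
    auto op_state_reset = ss::defer(
      []() noexcept { active_operation_res.reset(); });

    if (outcome_known) {
        // ...except when the command is known to have replicated and applied,
        // in which case the recorded outcome must outlive this scope.
        op_state_reset.cancel();
    }
}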

171 changes: 163 additions & 8 deletions src/v/archival/tests/archival_metadata_stm_gtest.cc
@@ -23,6 +23,7 @@
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/later.hh>

using cloud_storage::segment_name;
@@ -224,20 +225,29 @@ TEST_F_CORO(

ASSERT_TRUE_CORO(!res);

ss::shared_promise<> may_resume_append;
available_promise<bool> reached_dispatch_append;
// Allocated on the heap as the lambda will outlive the scope of the test.
auto may_resume_append = ss::make_shared<ss::shared_promise<>>();
auto reached_dispatch_append = ss::make_shared<available_promise<bool>>();

// Ensure that we always resume the append operation to allow the test to
// exit in case of failure.
auto deferred_fix_nodes = ss::defer([may_resume_append] {
if (!may_resume_append->available()) {
may_resume_append->set_value();
}
});

auto plagued_node = co_await with_leader(
10s,
[&reached_dispatch_append,
&may_resume_append](raft::raft_node_instance& node) {
node.on_dispatch([&reached_dispatch_append, &may_resume_append](
node.on_dispatch([reached_dispatch_append, may_resume_append](
model::node_id, raft::msg_type t) {
if (t == raft::msg_type::append_entries) {
if (!reached_dispatch_append.available()) {
reached_dispatch_append.set_value(true);
if (!reached_dispatch_append->available()) {
reached_dispatch_append->set_value(true);
}
return may_resume_append.get_shared_future();
return may_resume_append->get_shared_future();
}

return ss::now();
@@ -269,7 +279,152 @@ TEST_F_CORO(
cluster::segment_validated::yes);
});

co_await reached_dispatch_append.get_future();
co_await reached_dispatch_append->get_future();

// Expecting this to fail as we have the replication blocked.
auto sync_result_before_replication = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}
return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(sync_result_before_replication);

// Subsequent calls to sync should fail too.
auto second_sync_result_before_replication = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}
return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(second_sync_result_before_replication);

// Allow replication to progress.
may_resume_append->set_value();

// This sync will succeed and will wait for replication to progress.
auto synced = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}
return get_leader_stm().sync(10s);
});

ASSERT_TRUE_CORO(synced);

auto slow_replication_res = co_await std::move(slow_replication_fut);
ASSERT_TRUE_CORO(!slow_replication_res);

auto [committed_offset, term] = co_await with_leader(
10s, [](raft::raft_node_instance& node) mutable {
return std::make_tuple(
node.raft()->committed_offset(), node.raft()->term());
});

ASSERT_EQ_CORO(committed_offset, model::offset{2});
ASSERT_EQ_CORO(term, model::term_id{1});

co_await wait_for_apply();
}
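
The switch above from stack-allocated promises captured by reference to ss::make_shared promises captured by value addresses a lifetime problem: the on_dispatch hook can outlive the test body, so it must co-own everything it touches. A small sketch of that capture rule, using only plain Seastar primitives (make_blocking_hook is a hypothetical helper, not part of the fixture):

#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>

#include <functional>
#include <utility>

namespace ss = seastar;

// Returns a long-lived hook plus the handle used to unblock it later.
// Capturing the shared_ptr by value keeps the promise alive for as long as
// the hook itself exists, even after the creating scope has returned;
// capturing a stack-local promise by reference would be a use-after-free.
std::pair<std::function<ss::future<>()>, ss::shared_ptr<ss::shared_promise<>>>
make_blocking_hook() {
    auto may_resume = ss::make_shared<ss::shared_promise<>>();
    auto hook = [may_resume]() -> ss::future<> {
        return may_resume->get_shared_future();
    };
    return {std::move(hook), may_resume};
}

The ss::defer block in the test then guarantees the promise is eventually fulfilled even when an assertion fails early, so a blocked append_entries dispatch cannot hang the test.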

TEST_F_CORO(
archival_metadata_stm_gtest_fixture, test_same_term_sync_abort_source) {
/*
* Test that archival_metadata_stm::sync is able to sync
* within the same term and that it will wait for on-going
* replication futures to complete before doing so. To simulate
* this scenario we introduce a small delay on append entries
* response processing.
*
* Like the test above, but this time we break out of the replicate call
* by using an abort source.
*/

ss::abort_source never_abort;

std::vector<cloud_storage::segment_meta> m;
m.push_back(segment_meta{
.base_offset = model::offset(0),
.committed_offset = model::offset(99),
.archiver_term = model::term_id(1),
.segment_term = model::term_id(1)});

co_await start();

auto res = co_await with_leader(
10s, [this, &m, &never_abort](raft::raft_node_instance&) {
return get_leader_stm().add_segments(
m,
std::nullopt,
model::producer_id{},
ss::lowres_clock::now() + 10s,
never_abort,
cluster::segment_validated::yes);
});

ASSERT_TRUE_CORO(!res);

// Allocated on the heap as the lambda will outlive the scope of the test.
auto may_resume_append = ss::make_shared<ss::shared_promise<>>();
auto reached_dispatch_append = ss::make_shared<available_promise<bool>>();

// Ensure that we always resume the append operation to allow the test to
// exit in case of failure.
auto deferred_fix_nodes = ss::defer([may_resume_append] {
if (!may_resume_append->available()) {
may_resume_append->set_value();
}
});

auto plagued_node = co_await with_leader(
10s,
[&reached_dispatch_append,
&may_resume_append](raft::raft_node_instance& node) {
node.on_dispatch([reached_dispatch_append, may_resume_append](
model::node_id, raft::msg_type t) {
if (t == raft::msg_type::append_entries) {
if (!reached_dispatch_append->available()) {
reached_dispatch_append->set_value(true);
}
return may_resume_append->get_shared_future();
}

return ss::now();
});

return node.get_vnode();
});

m.clear();
m.push_back(segment_meta{
.base_offset = model::offset(100),
.committed_offset = model::offset(199),
.archiver_term = model::term_id(2),
.segment_term = model::term_id(1)});

ss::abort_source replication_abort_source;
auto slow_replication_fut = with_leader(
10s,
[this, &m, &replication_abort_source, &plagued_node](
raft::raft_node_instance& node) {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}

return get_leader_stm().add_segments(
m,
std::nullopt,
model::producer_id{},
ss::lowres_clock::now() + 10s,
replication_abort_source,
cluster::segment_validated::yes);
});

co_await reached_dispatch_append->get_future();
replication_abort_source.request_abort();

// Expecting this to fail as we have the replication blocked.
auto sync_result_before_replication = co_await with_leader(
Expand All @@ -292,7 +447,7 @@ TEST_F_CORO(
ASSERT_FALSE_CORO(second_sync_result_before_replication);

// Allow replication to progress.
may_resume_append.set_value();
may_resume_append->set_value();

// This sync will succeed and will wait for replication to progress.
auto synced = co_await with_leader(
