diff --git a/src/v/archival/archival_metadata_stm.cc b/src/v/archival/archival_metadata_stm.cc
index 65fa71cab5e2..b7cdda1eb37e 100644
--- a/src/v/archival/archival_metadata_stm.cc
+++ b/src/v/archival/archival_metadata_stm.cc
@@ -35,12 +35,14 @@
 #include "serde/envelope.h"
 #include "serde/serde.h"
 #include "ssx/future-util.h"
+#include "ssx/semaphore.h"
 #include "storage/ntp_config.h"
 #include "storage/record_batch_builder.h"
 #include "storage/record_batch_utils.h"
 #include "utils/named_type.h"

 #include
+#include
 #include
 #include
 #include
@@ -454,19 +456,34 @@ command_batch_builder& command_batch_builder::update_highest_producer_id(

 ss::future<std::error_code> command_batch_builder::replicate() {
     _as.check();
-    return _stm.get()._lock.with([this]() {
-        vlog(
-          _stm.get()._logger.debug, "command_batch_builder::replicate called");
-        auto now = ss::lowres_clock::now();
-        auto timeout = now < _deadline ? _deadline - now : 0ms;
-        return _stm.get().do_sync(timeout, &_as).then([this](bool success) {
-            if (!success) {
-                return ss::make_ready_future<std::error_code>(errc::not_leader);
-            }
-            auto batch = std::move(_builder).build();
-            return _stm.get().do_replicate_commands(std::move(batch), _as);
-        });
-    });
+
+    auto units = co_await _stm.get()._lock.get_units(_as);
+
+    vlog(_stm.get()._logger.debug, "command_batch_builder::replicate called");
+    auto now = ss::lowres_clock::now();
+    auto timeout = now < _deadline ? _deadline - now : 0ms;
+
+    // Block on syncing the STM.
+    auto did_sync = co_await _stm.get().do_sync(timeout, &_as);
+    if (!did_sync) {
+        co_return errc::not_leader;
+    }
+
+    auto batch = std::move(_builder).build();
+    auto f = _stm.get()
+               .do_replicate_commands(std::move(batch), _as)
+               .finally([u = std::move(units), h = _stm.get()._gate.hold()] {});
+
+    // The above do_replicate_commands call is not cancellable at every
+    // point due to the guarantees the operation must provide for
+    // linearizability. To respect the caller's cancellation request, we
+    // wrap the future in a cancellable one but leave the operation running.
+    //
+    // The operation can continue safely in the background because it holds
+    // the lock and the gate. The lock also ensures that no concurrent
+    // replicate calls can be made, so we won't leak continuations.
+    co_return co_await ssx::with_timeout_abortable(
+      std::move(f), model::no_timeout, _as);
 }

 command_batch_builder archival_metadata_stm::batch_start(
@@ -795,6 +812,16 @@ ss::future<bool> archival_metadata_stm::do_sync(

 ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(
   model::record_batch batch, ss::abort_source& as) {
+    // It is critical that this method does not return except in the
+    // following cases:
+    // 1. The batch was successfully replicated with the required
+    //    consistency level.
+    // 2. The batch failed to replicate because the leader stepped down.
+    //
+    // Otherwise, _lock and _active_operation_res would be reset early,
+    // allowing concurrent sync and replicate calls, which would lead to
+    // race conditions/corruption/undefined behavior.
+
     vassert(
       !_lock.try_get_units().has_value(),
       "Attempt to replicate STM command while not under lock");
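For readers skimming the patch: the new `replicate()` hinges on one idiom. The continuation attached with `finally()` owns both the mutex units and a gate holder, so a caller that stops waiting cannot unlock the STM or let it shut down mid-operation. A minimal standalone sketch of just that idiom, using plain Seastar primitives; `stm_like` and `background_op` are made-up stand-ins, not Redpanda code:

```cpp
#include <chrono>

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>

// Hypothetical stand-in for the STM; only the idiom mirrors the patch.
struct stm_like {
    seastar::semaphore lock{1}; // plays the role of _lock
    seastar::gate gate;         // plays the role of _gate

    seastar::future<> background_op() {
        // Stand-in for the non-abortable replication step.
        return seastar::sleep(std::chrono::milliseconds(10));
    }

    seastar::future<> replicate() {
        auto units = co_await seastar::get_units(lock, 1);
        // Moving the semaphore units and a gate holder into the `finally`
        // continuation pins both for the full lifetime of the operation,
        // even if the caller stops waiting on the returned future.
        co_await background_op().finally(
          [u = std::move(units), h = gate.hold()] {});
    }
};
```

The key point is that the semaphore units and gate holder live exactly as long as the operation, not as long as the caller's interest in its result; `ssx::with_timeout_abortable` in the patch merely detaches the waiter.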
@@ -834,17 +861,11 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(

     auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
     opts.set_force_flush();
-    auto fut = _raft->replicate(
+
+    auto result = co_await _raft->replicate(
       current_term,
       model::make_memory_record_batch_reader(std::move(batch)),
       opts);
-
-    // Raft's replicate() doesn't take an external abort source, and
-    // archiver is shut down before consensus, so we must wrap this
-    // with our abort source
-    fut = ssx::with_timeout_abortable(std::move(fut), model::no_timeout, as);
-
-    auto result = co_await std::move(fut);
     if (!result) {
         vlog(
           _logger.warn,
@@ -862,7 +883,7 @@
     }

     auto applied = co_await wait_no_throw(
-      result.value().last_offset, model::no_timeout, as);
+      result.value().last_offset, model::no_timeout);
     if (!applied) {
         if (as.abort_requested()) {
             co_return errc::shutting_down;
@@ -875,9 +896,6 @@
         co_return errc::replication_error;
     }

-    // We are under lock so it's guaranteed that the command that will
-    // trigger this is replicated by this call.
-    op_state_reset.cancel();
     co_return co_await std::move(apply_result);
 }
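These hunks remove the only points where `do_replicate_commands` could return while raft was still working: the replicate future is awaited directly, and the apply wait no longer takes the abort source. The resulting shape, awaiting the non-abortable step unconditionally and consulting the abort source only afterwards to pick an error code, can be sketched as follows; `do_work` and the `errc` values are illustrative, not the real Redpanda types:

```cpp
#include <chrono>

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>

enum class errc { success = 0, shutting_down, replication_error };

// Stand-in for the non-abortable replicate + apply-wait sequence.
seastar::future<bool> do_work() {
    co_await seastar::sleep(std::chrono::milliseconds(5));
    co_return true;
}

seastar::future<errc> replicate_commands(seastar::abort_source& as) {
    // No abortable wrapper here: the operation must not be abandoned
    // while it holds the lock.
    auto applied = co_await do_work();
    if (!applied) {
        // The abort source is consulted only after the wait, purely to
        // decide which error to report.
        if (as.abort_requested()) {
            co_return errc::shutting_down;
        }
        co_return errc::replication_error;
    }
    co_return errc::success;
}
```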
diff --git a/src/v/archival/tests/archival_metadata_stm_gtest.cc b/src/v/archival/tests/archival_metadata_stm_gtest.cc
index 5e3211085260..c94cbdb0d60a 100644
--- a/src/v/archival/tests/archival_metadata_stm_gtest.cc
+++ b/src/v/archival/tests/archival_metadata_stm_gtest.cc
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include
 #include

 using cloud_storage::segment_name;
@@ -224,20 +225,29 @@ TEST_F_CORO(

     ASSERT_TRUE_CORO(!res);

-    ss::shared_promise<> may_resume_append;
-    available_promise<bool> reached_dispatch_append;
+    // Allocated on the heap as the lambda will outlive the scope of the test.
+    auto may_resume_append = ss::make_shared<ss::shared_promise<>>();
+    auto reached_dispatch_append = ss::make_shared<available_promise<bool>>();
+
+    // Ensure that we always resume the append operation to allow the test to
+    // exit in case of failure.
+    auto deferred_fix_nodes = ss::defer([may_resume_append] {
+        if (!may_resume_append->available()) {
+            may_resume_append->set_value();
+        }
+    });

     auto plagued_node = co_await with_leader(
       10s,
       [&reached_dispatch_append,
        &may_resume_append](raft::raft_node_instance& node) {
-          node.on_dispatch([&reached_dispatch_append, &may_resume_append](
+          node.on_dispatch([reached_dispatch_append, may_resume_append](
                              model::node_id, raft::msg_type t) {
               if (t == raft::msg_type::append_entries) {
-                  if (!reached_dispatch_append.available()) {
-                      reached_dispatch_append.set_value(true);
+                  if (!reached_dispatch_append->available()) {
+                      reached_dispatch_append->set_value(true);
                   }
-                  return may_resume_append.get_shared_future();
+                  return may_resume_append->get_shared_future();
               }

               return ss::now();
@@ -269,7 +279,152 @@ TEST_F_CORO(
           cluster::segment_validated::yes);
       });

-    co_await reached_dispatch_append.get_future();
+    co_await reached_dispatch_append->get_future();
+
+    // Expecting this to fail as we have the replication blocked.
+    auto sync_result_before_replication = co_await with_leader(
+      10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
+          if (node.get_vnode() != plagued_node) {
+              throw std::runtime_error{"Leadership moved"};
+          }
+          return get_leader_stm().sync(10ms);
+      });
+    ASSERT_FALSE_CORO(sync_result_before_replication);
+
+    // Subsequent calls to sync should fail too.
+    auto second_sync_result_before_replication = co_await with_leader(
+      10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
+          if (node.get_vnode() != plagued_node) {
+              throw std::runtime_error{"Leadership moved"};
+          }
+          return get_leader_stm().sync(10ms);
+      });
+    ASSERT_FALSE_CORO(second_sync_result_before_replication);
+
+    // Allow replication to progress.
+    may_resume_append->set_value();
+
+    // This sync will succeed and will wait for replication to progress.
+    auto synced = co_await with_leader(
+      10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
+          if (node.get_vnode() != plagued_node) {
+              throw std::runtime_error{"Leadership moved"};
+          }
+          return get_leader_stm().sync(10s);
+      });
+
+    ASSERT_TRUE_CORO(synced);
+
+    auto slow_replication_res = co_await std::move(slow_replication_fut);
+    ASSERT_TRUE_CORO(!slow_replication_res);
+
+    auto [committed_offset, term] = co_await with_leader(
+      10s, [](raft::raft_node_instance& node) mutable {
+          return std::make_tuple(
+            node.raft()->committed_offset(), node.raft()->term());
+      });
+
+    ASSERT_EQ_CORO(committed_offset, model::offset{2});
+    ASSERT_EQ_CORO(term, model::term_id{1});
+
+    co_await wait_for_apply();
+}
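A note on the promise handling used in this test and the one added below: the dispatch hook installed on the raft node can outlive the test frame, so the promises live on the heap and the hook captures the shared pointers by value, while a deferred action guarantees the blocked append is always released. A reduced sketch of that pattern, with illustrative names:

```cpp
#include <utility>

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/defer.hh>

seastar::future<> block_and_release_example() {
    // Heap allocation: a hook installed elsewhere may outlive this frame.
    auto may_resume = seastar::make_shared<seastar::shared_promise<>>();

    // Always unblock on scope exit so a failing test cannot hang.
    auto release = seastar::defer([may_resume]() noexcept {
        if (!may_resume->available()) {
            may_resume->set_value();
        }
    });

    // A hook that captures the shared pointer by value, as on_dispatch
    // does in the tests.
    auto hook = [may_resume] { return may_resume->get_shared_future(); };

    auto blocked = hook(); // stays pending until set_value()
    may_resume->set_value();
    co_await std::move(blocked);
}
```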
+
+TEST_F_CORO(
+  archival_metadata_stm_gtest_fixture, test_same_term_sync_abort_source) {
+    /*
+     * Test that archival_metadata_stm::sync is able to sync
+     * within the same term and that it will wait for on-going
+     * replication futures to complete before doing so. To simulate
+     * this scenario we introduce a small delay on append entries
+     * response processing.
+     *
+     * Like the test above, but this time we break out of the replicate call
+     * by using an abort source.
+     */
+
+    ss::abort_source never_abort;
+
+    std::vector<segment_meta> m;
+    m.push_back(segment_meta{
+      .base_offset = model::offset(0),
+      .committed_offset = model::offset(99),
+      .archiver_term = model::term_id(1),
+      .segment_term = model::term_id(1)});
+
+    co_await start();
+
+    auto res = co_await with_leader(
+      10s, [this, &m, &never_abort](raft::raft_node_instance&) {
+          return get_leader_stm().add_segments(
+            m,
+            std::nullopt,
+            model::producer_id{},
+            ss::lowres_clock::now() + 10s,
+            never_abort,
+            cluster::segment_validated::yes);
+      });
+
+    ASSERT_TRUE_CORO(!res);
+
+    // Allocated on the heap as the lambda will outlive the scope of the test.
+    auto may_resume_append = ss::make_shared<ss::shared_promise<>>();
+    auto reached_dispatch_append = ss::make_shared<available_promise<bool>>();
+
+    // Ensure that we always resume the append operation to allow the test to
+    // exit in case of failure.
+    auto deferred_fix_nodes = ss::defer([may_resume_append] {
+        if (!may_resume_append->available()) {
+            may_resume_append->set_value();
+        }
+    });
+
+    auto plagued_node = co_await with_leader(
+      10s,
+      [&reached_dispatch_append,
+       &may_resume_append](raft::raft_node_instance& node) {
+          node.on_dispatch([reached_dispatch_append, may_resume_append](
+                             model::node_id, raft::msg_type t) {
+              if (t == raft::msg_type::append_entries) {
+                  if (!reached_dispatch_append->available()) {
+                      reached_dispatch_append->set_value(true);
+                  }
+                  return may_resume_append->get_shared_future();
+              }
+
+              return ss::now();
+          });
+
+          return node.get_vnode();
+      });
+
+    m.clear();
+    m.push_back(segment_meta{
+      .base_offset = model::offset(100),
+      .committed_offset = model::offset(199),
+      .archiver_term = model::term_id(2),
+      .segment_term = model::term_id(1)});
+
+    ss::abort_source replication_abort_source;
+    auto slow_replication_fut = with_leader(
+      10s,
+      [this, &m, &replication_abort_source, &plagued_node](
+        raft::raft_node_instance& node) {
+          if (node.get_vnode() != plagued_node) {
+              throw std::runtime_error{"Leadership moved"};
+          }
+
+          return get_leader_stm().add_segments(
+            m,
+            std::nullopt,
+            model::producer_id{},
+            ss::lowres_clock::now() + 10s,
+            replication_abort_source,
+            cluster::segment_validated::yes);
+      });
+
+    co_await reached_dispatch_append->get_future();
+    replication_abort_source.request_abort();

     // Expecting this to fail as we have the replication blocked.
     auto sync_result_before_replication = co_await with_leader(
       10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
           if (node.get_vnode() != plagued_node) {
               throw std::runtime_error{"Leadership moved"};
           }
           return get_leader_stm().sync(10ms);
       });
     ASSERT_FALSE_CORO(sync_result_before_replication);

     // Subsequent calls to sync should fail too.
     auto second_sync_result_before_replication = co_await with_leader(
       10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
           if (node.get_vnode() != plagued_node) {
               throw std::runtime_error{"Leadership moved"};
           }
           return get_leader_stm().sync(10ms);
       });
     ASSERT_FALSE_CORO(second_sync_result_before_replication);
@@ -292,7 +447,7 @@ TEST_F_CORO(

     // Allow replication to progress.
-    may_resume_append.set_value();
+    may_resume_append->set_value();

     // This sync will succeed and will wait for replication to progress.
     auto synced = co_await with_leader(
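Taken together, the new test asserts that aborting only detaches the waiter: the replication it abandoned still completes, and a later sync observes the result. The control flow can be miniaturized as below, with `sleep_abortable` standing in for the abandoned wait; all names and timings are illustrative:

```cpp
#include <chrono>

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>

using namespace std::chrono_literals;

// Stand-in for a caller waiting on a slow, non-abortable operation.
seastar::future<bool> wait_or_abandon(seastar::abort_source& as) {
    try {
        co_await seastar::sleep_abortable(10s, as);
        co_return true; // waited to completion
    } catch (const seastar::sleep_aborted&) {
        co_return false; // the waiter gave up; the work continues elsewhere
    }
}

seastar::future<> demo() {
    seastar::abort_source as;
    auto waiter = wait_or_abandon(as);
    as.request_abort(); // like replication_abort_source.request_abort()
    bool completed = co_await std::move(waiter);
    // completed == false: the waiter resolved early, without the result.
    (void)completed;
}
```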