diff --git a/src/v/raft/state_machine_manager.cc b/src/v/raft/state_machine_manager.cc index dee348909973..6ccd43c363ef 100644 --- a/src/v/raft/state_machine_manager.cc +++ b/src/v/raft/state_machine_manager.cc @@ -215,7 +215,10 @@ ss::future<> state_machine_manager::apply_raft_snapshot() { } auto fut = co_await ss::coroutine::as_future( - do_apply_raft_snapshot(std::move(snapshot->metadata), snapshot->reader)); + acquire_background_apply_mutexes().then([&, this](auto units) mutable { + return do_apply_raft_snapshot( + std::move(snapshot->metadata), snapshot->reader, std::move(units)); + })); co_await snapshot->reader.close(); if (fut.failed()) { const auto e = fut.get_exception(); @@ -228,7 +231,9 @@ ss::future<> state_machine_manager::apply_raft_snapshot() { } ss::future<> state_machine_manager::do_apply_raft_snapshot( - snapshot_metadata metadata, storage::snapshot_reader& reader) { + snapshot_metadata metadata, + storage::snapshot_reader& reader, + std::vector background_apply_units) { const auto snapshot_file_sz = co_await reader.get_snapshot_size(); const auto last_offset = metadata.last_included_index; @@ -276,6 +281,7 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot( }); } _next = model::next_offset(metadata.last_included_index); + background_apply_units.clear(); } ss::future<> state_machine_manager::apply_snapshot_to_stm( diff --git a/src/v/raft/state_machine_manager.h b/src/v/raft/state_machine_manager.h index 18d1ca8db09e..e60d36446df7 100644 --- a/src/v/raft/state_machine_manager.h +++ b/src/v/raft/state_machine_manager.h @@ -152,7 +152,9 @@ class state_machine_manager final { ss::future<> apply_raft_snapshot(); ss::future<> do_apply_raft_snapshot( - raft::snapshot_metadata metadata, storage::snapshot_reader& reader); + raft::snapshot_metadata metadata, + storage::snapshot_reader& reader, + std::vector background_apply_units); ss::future<> apply(); ss::future<> try_apply_in_foreground(); diff --git a/src/v/raft/tests/stm_manager_test.cc b/src/v/raft/tests/stm_manager_test.cc index 7452ad6d4f04..4044a864bac3 100644 --- a/src/v/raft/tests/stm_manager_test.cc +++ b/src/v/raft/tests/stm_manager_test.cc @@ -75,6 +75,47 @@ struct local_snapshot_stm : public simple_kv { }; }; +// State machine that induces lag from the tip of +// of the log +class slow_kv : public simple_kv { +public: + static constexpr std::string_view name = "slow_kv"; + + explicit slow_kv(raft_node_instance& rn) + : simple_kv(rn) {} + + ss::future<> apply(const model::record_batch& batch) override { + co_await ss::sleep(5ms); + co_return co_await simple_kv::apply(batch); + } + + ss::future<> apply_raft_snapshot(const iobuf&) override { + return ss::now(); + } +}; + +// Fails the first apply, starts a background fiber and not lets the +// background apply fiber finish relative to slow_kv +class bg_only_kv : public slow_kv { +public: + static constexpr std::string_view name = "bg_only_stm"; + + explicit bg_only_kv(raft_node_instance& rn) + : slow_kv(rn) {} + + ss::future<> apply(const model::record_batch& batch) override { + if (_first_apply) { + _first_apply = false; + throw std::runtime_error("induced failure"); + } + co_await ss::sleep(5ms); + co_return co_await slow_kv::apply(batch); + } + +private: + bool _first_apply = true; +}; + TEST_F_CORO(state_machine_fixture, test_basic_apply) { /** * Create 3 replicas group with simple_kv STM @@ -100,6 +141,40 @@ TEST_F_CORO(state_machine_fixture, test_basic_apply) { } } +TEST_F_CORO(state_machine_fixture, test_snapshot_with_bg_fibers) { + create_nodes(); + std::vector> stms; + for (auto& [id, node] : nodes()) { + raft::state_machine_manager_builder builder; + auto slow_kv_stm = builder.create_stm(*node); + auto bg_kv_stm = builder.create_stm(*node); + co_await node->init_and_start(all_vnodes(), std::move(builder)); + stms.push_back(ss::dynamic_pointer_cast(slow_kv_stm)); + stms.push_back(ss::dynamic_pointer_cast(bg_kv_stm)); + } + auto& leader_node = node(co_await wait_for_leader(10s)); + bool stop = false; + auto write_sleep_f = ss::do_until( + [&stop] { return stop; }, + [&] { + return build_random_state(1000).discard_result().then( + [] { return ss::sleep(3ms); }); + }); + + auto truncate_sleep_f = ss::do_until( + [&stop] { return stop; }, + [&] { + return leader_node.raft() + ->write_snapshot({leader_node.raft()->committed_offset(), iobuf{}}) + .then([] { return ss::sleep(3ms); }); + }); + + co_await ss::sleep(10s); + stop = true; + co_await std::move(write_sleep_f); + co_await std::move(truncate_sleep_f); +} + TEST_F_CORO(state_machine_fixture, test_apply_throwing_exception) { /** * Create 3 replicas group with simple_kv STM