Skip to content

Commit

Permalink
Merge pull request #9960 from ztlpn/v23.1.x-bp
Browse files Browse the repository at this point in the history
[v23.1.x] r/configuration_manager: add highest_known_offset checkpoint mutex
  • Loading branch information
ztlpn authored Apr 12, 2023
2 parents 74c0157 + ca2cde5 commit fcc11dd
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 31 deletions.
28 changes: 24 additions & 4 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/util/defer.hh>

#include <absl/container/btree_map.h>
#include <boost/range/irange.hpp>
Expand Down Expand Up @@ -367,8 +368,8 @@ configuration_manager::start(bool reset, model::revision_id initial_revision) {
});
}

ss::future<> configuration_manager::maybe_store_highest_known_offset(
model::offset offset, size_t bytes) {
void configuration_manager::maybe_store_highest_known_offset_in_background(
model::offset offset, size_t bytes, ss::gate& gate) {
_highest_known_offset = offset;
_bytes_since_last_offset_update += bytes;

Expand All @@ -379,13 +380,32 @@ ss::future<> configuration_manager::maybe_store_highest_known_offset(
if (
_bytes_since_last_offset_update < offset_update_treshold
&& !checkpoint_hint) {
co_return;
return;
}

_bytes_since_last_offset_update = 0;
if (_hko_checkpoint_in_progress) {
return;
}

ssx::spawn_with_gate(gate, [this, bytes] {
return do_maybe_store_highest_known_offset(bytes);
});
}

// Stores the highest known offset via store_highest_known_offset() unless
// another invocation is already doing so. `_hko_checkpoint_in_progress` is
// used as a non-blocking guard: a caller that finds it set returns at once
// instead of waiting for the running checkpoint to finish.
//
// NOTE(review): `bytes` is not read anywhere in this body - confirm whether it
// is still needed or whether the counter below should be reduced by it.
ss::future<>
configuration_manager::do_maybe_store_highest_known_offset(size_t bytes) {
if (_hko_checkpoint_in_progress) {
// This could be a mutex, but it doesn't make much sense to wait on it.
// In the unlikely event that checkpointing fails, an immediate retry by
// another waiting fiber is likely to fail as well.
co_return;
}
_hko_checkpoint_in_progress = true;
// Reset the guard flag when `deferred` goes out of scope, so the flag is
// cleared on every exit path from here on, not only on success.
auto deferred = ss::defer([&] { _hko_checkpoint_in_progress = false; });

co_await store_highest_known_offset();

// Runs only after the store above has completed: zero the byte counter and
// release the units held for the bytes pending checkpoint.
// NOTE(review): the counter is zeroed rather than decremented, so bytes added
// to it while the store was in flight appear to be discarded - confirm this
// is intended.
_bytes_since_last_offset_update = 0;
_bytes_since_last_offset_update_units.return_all();
}

Expand Down
8 changes: 7 additions & 1 deletion src/v/raft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class configuration_manager {
* last append. Configuration manager tracks number of bytes that were
* appended since last write of `highest_known_offset` to kv-store
*/
ss::future<> maybe_store_highest_known_offset(model::offset, size_t);
void maybe_store_highest_known_offset_in_background(
model::offset, size_t bytes, ss::gate&);

/**
* Returns the highest offset for which the configuration manager
Expand Down Expand Up @@ -189,6 +190,8 @@ class configuration_manager {

void add_configuration(model::offset, group_configuration);

ss::future<> do_maybe_store_highest_known_offset(size_t bytes);

raft::group_id _group;
underlying_t _configurations;
/**
Expand All @@ -210,6 +213,9 @@ class configuration_manager {
// of data is currently pending checkpoint.
ssx::semaphore_units _bytes_since_last_offset_update_units;

// Set to true while a checkpoint of the highest known offset is in progress.
bool _hko_checkpoint_in_progress = false;

model::revision_id _initial_revision{};
ctx_log& _ctxlog;
configuration_idx _next_index{0};
Expand Down
9 changes: 4 additions & 5 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2410,11 +2410,10 @@ ss::future<storage::append_result> consensus::disk_append(
ssx::spawn_with_gate(
_bg, [this] { return _offset_translator.maybe_checkpoint(); });

ssx::spawn_with_gate(
_bg, [this, last_offset = ret.last_offset, sz = ret.byte_size] {
return _configuration_manager
.maybe_store_highest_known_offset(last_offset, sz);
});
_configuration_manager
.maybe_store_highest_known_offset_in_background(
ret.last_offset, ret.byte_size, _bg);

return ret;
});
});
Expand Down
36 changes: 20 additions & 16 deletions src/v/raft/offset_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,24 +333,28 @@ ss::future<> offset_translator::maybe_checkpoint() {

constexpr size_t checkpoint_threshold = 64_MiB;

co_await _checkpoint_lock.with([this]() {
if (
_bytes_processed
< _bytes_processed_at_checkpoint + checkpoint_threshold
&& !_checkpoint_hint) {
return ss::now();
}
auto maybe_locked = _checkpoint_lock.try_get_units();
if (!maybe_locked) {
// A checkpoint attempt is already in progress, so it doesn't make much
// sense to start another one.
co_return;
}

vlog(
_logger.trace,
"threshold reached, performing checkpoint; state: {}, "
"highest_known_offset: {} (hint={})",
_state,
_highest_known_offset,
_checkpoint_hint);
if (
_bytes_processed < _bytes_processed_at_checkpoint + checkpoint_threshold
&& !_checkpoint_hint) {
co_return;
}

return do_checkpoint();
});
vlog(
_logger.trace,
"threshold reached, performing checkpoint; state: {}, "
"highest_known_offset: {} (hint={})",
_state,
_highest_known_offset,
_checkpoint_hint);

co_await do_checkpoint();
}

ss::future<> offset_translator::do_checkpoint() {
Expand Down
12 changes: 7 additions & 5 deletions src/v/raft/tests/configuration_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ FIXTURE_TEST(test_getting_configurations, config_manager_fixture) {
BOOST_REQUIRE_EQUAL(
_cfg_mgr.get_highest_known_offset(), model::offset(1254));
validate_recovery();
_cfg_mgr
.maybe_store_highest_known_offset(
model::offset(10000),
raft::configuration_manager::offset_update_treshold + 1_KiB)
.get0();

ss::gate gate;
_cfg_mgr.maybe_store_highest_known_offset_in_background(
model::offset(10000),
raft::configuration_manager::offset_update_treshold + 1_KiB,
gate);
gate.close().get();

validate_recovery();
BOOST_REQUIRE_EQUAL(
Expand Down

0 comments on commit fcc11dd

Please sign in to comment.