Skip to content

Commit

Permalink
Merge pull request #21565 from vbotbuildovich/backport-pr-21361-v24.1…
Browse files Browse the repository at this point in the history
….x-563

[v24.1.x] Guard from concurrent elections
  • Loading branch information
mmaslankaprv authored Jul 23, 2024
2 parents 62a7617 + ab8ee3f commit 252041c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 79 deletions.
9 changes: 4 additions & 5 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ ss::future<> consensus::stop() {
co_await _append_requests_buffer.stop();
co_await _batcher.stop();

_election_lock.broken();
_op_lock.broken();
_deferred_flusher.cancel();
co_await _bg.close();
Expand Down Expand Up @@ -1039,9 +1040,7 @@ void consensus::dispatch_vote(bool leadership_transfer) {
}
// background
ssx::spawn_with_gate(
_bg,
[vstm = std::move(vstm),
f = std::move(f)]() mutable {
_bg, [f = std::move(f)]() mutable {
return std::move(f);
});

Expand Down Expand Up @@ -1811,15 +1810,15 @@ ss::future<vote_reply> consensus::do_vote(vote_request r) {
_term = r.term;
_voted_for = {};
term_changed = true;
do_step_down("voter_term_greater");
do_step_down("candidate_term_greater");
if (_leader_id) {
_leader_id = std::nullopt;
trigger_leadership_notification();
}

// do not grant vote if log isn't ok
if (!reply.log_ok) {
// even tough we step down we do not want to update the hbeat as it
// even though we step down we do not want to update the hbeat as it
// would cause subsequent votes to fail (_hbeat is updated by the
// leader)
_hbeat = clock_type::time_point::min();
Expand Down
15 changes: 11 additions & 4 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ class consensus {
bool _transferring_leadership{false};

/// useful for when we are not the leader
clock_type::time_point _hbeat = clock_type::now();
clock_type::time_point _hbeat = clock_type::now(); // is max() iff leader
clock_type::time_point _became_leader_at = clock_type::now();
clock_type::time_point _instantiated_at = clock_type::now();

Expand All @@ -841,15 +841,22 @@ class consensus {
/// used to wait for background ops before shutting down
ss::gate _bg;

/**
* Locks listed in the order of nestedness, election being the outermost
* and snapshot the innermost. I.e. if any of these locks are used at the
* same time, they should be acquired in the listed order and released in
* reverse order.
*/
/// guards from concurrent election where this instance is a candidate
mutex _election_lock{"consensus::election_lock"};
/// all raft operations must happen exclusively since the common case
/// is for the operation to touch the disk
mutex _op_lock{"consensus::op_lock"};
/// since snapshot state is orthogonal to raft state when writing snapshot
/// it is enough to grab the snapshot mutex, there is no need to keep
/// oplock, if the two locks are expected to be acquired at the same time
/// the snapshot lock should always be an internal (taken after the
/// _op_lock)
/// oplock
mutex _snapshot_lock{"consensus::snapshot_lock"};

/// used for notifying when commits happened to log
event_manager _event_manager;
std::unique_ptr<probe> _probe;
Expand Down
152 changes: 82 additions & 70 deletions src/v/raft/vote_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,71 +126,74 @@ ss::future<election_success> vote_stm::vote(bool leadership_transfer) {
proceed_with_election,
immediate_success,
};
return _ptr->_op_lock
.with([this, leadership_transfer] {
_config = _ptr->config();
// check again while under op_sem
if (_ptr->should_skip_vote(leadership_transfer)) {
return ss::make_ready_future<prepare_election_result>(
prepare_election_result::skip_election);
}
// 5.2.1 mark node as candidate, and update leader id
_ptr->_vstate = consensus::vote_state::candidate;
// only trigger notification when we had a leader previously
if (_ptr->_leader_id) {
_ptr->_leader_id = std::nullopt;
_ptr->trigger_leadership_notification();
}

if (_prevote && leadership_transfer) {
return ssx::now(prepare_election_result::immediate_success);
}

// 5.2.1.2
/**
* Pre-voting doesn't increase the term
*/
if (!_prevote) {
_ptr->_term += model::term_id(1);
_ptr->_voted_for = {};
}

// special case, it may happen that node requesting votes is not a
// voter, it may happen if it is a learner in previous configuration
_replies.emplace(_ptr->_self, *this);

// vote is the only method under _op_sem
_config->for_each_voter(
[this](vnode id) { _replies.emplace(id, *this); });

auto lstats = _ptr->_log->offsets();
auto last_entry_term = _ptr->get_last_entry_term(lstats);

_req = vote_request{
.node_id = _ptr->_self,
.group = _ptr->group(),
.term = _ptr->term(),
.prev_log_index = lstats.dirty_offset,
.prev_log_term = last_entry_term,
.leadership_transfer = leadership_transfer};
// we have to self vote before dispatching vote request to
// other nodes, this vote has to be done under op semaphore as
// it changes voted_for state
return self_vote().then(
[] { return prepare_election_result::proceed_with_election; });
})
.then([this](prepare_election_result result) {
switch (result) {
case prepare_election_result::skip_election:
return ss::make_ready_future<election_success>(
election_success::no);
case prepare_election_result::proceed_with_election:
return do_vote();
case prepare_election_result::immediate_success:
return ss::make_ready_future<election_success>(
election_success::yes);
}
});
return _ptr->_election_lock.with([this, leadership_transfer] {
return _ptr->_op_lock
.with([this, leadership_transfer] {
_config = _ptr->config();
// check again while under op_sem
if (_ptr->should_skip_vote(leadership_transfer)) {
return ss::make_ready_future<prepare_election_result>(
prepare_election_result::skip_election);
}
// 5.2.1 mark node as candidate, and update leader id
_ptr->_vstate = consensus::vote_state::candidate;
// only trigger notification when we had a leader previously
if (_ptr->_leader_id) {
_ptr->_leader_id = std::nullopt;
_ptr->trigger_leadership_notification();
}

if (_prevote && leadership_transfer) {
return ssx::now(prepare_election_result::immediate_success);
}

// 5.2.1.2
/**
* Pre-voting doesn't increase the term
*/
if (!_prevote) {
_ptr->_term += model::term_id(1);
_ptr->_voted_for = {};
}

// special case, it may happen that node requesting votes is not a
// voter, it may happen if it is a learner in previous
// configuration
_replies.emplace(_ptr->_self, *this);

// vote is the only method under _op_sem
_config->for_each_voter(
[this](vnode id) { _replies.emplace(id, *this); });

auto lstats = _ptr->_log->offsets();
auto last_entry_term = _ptr->get_last_entry_term(lstats);

_req = vote_request{
.node_id = _ptr->_self,
.group = _ptr->group(),
.term = _ptr->term(),
.prev_log_index = lstats.dirty_offset,
.prev_log_term = last_entry_term,
.leadership_transfer = leadership_transfer};
// we have to self vote before dispatching vote request to
// other nodes, this vote has to be done under op semaphore as
// it changes voted_for state
return self_vote().then(
[] { return prepare_election_result::proceed_with_election; });
})
.then([this](prepare_election_result result) {
switch (result) {
case prepare_election_result::skip_election:
return ss::make_ready_future<election_success>(
election_success::no);
case prepare_election_result::proceed_with_election:
return do_vote();
case prepare_election_result::immediate_success:
return ss::make_ready_future<election_success>(
election_success::yes);
}
});
});
}

ss::future<election_success> vote_stm::do_vote() {
Expand Down Expand Up @@ -342,7 +345,7 @@ ss::future<> vote_stm::wait_for_next_reply() {
ss::future<> vote_stm::wait() { return _vote_bg.close(); }

ss::future<> vote_stm::update_vote_state(ssx::semaphore_units u) {
// use reply term to update voter term
// use reply term to update our term
for (auto& [_, r] : _replies) {
if (r.value && r.value->has_value()) {
auto term = r.value->value().term;
Expand All @@ -354,7 +357,7 @@ ss::future<> vote_stm::update_vote_state(ssx::semaphore_units u) {
term);
_ptr->_term = term;
_ptr->_voted_for = {};
_ptr->_vstate = consensus::vote_state::follower;
fail_election();
co_return;
}
}
Expand All @@ -376,7 +379,7 @@ ss::future<> vote_stm::update_vote_state(ssx::semaphore_units u) {
}
if (!_success) {
vlog(_ctxlog.info, "[pre-vote: {}] vote failed", _prevote);
_ptr->_vstate = consensus::vote_state::follower;
fail_election();
co_return;
}
/**
Expand All @@ -394,7 +397,7 @@ ss::future<> vote_stm::update_vote_state(ssx::semaphore_units u) {
"[pre-vote: false] Ignoring successful vote. Node priority too low: "
"{}",
_ptr->_node_priority_override.value());
_ptr->_vstate = consensus::vote_state::follower;
fail_election();
co_return;
}

Expand All @@ -420,7 +423,7 @@ ss::future<> vote_stm::update_vote_state(ssx::semaphore_units u) {

auto ec = co_await replicate_config_as_new_leader(std::move(u));

// if we didn't replicated configuration, step down
// even if failed to replicate, don't step down: followers may be behind
if (ec) {
vlog(
_ctxlog.info,
Expand Down Expand Up @@ -476,4 +479,13 @@ ss::future<> vote_stm::self_vote() {
auto m = _replies.find(_ptr->self());
m->second.set_value(reply);
}

void vote_stm::fail_election() {
vassert(
_ptr->_vstate != consensus::vote_state::leader
&& _ptr->_hbeat != clock_type::time_point::max(),
"Became a leader outside current election");
_ptr->_vstate = consensus::vote_state::follower;
}

} // namespace raft
2 changes: 2 additions & 0 deletions src/v/raft/vote_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class vote_stm {

ss::future<> wait_for_next_reply();

void fail_election();

friend std::ostream& operator<<(std::ostream&, const vmeta&);

ss::future<election_success> do_vote();
Expand Down

0 comments on commit 252041c

Please sign in to comment.