Skip to content

Commit

Permalink
r/vote_stm: guard from concurrent elections where we are the candidate
Browse files Browse the repository at this point in the history
Use a lock held by critical part of the voting process,
released when decision is made
  • Loading branch information
bashtanov committed Jul 22, 2024
1 parent a7f1d75 commit 97e52de
Showing 1 changed file with 68 additions and 65 deletions.
133 changes: 68 additions & 65 deletions src/v/raft/vote_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,71 +126,74 @@ ss::future<election_success> vote_stm::vote(bool leadership_transfer) {
proceed_with_election,
immediate_success,
};
return _ptr->_op_lock
.with([this, leadership_transfer] {
_config = _ptr->config();
// check again while under op_sem
if (_ptr->should_skip_vote(leadership_transfer)) {
return ss::make_ready_future<prepare_election_result>(
prepare_election_result::skip_election);
}
// 5.2.1 mark node as candidate, and update leader id
_ptr->_vstate = consensus::vote_state::candidate;
// only trigger notification when we had a leader previously
if (_ptr->_leader_id) {
_ptr->_leader_id = std::nullopt;
_ptr->trigger_leadership_notification();
}

if (_prevote && leadership_transfer) {
return ssx::now(prepare_election_result::immediate_success);
}

// 5.2.1.2
/**
* Pre-voting doesn't increase the term
*/
if (!_prevote) {
_ptr->_term += model::term_id(1);
_ptr->_voted_for = {};
}

// special case, it may happen that node requesting votes is not a
// voter, it may happen if it is a learner in previous configuration
_replies.emplace(_ptr->_self, *this);

// vote is the only method under _op_sem
_config->for_each_voter(
[this](vnode id) { _replies.emplace(id, *this); });

auto lstats = _ptr->_log->offsets();
auto last_entry_term = _ptr->get_last_entry_term(lstats);

_req = vote_request{
.node_id = _ptr->_self,
.group = _ptr->group(),
.term = _ptr->term(),
.prev_log_index = lstats.dirty_offset,
.prev_log_term = last_entry_term,
.leadership_transfer = leadership_transfer};
// we have to self vote before dispatching vote request to
// other nodes, this vote has to be done under op semaphore as
// it changes voted_for state
return self_vote().then(
[] { return prepare_election_result::proceed_with_election; });
})
.then([this](prepare_election_result result) {
switch (result) {
case prepare_election_result::skip_election:
return ss::make_ready_future<election_success>(
election_success::no);
case prepare_election_result::proceed_with_election:
return do_vote();
case prepare_election_result::immediate_success:
return ss::make_ready_future<election_success>(
election_success::yes);
}
});
return _ptr->_election_lock.with([this, leadership_transfer] {
return _ptr->_op_lock
.with([this, leadership_transfer] {
_config = _ptr->config();
// check again while under op_sem
if (_ptr->should_skip_vote(leadership_transfer)) {
return ss::make_ready_future<prepare_election_result>(
prepare_election_result::skip_election);
}
// 5.2.1 mark node as candidate, and update leader id
_ptr->_vstate = consensus::vote_state::candidate;
// only trigger notification when we had a leader previously
if (_ptr->_leader_id) {
_ptr->_leader_id = std::nullopt;
_ptr->trigger_leadership_notification();
}

if (_prevote && leadership_transfer) {
return ssx::now(prepare_election_result::immediate_success);
}

// 5.2.1.2
/**
* Pre-voting doesn't increase the term
*/
if (!_prevote) {
_ptr->_term += model::term_id(1);
_ptr->_voted_for = {};
}

// special case, it may happen that node requesting votes is not a
// voter, it may happen if it is a learner in previous
// configuration
_replies.emplace(_ptr->_self, *this);

// vote is the only method under _op_sem
_config->for_each_voter(
[this](vnode id) { _replies.emplace(id, *this); });

auto lstats = _ptr->_log->offsets();
auto last_entry_term = _ptr->get_last_entry_term(lstats);

_req = vote_request{
.node_id = _ptr->_self,
.group = _ptr->group(),
.term = _ptr->term(),
.prev_log_index = lstats.dirty_offset,
.prev_log_term = last_entry_term,
.leadership_transfer = leadership_transfer};
// we have to self vote before dispatching vote request to
// other nodes, this vote has to be done under op semaphore as
// it changes voted_for state
return self_vote().then(
[] { return prepare_election_result::proceed_with_election; });
})
.then([this](prepare_election_result result) {
switch (result) {
case prepare_election_result::skip_election:
return ss::make_ready_future<election_success>(
election_success::no);
case prepare_election_result::proceed_with_election:
return do_vote();
case prepare_election_result::immediate_success:
return ss::make_ready_future<election_success>(
election_success::yes);
}
});
});
}

ss::future<election_success> vote_stm::do_vote() {
Expand Down

0 comments on commit 97e52de

Please sign in to comment.