Skip to content

Commit

Permalink
Election scheduler should be cancelled after cluster state publication
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Mar 15, 2024
1 parent 4a0feee commit 49f7a4a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ public void onFailure(String source, Exception e) {

@Override
public void onSuccess(String source) {
closePrevotingAndElectionScheduler();
applyListener.onResponse(null);
}
});
Expand Down Expand Up @@ -472,17 +473,29 @@ private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, Disco
}

private void closePrevotingAndElectionScheduler() {
closePrevoting();
closeElectionScheduler();
}

private void closePrevoting() {
if (prevotingRound != null) {
prevotingRound.close();
prevotingRound = null;
}
}

private void closeElectionScheduler() {
if (electionScheduler != null) {
electionScheduler.close();
electionScheduler = null;
}
}

// package-visible for testing
boolean isElectionSchedulerRunning() {
return electionScheduler != null;
}

private void updateMaxTermSeen(final long term) {
synchronized (mutex) {
maxTermSeen = Math.max(maxTermSeen, term);
Expand Down Expand Up @@ -724,7 +737,7 @@ void becomeLeader(String method) {
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
closePrevoting();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

assert leaderChecker.leader() == null : leaderChecker.leader();
Expand Down Expand Up @@ -761,7 +774,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
closePrevoting();
cancelActivePublication("become follower: " + method);
preVoteCollector.update(getPreVoteResponse(), leaderNode);

Expand Down Expand Up @@ -927,7 +940,6 @@ public void invariant() {
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert becomingClusterManager || getStateForClusterManagerService().nodes().getClusterManagerNodeId() != null
: getStateForClusterManagerService();
Expand Down Expand Up @@ -972,7 +984,6 @@ assert getLocalNode().equals(applierState.nodes().getClusterManagerNode())
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForClusterManagerService().nodes().getClusterManagerNodeId() == null : getStateForClusterManagerService();
assert leaderChecker.currentNodeIsClusterManager() == false;
Expand Down Expand Up @@ -1687,6 +1698,7 @@ public void onFailure(String source, Exception e) {
public void onSuccess(String source) {
synchronized (mutex) {
assert currentPublication.get() == CoordinatorPublication.this;
closePrevotingAndElectionScheduler();
currentPublication = Optional.empty();
logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
// trigger term bump if new term was found during publication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void testNodesJoinAfterStableCluster() {
public void testExpandsConfigurationWhenGrowingFromOneNodeToThreeButDoesNotShrink() {
try (Cluster cluster = new Cluster(1)) {
cluster.runRandomly();
cluster.stabilise();
cluster.stabilise(DEFAULT_STABILISATION_TIME * 2);

final ClusterNode leader = cluster.getAnyLeader();

Expand Down Expand Up @@ -1750,7 +1750,7 @@ public void testDoesNotPerformElectionWhenRestartingFollower() {
public void testImproveConfigurationPerformsVotingConfigExclusionStateCheck() {
try (Cluster cluster = new Cluster(1)) {
cluster.runRandomly();
cluster.stabilise();
cluster.stabilise(DEFAULT_STABILISATION_TIME * 2);

final Coordinator coordinator = cluster.getAnyLeader().coordinator;
final ClusterState currentState = coordinator.getLastAcceptedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.ClusterNode;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.coordination.LinearizabilityChecker.History;
import org.opensearch.cluster.coordination.LinearizabilityChecker.SequentialSpec;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
Expand Down Expand Up @@ -653,6 +654,12 @@ void stabilise(long stabilisationDurationMillis) {
leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)
);
}
if (clusterNode.coordinator.getMode() == Mode.LEADER || clusterNode.coordinator.getMode() == Mode.FOLLOWER) {
assertFalse(
"Election scheduler should stop after cluster has stabilised",
clusterNode.coordinator.isElectionSchedulerRunning()
);
}
}

final Set<String> connectedNodeIds = clusterNodes.stream()
Expand Down

0 comments on commit 49f7a4a

Please sign in to comment.