Skip to content

Commit

Permalink
Upload incremental cluster state on master re-election
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Aug 7, 2024
1 parent 59302a3 commit 9710a15
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -459,6 +461,10 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
if (isRemoteStateEnabled && publishRequest.getManifest() != null && localNode.isClusterManagerNode()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
((GatewayMetaState.RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).setLastAcceptedManifest(publishRequest.getManifest());
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -571,6 +577,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (isRemoteStateEnabled && localNode.isClusterManagerNode()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -661,14 +670,7 @@ public interface PersistedState extends Closeable {
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -693,6 +695,18 @@ default void markLastAcceptedStateAsCommitted() {
}
}

default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
return metadataBuilder;
}

default void close() throws IOException {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.set(incomingState);
return response;
} else {
Expand Down Expand Up @@ -220,7 +220,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
Expand Down Expand Up @@ -270,7 +270,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.set(clusterState);
return response;
} else {
Expand All @@ -289,13 +289,13 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
Expand All @@ -305,7 +305,7 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(publishRequest);
}
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
return handlePublishRequest.apply(new PublishRequest(incomingState, manifest));
}

private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

Expand All @@ -44,15 +45,26 @@
public class PublishRequest {

private final ClusterState acceptedState;
private final ClusterMetadataManifest manifest;

public PublishRequest(ClusterState acceptedState) {
this.acceptedState = acceptedState;
this.manifest = null;
}

public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) {
this.acceptedState = acceptedState;
this.manifest = manifest;
}

public ClusterState getAcceptedState() {
return acceptedState;
}

public ClusterMetadataManifest getManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -70,6 +82,6 @@ public int hashCode() {

@Override
public String toString() {
return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}';
return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + (manifest != null ? ", manifest=" + manifest : "") + '}';
}
}
34 changes: 25 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,11 @@ public String getLastUploadedManifestFile() {

@Override
public void setLastAcceptedState(ClusterState clusterState) {
// for non leader node, update the lastAcceptedClusterState
if (clusterState.getNodes().isLocalNodeElectedClusterManager() == false) {
lastAcceptedState = clusterState;
return;
}
try {
final RemoteClusterStateManifestInfo manifestDetails;
if (shouldWriteFullClusterState(clusterState)) {
Expand Down Expand Up @@ -730,7 +735,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) ==
}
assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true
: "Manifest and ClusterState are not in sync";
lastAcceptedManifest = manifestDetails.getClusterMetadataManifest();
setLastAcceptedManifest(manifestDetails.getClusterMetadataManifest());
lastAcceptedState = clusterState;
lastUploadedManifestFile = manifestDetails.getManifestFileName();
} catch (Exception e) {
Expand All @@ -739,6 +744,10 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}
}

public void setLastAcceptedManifest(ClusterMetadataManifest manifest) {
this.lastAcceptedManifest = manifest;
}

@Override
public PersistedStateStats getStats() {
return remoteClusterStateService.getStats();
Expand All @@ -761,7 +770,6 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
}
Expand All @@ -774,19 +782,27 @@ public void markLastAcceptedStateAsCommitted() {
assert lastAcceptedState != null : "Last accepted state is not present";
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
ClusterState clusterState = lastAcceptedState;
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
&& lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
if (metadataBuilder == null) {
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
}
metadataBuilder.clusterUUIDCommitted(true);
clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build();
}
final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
clusterState,
lastAcceptedManifest
);
lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest();
if (clusterState.getNodes().isLocalNodeElectedClusterManager()) {
final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
clusterState,
lastAcceptedManifest
);
assert committedManifestDetails != null;
setLastAcceptedManifest(committedManifestDetails.getClusterMetadataManifest());
lastUploadedManifestFile = committedManifestDetails.getManifestFileName();
} else {
setLastAcceptedManifest(ClusterMetadataManifest.builder(lastAcceptedManifest).committed(true).build());
}
lastAcceptedState = clusterState;
lastUploadedManifestFile = committedManifestDetails.getManifestFileName();
} catch (Exception e) {
handleExceptionOnWrite(e);
}
Expand Down
Loading

0 comments on commit 9710a15

Please sign in to comment.