From 9710a15b7534bfc975a9437ca942d1ac77582364 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 7 Aug 2024 12:30:53 +0530 Subject: [PATCH] Upload incremental cluster state on master re-election Signed-off-by: Shivansh Arora --- .../coordination/CoordinationState.java | 30 +++++-- .../PublicationTransportHandler.java | 12 +-- .../cluster/coordination/PublishRequest.java | 14 +++- .../opensearch/gateway/GatewayMetaState.java | 34 +++++--- .../GatewayMetaStatePersistedStateTests.java | 78 ++++++++++--------- 5 files changed, 107 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 7fa63ae8abc62..efca3396dfb2e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -41,6 +41,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.Closeable; import java.io.IOException; @@ -459,6 +461,10 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.term() ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); + if (isRemoteStateEnabled && publishRequest.getManifest() != null && localNode.isClusterManagerNode()) { + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); + ((GatewayMetaState.RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).setLastAcceptedManifest(publishRequest.getManifest()); + } assert getLastAcceptedState() == clusterState; return new PublishResponse(clusterState.term(), clusterState.version()); @@ -571,6 +577,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) { ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted(); + if (isRemoteStateEnabled && localNode.isClusterManagerNode()) { + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); + } assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); } @@ -661,14 +670,7 @@ public interface PersistedState extends Closeable { */ default void markLastAcceptedStateAsCommitted() { final ClusterState lastAcceptedState = getLastAcceptedState(); - Metadata.Builder metadataBuilder = null; - if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { - final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata()) - .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) - .build(); - metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); - metadataBuilder.coordinationMetadata(coordinationMetadata); - } + Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState); // if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet, // the cluster uuid might not been known yet. assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false @@ -693,6 +695,18 @@ default void markLastAcceptedStateAsCommitted() { } } + default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) { + Metadata.Builder metadataBuilder = null; + if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata()) + .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) + .build(); + metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + metadataBuilder.coordinationMetadata(coordinationMetadata); + } + return metadataBuilder; + } + default void close() throws IOException {} } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 62885a12222be..d85385eecbcc5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -189,7 +189,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); - final PublishWithJoinResponse response = acceptState(incomingState); + final PublishWithJoinResponse response = acceptState(incomingState, null); lastSeenClusterState.set(incomingState); return response; } else { @@ -220,7 +220,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque incomingState.stateUUID(), request.bytes().length() ); - final PublishWithJoinResponse response = acceptState(incomingState); + final PublishWithJoinResponse response = acceptState(incomingState, null); lastSeenClusterState.compareAndSet(lastSeen, incomingState); return response; } @@ -270,7 +270,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest true ); fullClusterStateReceivedCount.incrementAndGet(); - final PublishWithJoinResponse response = acceptState(clusterState); + final PublishWithJoinResponse response = acceptState(clusterState, manifest); lastSeenClusterState.set(clusterState); return response; } else { @@ -289,13 +289,13 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest transportService.getLocalNode().getId() ); compatibleClusterStateDiffReceivedCount.incrementAndGet(); - final PublishWithJoinResponse response = acceptState(clusterState); + final PublishWithJoinResponse response = acceptState(clusterState, manifest); lastSeenClusterState.compareAndSet(lastSeen, clusterState); return response; } } - private PublishWithJoinResponse acceptState(ClusterState incomingState) { + private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) { // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation) if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) { final PublishRequest publishRequest = currentPublishRequestToSelf.get(); @@ -305,7 +305,7 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(publishRequest); } } - return handlePublishRequest.apply(new PublishRequest(incomingState)); + return handlePublishRequest.apply(new PublishRequest(incomingState, manifest)); } private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java index e7c3e2d2c965b..f361cc40806db 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.coordination; import org.opensearch.cluster.ClusterState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.util.Objects; @@ -44,15 +45,26 @@ public class PublishRequest { private final ClusterState acceptedState; + private final ClusterMetadataManifest manifest; public PublishRequest(ClusterState acceptedState) { this.acceptedState = acceptedState; + this.manifest = null; + } + + public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) { + this.acceptedState = acceptedState; + this.manifest = manifest; } public ClusterState getAcceptedState() { return acceptedState; } + public ClusterMetadataManifest getManifest() { + return manifest; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -70,6 +82,6 @@ public int hashCode() { @Override public String toString() { - return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}'; + return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + (manifest != null ? ", manifest=" + manifest : "") + '}'; } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 80ba57b7db4a9..fb409421195e2 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -699,6 +699,11 @@ public String getLastUploadedManifestFile() { @Override public void setLastAcceptedState(ClusterState clusterState) { + // for non leader node, update the lastAcceptedClusterState + if (clusterState.getNodes().isLocalNodeElectedClusterManager() == false) { + lastAcceptedState = clusterState; + return; + } try { final RemoteClusterStateManifestInfo manifestDetails; if (shouldWriteFullClusterState(clusterState)) { @@ -730,7 +735,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == } assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; - lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); + setLastAcceptedManifest(manifestDetails.getClusterMetadataManifest()); lastAcceptedState = clusterState; lastUploadedManifestFile = manifestDetails.getManifestFileName(); } catch (Exception e) { @@ -739,6 +744,10 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest( } } + public void setLastAcceptedManifest(ClusterMetadataManifest manifest) { + this.lastAcceptedManifest = manifest; + } + @Override public PersistedStateStats getStats() { return remoteClusterStateService.getStats(); @@ -761,7 +770,6 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, private boolean shouldWriteFullClusterState(ClusterState clusterState) { if (lastAcceptedState == null || lastAcceptedManifest == null - || lastAcceptedState.term() != clusterState.term() || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) { return true; } @@ -774,19 +782,27 @@ public void markLastAcceptedStateAsCommitted() { assert lastAcceptedState != null : "Last accepted state is not present"; assert lastAcceptedManifest != null : "Last accepted manifest is not present"; ClusterState clusterState = lastAcceptedState; + Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState); if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false && lastAcceptedState.metadata().clusterUUIDCommitted() == false) { - Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + if (metadataBuilder == null) { + metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + } metadataBuilder.clusterUUIDCommitted(true); clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( - clusterState, - lastAcceptedManifest - ); - lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest(); + if (clusterState.getNodes().isLocalNodeElectedClusterManager()) { + final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( + clusterState, + lastAcceptedManifest + ); + assert committedManifestDetails != null; + setLastAcceptedManifest(committedManifestDetails.getClusterMetadataManifest()); + lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); + } else { + setLastAcceptedManifest(ClusterMetadataManifest.builder(lastAcceptedManifest).committed(true).build()); + } lastAcceptedState = clusterState; - lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); } catch (Exception e) { handleExceptionOnWrite(e); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 8e8d80c870ddf..64beae9d9067d 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -205,9 +205,13 @@ public void testSetCurrentTerm() throws IOException { } } - private ClusterState createClusterState(long version, Metadata metadata) { + private ClusterState createClusterState(long version, Metadata metadata, boolean isClusterManagerNode) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()); + if (isClusterManagerNode) { + nodesBuilder.clusterManagerNodeId(localNode.getId()); + } return ClusterState.builder(clusterName) - .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()) + .nodes(nodesBuilder.build()) .version(version) .metadata(metadata) .build(); @@ -266,7 +270,7 @@ public void testSetLastAcceptedState() throws IOException { .coordinationMetadata(createCoordinationMetadata(term)) .put(indexMetadata, false) .build(); - ClusterState state = createClusterState(version, metadata); + ClusterState state = createClusterState(version, metadata, false); gateway.setLastAcceptedState(state); gateway = maybeNew(gateway); @@ -291,8 +295,8 @@ public void testSetLastAcceptedStateTermChanged() throws IOException { final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, version); final ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build() - ); + Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build(), + false); gateway.setLastAcceptedState(state); gateway = maybeNew(gateway); @@ -301,8 +305,8 @@ public void testSetLastAcceptedStateTermChanged() throws IOException { final IndexMetadata newIndexMetadata = createIndexMetadata(indexName, newNumberOfShards, version); final ClusterState newClusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(createCoordinationMetadata(newTerm)).put(newIndexMetadata, false).build() - ); + Metadata.builder().coordinationMetadata(createCoordinationMetadata(newTerm)).put(newIndexMetadata, false).build(), + false); gateway.setLastAcceptedState(newClusterState); gateway = maybeNew(gateway); @@ -324,8 +328,8 @@ public void testCurrentTermAndTermAreDifferent() throws IOException { gateway.setLastAcceptedState( createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(term).build()).build() - ) + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(term).build()).build(), + false) ); gateway = maybeNew(gateway); @@ -349,8 +353,8 @@ public void testMarkAcceptedConfigAsCommitted() throws IOException { ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build() - ); + Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build(), + false); gateway.setLastAcceptedState(state); gateway = maybeNew(gateway); @@ -395,8 +399,8 @@ public void testStatePersistedOnLoad() throws IOException { ); final ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build() - ); + Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build(), + false); try ( GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState( persistedClusterStateService, @@ -519,8 +523,8 @@ public void testDataOnlyNodePersistence() throws Exception { ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build() - ); + Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build(), + false); persistedState.setCurrentTerm(state.term()); persistedState.setLastAcceptedState(state); assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten())); @@ -584,8 +588,8 @@ public void testDataOnlyNodePersistence() throws Exception { final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i); state = createClusterState( state.version() + 1, - Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build() - ); + Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build(), + false); persistedState.setLastAcceptedState(state); } } @@ -629,7 +633,7 @@ Directory createDirectory(Path path) { return wrapper; } }; - ClusterState state = createClusterState(randomNonNegativeLong(), Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build()); + ClusterState state = createClusterState(randomNonNegativeLong(), Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build(), false); long currentTerm = 42L; try ( GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState( @@ -643,8 +647,8 @@ Directory createDirectory(Path path) { if (randomBoolean()) { final ClusterState newState = createClusterState( randomNonNegativeLong(), - Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build() - ); + Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build(), + false); persistedState.setLastAcceptedState(newState); state = newState; } else { @@ -672,7 +676,7 @@ Directory createDirectory(Path path) { .coordinationMetadata(createCoordinationMetadata(1L)) .put(indexMetadata, false) .build(); - state = createClusterState(version, metadata); + state = createClusterState(version, metadata, false); persistedState.setLastAcceptedState(state); } else { currentTerm += 1; @@ -738,8 +742,8 @@ public void testRemotePersistedState() throws IOException { final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() - ); + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + false); remotePersistedState.setLastAcceptedState(clusterState); Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID); @@ -749,8 +753,8 @@ public void testRemotePersistedState() throws IOException { final ClusterState secondClusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() - ); + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + false); remotePersistedState.setLastAcceptedState(secondClusterState); Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState, previousClusterUUID); @@ -802,8 +806,8 @@ public void testRemotePersistedStateNotCommitted() throws IOException { final long clusterTerm = randomNonNegativeLong(); ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() - ); + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + false); clusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).clusterUUIDCommitted(false).build()) .build(); @@ -825,8 +829,8 @@ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOExcept final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() - ); + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true); assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); } @@ -843,8 +847,8 @@ public void testRemotePersistedStateFailureStats() throws IOException { final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() - ); + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true); assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); assertEquals(1, remoteClusterStateService.getStats().getFailedCount()); @@ -944,8 +948,8 @@ public void testGatewayForRemoteStateForNodeReplacement() throws IOException { false ) .clusterUUID(randomAlphaOfLength(10)) - .build() - ); + .build(), + false); when(remoteClusterStateService.getLastKnownUUIDFromRemote(clusterName.value())).thenReturn( previousState.metadata().clusterUUID() ); @@ -990,8 +994,8 @@ public void testGatewayForRemoteStateForNodeReboot() throws IOException { .coordinationMetadata(CoordinationMetadata.builder().term(randomLong()).build()) .put(indexMetadata, false) .clusterUUID(randomAlphaOfLength(10)) - .build() - ); + .build(), + false); gateway = newGatewayForRemoteState( remoteClusterStateService, remoteStoreRestoreService, @@ -1036,8 +1040,8 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I .put(indexMetadata, false) .clusterUUID(ClusterState.UNKNOWN_UUID) .persistentSettings(Settings.builder().put(Metadata.SETTING_READ_ONLY_SETTING.getKey(), true).build()) - .build() - ) + .build(), + false) ).nodes(DiscoveryNodes.EMPTY_NODES).build(); final RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);