diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index bf02c73ca560b..7622b07185f47 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -638,6 +638,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) { public Builder() { indices = new ArrayList<>(); customMetadataMap = new HashMap<>(); + indicesRouting = new ArrayList<>(); } public Builder(ClusterMetadataManifest manifest) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 8a106a25e5630..547a40120883a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -24,9 +24,11 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import java.io.Closeable; import java.io.IOException; +import java.sql.Blob; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -171,6 +173,7 @@ void deleteClusterMetadata( Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); Set staleGlobalMetadataPaths = new HashSet<>(); + Set staleIndexRoutingPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( clusterName, @@ -189,6 +192,10 @@ void deleteClusterMetadata( .values() .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); } + if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) { + clusterMetadataManifest.getIndicesRouting() + .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); + } }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( @@ -221,6 +228,14 @@ void deleteClusterMetadata( attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) ); } + if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) { + clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> { + if (filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename()) == false){ + staleIndexRoutingPaths.add(new BlobPath().buildAsString() + uploadedIndicesRouting.getUploadedFilename()); + logger.debug("Indices routing paths in stale manifest: {}", uploadedIndicesRouting.getUploadedFilename()); + } + }); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { @@ -240,6 +255,7 @@ void deleteClusterMetadata( deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths)); } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { @@ -344,6 +360,14 @@ void deleteStalePaths(String clusterName, String clusterUUID, List stale ); } + void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { + logger.debug(String.format(Locale.ROOT, "Deleting stale index routing files from remote - %s", stalePaths)); + getBlobStoreTransferService().deleteBlobs( + BlobPath.cleanPath(), + stalePaths + ); + } + /** * Purges all remote cluster state against provided cluster UUIDs * @param clusterState current state of the cluster diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 24fd1b164a4ff..a547ffb1ee591 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -155,11 +155,13 @@ public void testDeleteClusterMetadata() throws IOException { List inactiveBlobs = Arrays.asList( new PlainBlobMetadata("manifest1.dat", 1L), new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) + new PlainBlobMetadata("manifest3.dat", 1L), + new PlainBlobMetadata("manifest6.dat", 1L) ); List activeBlobs = Arrays.asList( new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) + new PlainBlobMetadata("manifest5.dat", 1L), + new PlainBlobMetadata("manifest7.dat", 1L) ); UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); @@ -199,6 +201,39 @@ public void testDeleteClusterMetadata() throws IOException { .settingMetadata(settingMetadataUpdated) .build(); + List indicesRouting1 = List.of(index1Metadata, index2Metadata); + List indicesRouting2 = List.of(index1Metadata); + ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder() + .indices(List.of(index1UpdatedMetadata)) + .globalMetadataFileName("global_metadata") + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .build(); + ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder() + .indices(List.of(index1UpdatedMetadata)) + .globalMetadataFileName("global_metadata") + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting2) + .build(); + // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) .coordinationMetadata(coordinationMetadataUpdated) @@ -210,8 +245,11 @@ public void testDeleteClusterMetadata() throws IOException { manifest5, manifest1, manifest2, - manifest3 + manifest3, + manifest6, + manifest7 ); + BlobContainer container = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(container); doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); @@ -231,6 +269,7 @@ public void testDeleteClusterMetadata() throws IOException { + ".dat" ) ); + verify(container).deleteBlobsIgnoringIfNotExists(List.of(new BlobPath().buildAsString() + index1Metadata.getUploadedFilename())); Set staleManifest = new HashSet<>(); inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));