Skip to content

Commit

Permalink
Delete stale index routing files.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shailendra Singh committed May 30, 2024
1 parent db5240e commit d1268cb
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
public Builder() {
indices = new ArrayList<>();
customMetadataMap = new HashMap<>();
indicesRouting = new ArrayList<>();
}

public Builder(ClusterMetadataManifest manifest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;

import java.io.Closeable;
import java.io.IOException;
import java.sql.Blob;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -171,6 +173,7 @@ void deleteClusterMetadata(
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -189,6 +192,10 @@ void deleteClusterMetadata(
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
Expand Down Expand Up @@ -221,6 +228,14 @@ void deleteClusterMetadata(
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
);
}
if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename()) == false){
staleIndexRoutingPaths.add(new BlobPath().buildAsString() + uploadedIndicesRouting.getUploadedFilename());
logger.debug("Indices routing paths in stale manifest: {}", uploadedIndicesRouting.getUploadedFilename());
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
Expand All @@ -240,6 +255,7 @@ void deleteClusterMetadata(
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -344,6 +360,14 @@ void deleteStalePaths(String clusterName, String clusterUUID, List<String> stale
);
}

void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
logger.debug(String.format(Locale.ROOT, "Deleting stale index routing files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
BlobPath.cleanPath(),
stalePaths
);
}

/**
* Purges all remote cluster state against provided cluster UUIDs
* @param clusterState current state of the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,13 @@ public void testDeleteClusterMetadata() throws IOException {
List<BlobMetadata> inactiveBlobs = Arrays.asList(
new PlainBlobMetadata("manifest1.dat", 1L),
new PlainBlobMetadata("manifest2.dat", 1L),
new PlainBlobMetadata("manifest3.dat", 1L)
new PlainBlobMetadata("manifest3.dat", 1L),
new PlainBlobMetadata("manifest6.dat", 1L)
);
List<BlobMetadata> activeBlobs = Arrays.asList(
new PlainBlobMetadata("manifest4.dat", 1L),
new PlainBlobMetadata("manifest5.dat", 1L)
new PlainBlobMetadata("manifest5.dat", 1L),
new PlainBlobMetadata("manifest7.dat", 1L)
);
UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1");
UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2");
Expand Down Expand Up @@ -199,6 +201,39 @@ public void testDeleteClusterMetadata() throws IOException {
.settingMetadata(settingMetadataUpdated)
.build();

List<UploadedIndexMetadata> indicesRouting1 = List.of(index1Metadata, index2Metadata);
List<UploadedIndexMetadata> indicesRouting2 = List.of(index1Metadata);
ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder()
.indices(List.of(index1UpdatedMetadata))
.globalMetadataFileName("global_metadata")
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID(clusterUUID)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.routingTableVersion(0L)
.indicesRouting(indicesRouting1)
.build();
ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder()
.indices(List.of(index1UpdatedMetadata))
.globalMetadataFileName("global_metadata")
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
.stateUUID(randomAlphaOfLength(10))
.clusterUUID(clusterUUID)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID(ClusterState.UNKNOWN_UUID)
.committed(true)
.routingTableVersion(0L)
.indicesRouting(indicesRouting2)
.build();

// active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated
ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3)
.coordinationMetadata(coordinationMetadataUpdated)
Expand All @@ -210,8 +245,11 @@ public void testDeleteClusterMetadata() throws IOException {
manifest5,
manifest1,
manifest2,
manifest3
manifest3,
manifest6,
manifest7
);

BlobContainer container = mock(BlobContainer.class);
when(blobStore.blobContainer(any())).thenReturn(container);
doNothing().when(container).deleteBlobsIgnoringIfNotExists(any());
Expand All @@ -231,6 +269,7 @@ public void testDeleteClusterMetadata() throws IOException {
+ ".dat"
)
);
verify(container).deleteBlobsIgnoringIfNotExists(List.of(new BlobPath().buildAsString() + index1Metadata.getUploadedFilename()));
Set<String> staleManifest = new HashSet<>();
inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));
Expand Down

0 comments on commit d1268cb

Please sign in to comment.