Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete stale index routing files. #13909

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -297,4 +298,15 @@ protected void doStart() {
@Override
protected void doStop() {}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
try {
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);
blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete some stale index routing paths from {}", stalePaths), e);
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@
protected void doClose() throws IOException {
// noop
}

@Override
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
// noop
}

Check warning on line 81 in server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java#L81

Added line #L81 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting
List<String> indicesRoutingToDelete
);

public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand Down Expand Up @@ -72,8 +73,13 @@
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private RemoteManifestManager remoteManifestManager;
private final RemoteRoutingTableService remoteRoutingTableService;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
public RemoteClusterStateCleanupManager(
RemoteClusterStateService remoteClusterStateService,
ClusterService clusterService,
RemoteRoutingTableService remoteRoutingTableService
shailendra0811 marked this conversation as resolved.
Show resolved Hide resolved
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
Expand All @@ -83,6 +89,7 @@
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
this.lastCleanupAttemptStateVersion = 0;
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
this.remoteRoutingTableService = remoteRoutingTableService;
}

void start() {
Expand Down Expand Up @@ -170,6 +177,7 @@
Set<String> staleManifestPaths = new HashSet<>();
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
Set<String> staleIndexRoutingPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
Expand All @@ -192,6 +200,10 @@
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
shailendra0811 marked this conversation as resolved.
Show resolved Hide resolved
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
Expand Down Expand Up @@ -221,6 +233,19 @@
.filter(file -> filesToKeep.contains(file) == false)
.forEach(staleGlobalMetadataPaths::add);
}
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
logger.debug(
() -> new ParameterizedMessage(

Check warning on line 241 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java#L241

Added line #L241 was not covered by tests
"Indices routing paths in stale manifest: {}",
uploadedIndicesRouting.getUploadedFilename()

Check warning on line 243 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java#L243

Added line #L243 was not covered by tests
)
);
}
});
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
Expand All @@ -238,6 +263,15 @@
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
try {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
} catch (IOException e) {
logger.error(
() -> new ParameterizedMessage("Error while deleting stale index routing files {}", staleIndexRoutingPaths),
e
);
remoteStateStats.indexRoutingFilesCleanupAttemptFailed();
}
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public RemoteClusterStateService(
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.remoteStateStats = new RemotePersistenceStats();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
*/
public class RemotePersistenceStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
static final String REMOTE_UPLOAD = "remote_upload";
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);

private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0);

public RemotePersistenceStats() {
super(REMOTE_UPLOAD);
addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount);
}

public void cleanUpAttemptFailed() {
Expand All @@ -34,4 +38,12 @@ public void cleanUpAttemptFailed() {
public long getCleanupAttemptFailedCount() {
return cleanupAttemptFailedCount.get();
}

public void indexRoutingFilesCleanupAttemptFailed() {
indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet();
}

public long getIndexRoutingFilesCleanupAttemptFailedCount() {
return indexRoutingFilesCleanupAttemptFailedCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX;
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
Expand All @@ -65,6 +67,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -91,14 +94,12 @@ public void setup() {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
.build();

blobStoreRepository = mock(BlobStoreRepository.class);
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
blobStore = mock(BlobStore.class);
blobContainer = mock(BlobContainer.class);
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);

Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

Expand Down Expand Up @@ -552,4 +553,28 @@ private BlobPath getPath() {
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
);
}

public void testDeleteStaleIndexRoutingPaths() throws IOException {
doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any());
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
remoteRoutingTableService.doStart();
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException {
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
List<String> stalePaths = Arrays.asList("path1", "path2");
// Simulate an IOException
doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList());

remoteRoutingTableService.doStart();
IOException thrown = assertThrows(IOException.class, () -> {
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
});
assertEquals("test exception", thrown.getMessage());
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
}

}
Loading
Loading