Skip to content

Commit

Permalink
Remove orphan timestamps post create snapshot completion
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Oct 1, 2024
1 parent 43e7597 commit 0503f63
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
Expand All @@ -41,6 +43,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryData;
Expand All @@ -62,8 +65,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -73,6 +78,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.snapshots.SnapshotsService.getPinningEntity;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -753,17 +759,15 @@ public void testInvalidRestoreRequestScenarios() throws Exception {
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

public void testCreateSnapshotV2() throws Exception {
public void testCreateSnapshotV2_Orphan_Timestamp_Cleanup() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
Expand All @@ -787,27 +791,36 @@ public void testCreateSnapshotV2() throws Exception {
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

// create an orphan timestamp related to this repo
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
internalCluster().getClusterManagerName()
);
long pinnedTimestamp = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<Void> latchedActionListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {}
}, latch);

remoteStorePinnedTimestampService.pinTimestamp(
pinnedTimestamp,
getPinningEntity(snapshotRepoName, "some_uuid"),
latchedActionListener
);
latch.await();

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

indexDocuments(client, indexName1, 10);
indexDocuments(client, indexName2, 20);

createIndex(indexName3, indexSettings);
indexDocuments(client, indexName3, 10);

String snapshotName2 = "test-create-snapshot2";

// verify response status if waitForCompletion is not true
RestStatus createSnapshotResponseStatus = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);
forceSyncPinnedTimestamps();
waitUntil(() -> 1 == RemoteStorePinnedTimestampService.getPinnedEntities().size());
}

public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception {
Expand Down Expand Up @@ -879,6 +892,7 @@ public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), 1);

}

Expand Down Expand Up @@ -955,6 +969,7 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size());
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {
Expand Down Expand Up @@ -1017,13 +1032,93 @@ public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Except
logger.info("Exception while creating new-snapshot", e);
}

AtomicLong totalSnaps = new AtomicLong();

// Validate that snapshot is present in repository data
assertBusy(() -> {
GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName);
GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet();
assertThat(response2.getSnapshots().size(), greaterThanOrEqualTo(1));
totalSnaps.set(response2.getSnapshots().size());

}, 30, TimeUnit.SECONDS);
thread.join();
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
internalCluster().getClusterManagerName()
);
forceSyncPinnedTimestamps();
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), totalSnaps.intValue());
}

/**
 * Creates two snapshots in a repository that has both the shallow-copy and
 * shallow-snapshot-v2 settings enabled, and checks the first snapshot's result,
 * the REST status of the second (non-blocking) request, and the number of
 * pinned entities reported by {@link RemoteStorePinnedTimestampService}.
 */
public void testCreateSnapshotV2() throws Exception {
// One cluster-manager-only node and two data-only nodes, all started with the pinned timestamp settings.
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

// FS repository with randomized compression/chunk size and both shallow-copy flags switched on.
Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);

createRepository(snapshotRepoName, FsRepository.TYPE, settings);

Client client = client();
// NOTE(review): the two numeric arguments are presumably shard and replica counts - confirm against getIndexSettings.
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

// First snapshot: must succeed on every shard and carry a positive pinned timestamp.
SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

// Add more data and a third index before taking the second snapshot.
indexDocuments(client, indexName1, 10);
indexDocuments(client, indexName2, 20);

createIndex(indexName3, indexSettings);
indexDocuments(client, indexName3, 10);

String snapshotName2 = "test-create-snapshot2";

// verify response status if waitForCompletion is not true
RestStatus createSnapshotResponseStatus = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);

// Expects two pinned entities, presumably one per snapshot created above.
// NOTE(review): the second snapshot was requested without waiting for completion and no
// forceSyncPinnedTimestamps() is called before this check - confirm the assertion is not timing dependent.
assertEquals(2, RemoteStorePinnedTimestampService.getPinnedEntities().size());
}

/**
 * Invokes {@code forceSyncPinnedTimestamps()} on the
 * {@link RemoteStorePinnedTimestampService} instance of every node returned by
 * {@code internalCluster().getNodeNames()}.
 */
public void forceSyncPinnedTimestamps() {
    for (String nodeName : internalCluster().getNodeNames()) {
        // Resolve the per-node service instance and sync it right away.
        internalCluster().getInstance(RemoteStorePinnedTimestampService.class, nodeName).forceSyncPinnedTimestamps();
    }
}

public void testCreateSnapshotV2WithRedIndex() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -49,6 +52,7 @@
public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
private static HashMap<String, List<Long>> pinnedEntityToTimestampsMap = new HashMap<>();
public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps";
public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__";

Expand Down Expand Up @@ -216,6 +220,16 @@ private long getTimestampFromBlobName(String blobName) {
return -1;
}

/**
 * Extracts the pinning entity from a pinned-timestamp blob name by splitting the
 * name on {@link #PINNED_TIMESTAMPS_FILENAME_SEPARATOR} and re-joining every token
 * except the last one with that same separator.
 *
 * @param blobName blob name to parse
 * @return all tokens but the last, joined with the separator
 * @throws IllegalArgumentException if the name splits into fewer than two tokens
 */
private String getEntityFromBlobName(String blobName) {
    final String[] parts = blobName.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR);
    if (parts.length >= 2) {
        // Drop the trailing token and stitch the remaining ones back together.
        return String.join(PINNED_TIMESTAMPS_FILENAME_SEPARATOR, Arrays.asList(parts).subList(0, parts.length - 1));
    }
    final String errorMessage = "Pinned timestamps blob name contains invalid format: " + blobName;
    logger.error(errorMessage);
    throw new IllegalArgumentException(errorMessage);
}

/**
* Unpins a timestamp from the remote store.
*
Expand Down Expand Up @@ -262,6 +276,10 @@ public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
return pinnedTimestampsSet;
}

/**
 * Returns the current entity-to-timestamps map held in the static
 * {@code pinnedEntityToTimestampsMap} field.
 *
 * <p>The returned object is the field's value itself, not a copy.
 *
 * @return map from pinning entity to the timestamps recorded for it
 */
public static HashMap<String, List<Long>> getPinnedEntities() {
    final HashMap<String, List<Long>> currentEntities = pinnedEntityToTimestampsMap;
    return currentEntities;
}

/**
* Inner class for asynchronously updating the pinned timestamp set.
*/
Expand Down Expand Up @@ -290,8 +308,23 @@ protected void runInternal() {
.map(RemoteStorePinnedTimestampService.this::getTimestampFromBlobName)
.filter(timestamp -> timestamp != -1)
.collect(Collectors.toSet());

logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps);
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps);

pinnedEntityToTimestampsMap = new HashMap<>();
pinnedEntityToTimestampsMap.putAll(
pinnedTimestampList.keySet()
.stream()
.collect(Collectors.toMap(RemoteStorePinnedTimestampService.this::getEntityFromBlobName, blobName -> {
long timestamp = RemoteStorePinnedTimestampService.this.getTimestampFromBlobName(blobName);
return Collections.singletonList(timestamp);
}, (existingList, newList) -> {
List<Long> mergedList = new ArrayList<>(existingList);
mergedList.addAll(newList);
return mergedList;
}))
);
} catch (Throwable t) {
logger.error("Exception while fetching pinned timestamp details", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
Expand Down Expand Up @@ -621,6 +622,7 @@ public void onResponse(RepositoryData repositoryData) {
return;
}
listener.onResponse(snapshotInfo);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

@Override
Expand Down Expand Up @@ -651,6 +653,60 @@ public TimeValue timeout() {
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}

/**
 * Unpins timestamps that are pinned for {@code repoName} but whose snapshot UUID
 * is not present in {@code repositoryData}.
 *
 * <p>Forces a sync of the pinned timestamps first, then selects the orphan
 * entities via {@link #isOrphanPinnedEntity}. Deletion only happens when
 * {@code tryEnterRepoLoop(repoName)} succeeds; otherwise the clean-up is skipped
 * and a message is logged.
 *
 * @param repoName       repository whose pinned entities are inspected
 * @param repositoryData repository data providing the known snapshot ids
 */
private void cleanOrphanTimestamp(String repoName, RepositoryData repositoryData) {
    // A set keeps the per-entity contains() check in isOrphanPinnedEntity from being a linear scan.
    Collection<String> snapshotUUIDs = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
    remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();

    HashMap<String, List<Long>> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities();

    List<String> orphanPinnedEntities = pinnedEntities.keySet()
        .stream()
        .filter(pinnedEntity -> isOrphanPinnedEntity(repoName, snapshotUUIDs, pinnedEntity))
        .collect(Collectors.toList());

    if (orphanPinnedEntities.isEmpty()) {
        return;
    }

    logger.info("Found {} orphan timestamps. Cleaning it up now", orphanPinnedEntities.size());
    if (tryEnterRepoLoop(repoName)) {
        try {
            deleteOrphanTimestamps(pinnedEntities, orphanPinnedEntities);
        } finally {
            // Always leave the repo loop, even if the deletion throws, so the repository is not left marked as busy.
            leaveRepoLoop(repoName);
        }
    } else {
        logger.info("Concurrent snapshot create/delete is happening. Skipping clean up of orphan timestamps");
    }
}

/**
 * Decides whether a pinned entity is an orphan of the given repository.
 *
 * @param repoName      repository name to match against the entity's first token
 * @param snapshotUUIDs UUIDs of the snapshots currently known for the repository
 * @param pinnedEntity  pinned entity string, parsed with {@link #getRepoSnapshotUUIDTuple}
 * @return {@code true} when the entity's repository equals {@code repoName} and its
 *         snapshot UUID is not contained in {@code snapshotUUIDs}
 */
private boolean isOrphanPinnedEntity(String repoName, Collection<String> snapshotUUIDs, String pinnedEntity) {
    final Tuple<String, String> repoAndUuid = getRepoSnapshotUUIDTuple(pinnedEntity);
    if (Objects.equals(repoAndUuid.v1(), repoName) == false) {
        // Entity belongs to another repository; never an orphan of this one.
        return false;
    }
    final boolean knownSnapshot = snapshotUUIDs.contains(repoAndUuid.v2());
    return knownSnapshot == false;
}

/**
 * Unpins the single timestamp recorded for each orphan entity and blocks until
 * every unpin request has called back, whether it succeeded or failed.
 *
 * <p>A failed unpin is logged and otherwise ignored, so one failure does not
 * prevent the remaining orphans from being processed.
 *
 * @param pinnedEntities       map from pinned entity to its timestamps; each orphan is
 *                             asserted to have exactly one timestamp
 * @param orphanPinnedEntities entities whose timestamps should be unpinned
 * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
 *                          restored before the exception is thrown
 */
private void deleteOrphanTimestamps(HashMap<String, List<Long>> pinnedEntities, List<String> orphanPinnedEntities) {
    final CountDownLatch latch = new CountDownLatch(orphanPinnedEntities.size());
    for (String pinnedEntity : orphanPinnedEntities) {
        assert pinnedEntities.get(pinnedEntity).size() == 1 : "Multiple timestamps for same repo-snapshot uuid found";
        long orphanTimestamp = pinnedEntities.get(pinnedEntity).get(0);
        String pinningEntity = getPinningEntity(pinnedEntity);
        remoteStorePinnedTimestampService.unpinTimestamp(
            orphanTimestamp,
            pinningEntity,
            new LatchedActionListener<>(new ActionListener<>() {
                @Override
                public void onResponse(Void unused) {}

                @Override
                public void onFailure(Exception e) {
                    // Best-effort clean-up: record the failure instead of dropping it silently.
                    logger.warn("Failed to unpin orphan timestamp", e);
                }
            }, latch)
        );
    }
    try {
        latch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}

private void createSnapshotPreValidations(
ClusterState currentState,
RepositoryData repositoryData,
Expand Down Expand Up @@ -707,6 +763,16 @@ public static String getPinningEntity(String repositoryName, String snapshotUUID
return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID;
}

/**
 * Builds a pinning entity from the first two tokens of {@code blobname} after
 * splitting it on {@code SNAPSHOT_PINNED_TIMESTAMP_DELIMITER}.
 *
 * @param blobname string holding at least two delimiter-separated tokens
 * @return the first two tokens joined with the delimiter
 * @throws IllegalArgumentException if {@code blobname} splits into fewer than two tokens
 */
public static String getPinningEntity(String blobname) {
    String[] tokens = blobname.split(SNAPSHOT_PINNED_TIMESTAMP_DELIMITER);
    if (tokens.length < 2) {
        // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException.
        throw new IllegalArgumentException(
            "Cannot derive pinning entity from ["
                + blobname
                + "]: expected at least two tokens separated by ["
                + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER
                + "]"
        );
    }
    return tokens[0] + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + tokens[1];
}

/**
 * Splits a pinning entity on {@code SNAPSHOT_PINNED_TIMESTAMP_DELIMITER} and returns
 * its first two tokens; callers in this class treat them as repository name and
 * snapshot UUID respectively.
 *
 * @param pinningEntity entity string holding at least two delimiter-separated tokens
 * @return tuple of the first token and the second token
 * @throws IllegalArgumentException if {@code pinningEntity} splits into fewer than two tokens
 */
public static Tuple<String, String> getRepoSnapshotUUIDTuple(String pinningEntity) {
    String[] tokens = pinningEntity.split(SNAPSHOT_PINNED_TIMESTAMP_DELIMITER);
    if (tokens.length < 2) {
        // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException.
        throw new IllegalArgumentException(
            "Cannot parse pinning entity ["
                + pinningEntity
                + "]: expected at least two tokens separated by ["
                + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER
                + "]"
        );
    }
    return new Tuple<>(tokens[0], tokens[1]);
}

private void cloneSnapshotPinnedTimestamp(
RepositoryData repositoryData,
SnapshotId sourceSnapshot,
Expand Down

0 comments on commit 0503f63

Please sign in to comment.