Skip to content

Commit

Permalink
Read the same medata file that is locked during restore of shallow sn…
Browse files Browse the repository at this point in the history
…apshot (#10979)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Oct 30, 2023
1 parent 84be8c9 commit 4efa6d7
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -589,4 +589,71 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionException, InterruptedException {
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
String snapshotName1 = "test-restore-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
Path absolutePath2 = randomRepoPath().toAbsolutePath();
String[] pathTokens = absolutePath1.toString().split("/");
String basePath = pathTokens[pathTokens.length - 1];
Arrays.copyOf(pathTokens, pathTokens.length - 1);
Path location = PathUtils.get(String.join("/", pathTokens));
pathTokens = absolutePath2.toString().split("/");
String basePath2 = pathTokens[pathTokens.length - 1];
Arrays.copyOf(pathTokens, pathTokens.length - 1);
Path location2 = PathUtils.get(String.join("/", pathTokens));
logger.info("Path 1 [{}]", absolutePath1);
logger.info("Path 2 [{}]", absolutePath2);
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
createIndex(indexName1, indexSettings);

int numDocsInIndex1 = randomIntBetween(2, 5);
indexDocuments(client, indexName1, numDocsInIndex1);

ensureGreen(indexName1);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

int extraNumDocsInIndex1 = randomIntBetween(20, 50);
indexDocuments(client, indexName1, extraNumDocsInIndex1);
refresh(indexName1);

client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated)
.get();

assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4908,8 +4908,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
remoteStore.incRef();
}
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.initializeToSpecificCommit(primaryTerm, commitGeneration)
.getMetadata();
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ void recoverFromSnapshotAndRemoteStore(
indexUUID,
shardId
);
sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
commitGeneration,
recoverySource.snapshot().getSnapshotId().getUUID()
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
final Store store = indexShard.store();
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand Down Expand Up @@ -160,8 +161,9 @@ public RemoteSegmentMetadata init() throws IOException {
*
* @throws IOException if there were any failures in reading the metadata file
*/
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException {
String metadataFile = getMetadataFileForCommit(primaryTerm, commitGeneration);
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration, String acquirerId) throws IOException {
String metadataFilePrefix = MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, commitGeneration);
String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLock(metadataFilePrefix, acquirerId);
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to
Expand Down Expand Up @@ -70,6 +73,19 @@ public void release(LockInfo lockInfo) throws IOException {
}
}

public String fetchLock(String filenamePrefix, String acquirerId) throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix);
List<String> lockFilesForAcquirer = lockFiles.stream()
.filter(lockFile -> acquirerId.equals(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFile)))
.map(FileLockInfo.LockFileUtils::getFileToLockNameFromLock)
.collect(Collectors.toList());
if (lockFilesForAcquirer.size() == 0) {
throw new FileNotFoundException("No lock file found for prefix: " + filenamePrefix + " and acquirerId: " + acquirerId);
}
assert lockFilesForAcquirer.size() == 1;
return lockFilesForAcquirer.get(0);
}

/**
* Checks whether a given file have any lock on it or not.
* @param lockInfo File Lock Info instance for which we need to check if lock is acquired.
Expand Down

0 comments on commit 4efa6d7

Please sign in to comment.