Skip to content

Commit

Permalink
Shallow snapshot v2 - create snapshot validations in a cluster state …
Browse files Browse the repository at this point in the history
…update (#15939)

---------

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored Sep 27, 2024
1 parent 7caca26 commit e8b02c9
Show file tree
Hide file tree
Showing 7 changed files with 1,134 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
Expand All @@ -25,6 +27,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
Expand All @@ -43,14 +46,11 @@
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -63,6 +63,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -79,48 +80,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase {
private static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX;
private Path remoteRepoPath;

@Before
public void setup() {
remoteRepoPath = randomRepoPath().toAbsolutePath();
}

@After
public void teardown() {
clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath))
.build();
}

private Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) {
Settings.Builder settingsBuilder = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s");
return settingsBuilder;
}

private void indexDocuments(Client client, String indexName, int numOfDocs) {
indexDocuments(client, indexName, 0, numOfDocs);
}

private void indexDocuments(Client client, String indexName, int fromId, int toId) {
for (int i = fromId; i < toId; i++) {
String id = Integer.toString(i);
client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get();
}
client.admin().indices().prepareFlush(indexName).get();
}
public class RemoteRestoreSnapshotIT extends RemoteSnapshotIT {

private void assertDocsPresentInIndex(Client client, String indexName, int numOfDocs) {
for (int i = 0; i < numOfDocs; i++) {
Expand Down Expand Up @@ -997,6 +957,75 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(snapshotRepoName, FsRepository.TYPE, settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

Thread thread = new Thread(() -> {
try {
String snapshotName = "snapshot-earlier-master";
internalCluster().nonClusterManagerClient()
.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.setMasterNodeTimeout(TimeValue.timeValueSeconds(60))
.get();

} catch (Exception ignored) {}
});
thread.start();

// stop existing master
final String clusterManagerNode = internalCluster().getClusterManagerName();
stopNode(clusterManagerNode);

// Validate that we have greater one snapshot has been created
String snapshotName = "new-snapshot";
try {
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get();
} catch (Exception e) {
logger.info("Exception while creating new-snapshot", e);
}

// Validate that snapshot is present in repository data
assertBusy(() -> {
GetSnapshotsRequest request = new GetSnapshotsRequest(snapshotRepoName);
GetSnapshotsResponse response2 = client().admin().cluster().getSnapshots(request).actionGet();
assertThat(response2.getSnapshots().size(), greaterThanOrEqualTo(1));
}, 30, TimeUnit.SECONDS);
thread.join();
}

public void testCreateSnapshotV2WithRedIndex() throws Exception {
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down Expand Up @@ -1315,11 +1344,4 @@ public void testConcurrentV1SnapshotAndV2RepoSettingUpdate() throws Exception {
createV1SnapshotThread.join();
}

private Settings pinnedTimestampSettings() {
Settings settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
return settings;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;
import java.util.concurrent.ExecutionException;

import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;

public abstract class RemoteSnapshotIT extends AbstractSnapshotIntegTestCase {
protected static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX;
protected Path remoteRepoPath;

@Before
public void setup() {
remoteRepoPath = randomRepoPath().toAbsolutePath();
}

@After
public void teardown() {
clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath))
.build();
}

protected Settings pinnedTimestampSettings() {
Settings settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
return settings;
}

protected Settings.Builder getIndexSettings(int numOfShards, int numOfReplicas) {
Settings.Builder settingsBuilder = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s");
return settingsBuilder;
}

protected void indexDocuments(Client client, String indexName, int numOfDocs) {
indexDocuments(client, indexName, 0, numOfDocs);
}

void indexDocuments(Client client, String indexName, int fromId, int toId) {
for (int i = fromId; i < toId; i++) {
String id = Integer.toString(i);
client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get();
}
client.admin().indices().prepareFlush(indexName).get();
}

protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
createRepository(repoName, ReloadableFsRepository.TYPE, settings);
}

}
Loading

0 comments on commit e8b02c9

Please sign in to comment.