Skip to content

Commit

Permalink
Snapshot Status API changes for V2 snapshots
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
  • Loading branch information
ltaragi committed Aug 26, 2024
1 parent 80bf6cc commit b0c0c7c
Show file tree
Hide file tree
Showing 29 changed files with 700 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@

package org.opensearch.snapshots;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.common.action.ActionFuture;
Expand All @@ -44,8 +46,13 @@
import org.junit.Before;

import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_V2;
import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -192,6 +199,102 @@ public void testStatusAPICallInProgressShallowSnapshot() throws Exception {
createSnapshotResponseActionFuture.actionGet();
}

public void testStatusAPICallForShallowV2Snapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

logger.info("Create repository for shallow V2 snapshots");
final String snapshotRepoName = "snapshot-repo-name";
Settings.Builder snapshotV2RepoSettings = snapshotRepoSettingsForShallowCopy().put(SNAPSHOT_V2.getKey(), Boolean.TRUE);
createRepository(snapshotRepoName, "fs", snapshotV2RepoSettings);

final String index1 = "remote-index-1";
final String index2 = "remote-index-2";
final String index3 = "remote-index-3";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(index1, remoteStoreEnabledIndexSettings);
createIndex(index2, remoteStoreEnabledIndexSettings);
createIndex(index3, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 50; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index3, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

final String snapshot = "snapshot";
SnapshotInfo snapshotInfo = createFullSnapshot(snapshotRepoName, snapshot);
assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// without index filter
assertBusy(() -> {
// although no. of shards in snapshot (3) is greater than the max value allowed in a status api call, the request does not fail
SnapshotStatus snapshotStatusWithoutIndexFilter = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.execute()
.actionGet()
.getSnapshots()
.get(0);

assertShallowV2SnapshotStatus(snapshotStatusWithoutIndexFilter, false);

SnapshotStatus snapshotStatusWithIndexFilter = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.setIndices(index1, index2)
.execute()
.actionGet()
.getSnapshots()
.get(0);

assertShallowV2SnapshotStatus(snapshotStatusWithIndexFilter, true);

}, 1, TimeUnit.MINUTES);

}

private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolean hasIndexFilter) {
if (hasIndexFilter) {
assertEquals(0, snapshotStatus.getStats().getTotalSize());
} else {
assertTrue(snapshotStatus.getStats().getTotalSize() > 0);
}
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());
assertEquals(0, snapshotStatus.getStats().getIncrementalSize());
assertEquals(0, snapshotStatus.getStats().getIncrementalFileCount());

for (Map.Entry<String, SnapshotIndexStatus> entry : snapshotStatus.getIndices().entrySet()) {
// index level
SnapshotIndexStatus snapshotIndexStatus = entry.getValue();
assertEquals(0, snapshotIndexStatus.getStats().getTotalSize());
assertEquals(0, snapshotIndexStatus.getStats().getTotalFileCount());
assertEquals(0, snapshotIndexStatus.getStats().getIncrementalSize());
assertEquals(0, snapshotIndexStatus.getStats().getIncrementalFileCount());

for (SnapshotIndexShardStatus snapshotIndexShardStatus : snapshotStatus.getShards()) {
// shard level
assertEquals(0, snapshotIndexShardStatus.getStats().getTotalSize());
assertEquals(0, snapshotIndexShardStatus.getStats().getTotalFileCount());
assertEquals(0, snapshotIndexShardStatus.getStats().getIncrementalSize());
assertEquals(0, snapshotIndexShardStatus.getStats().getIncrementalFileCount());
assertEquals(SnapshotIndexShardStage.DONE, snapshotIndexShardStatus.getStage());
}
}
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
return snapshotStatus.getIndices().get(indexName).getShards().get(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
package org.opensearch.snapshots;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
Expand All @@ -49,6 +52,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -59,9 +63,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -564,6 +571,192 @@ public void testGetSnapshotsRequest() throws Exception {
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));
}

public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws Exception {
String repositoryName = "test-repo";
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
createRepository(repositoryName, "fs");

logger.info("Create indices");
createIndex(index1, index2, index3);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "baz" + i);
index(index3, "_doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
String snapshot1 = "test-snap-1";
String snapshot2 = "test-snap-2";
createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3));
createSnapshot(repositoryName, snapshot2, List.of(index1, index2));

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// across a single snapshot
assertBusy(() -> {
SnapshotException snapshotException = expectThrows(
SnapshotException.class,
() -> client().admin().cluster().prepareSnapshotStatus(repositoryName).setSnapshots(snapshot1).execute().actionGet()
);
assertEquals(snapshotException.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertTrue(
snapshotException.getMessage()
.endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);

// across multiple snapshots
assertBusy(() -> {
SnapshotException snapshotException = expectThrows(
SnapshotException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1, snapshot2)
.execute()
.actionGet()
);
assertEquals(snapshotException.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertTrue(
snapshotException.getMessage()
.endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

public void testSnapshotStatusForIndexFilter() throws Exception {
String repositoryName = "test-repo";
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
createRepository(repositoryName, "fs");

logger.info("Create indices");
createIndex(index1, index2, index3);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "baz" + i);
index(index3, "_doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
String snapshot = "test-snap-1";
createSnapshot(repositoryName, snapshot, List.of(index1, index2, index3));

assertBusy(() -> {
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot)
.setIndices(index1, index2)
.get()
.getSnapshots()
.get(0);
Map<String, SnapshotIndexStatus> snapshotIndexStatusMap = snapshotsStatus.getIndices();
// Although the snapshot contains 3 indices, the response of status api call only contains results for 2
assertEquals(snapshotIndexStatusMap.size(), 2);
assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2));
}, 1, TimeUnit.MINUTES);
}

public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
String repositoryName = "test-repo";
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
createRepository(repositoryName, "fs");

logger.info("Create indices");
createIndex(index1, index2, index3);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "baz" + i);
index(index3, "_doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
String snapshot1 = "test-snap-1";
String snapshot2 = "test-snap-2";
createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3));
createSnapshot(repositoryName, snapshot2, List.of(index1));

assertBusy(() -> {
// failure due to passing index filter for multiple snapshots
ActionRequestValidationException ex1 = expectThrows(
ActionRequestValidationException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1, snapshot2)
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
String cause = "index list filter is supported only for a single snapshot";
assertTrue(ex1.getMessage().contains(cause));

// failure due to index not found in snapshot
SnapshotException ex2 = expectThrows(
SnapshotException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot2)
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
assertEquals(ex2.status(), RestStatus.NOT_FOUND);
cause = String.format(
"[%s:%s] indices [%s] missing in snapshot [%s]",
repositoryName,
snapshot2,
String.join(", ", List.of(index2, index3)),
snapshot2
);
assertEquals(cause, ex2.getMessage());

// failure due to too many shards requested
logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

SnapshotException ex3 = expectThrows(
SnapshotException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1)
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
assertEquals(ex3.status(), RestStatus.REQUEST_ENTITY_TOO_LARGE);
assertTrue(ex3.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request"));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

}, 1, TimeUnit.MINUTES);
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
return snapshotStatus.getIndices().get(indexName).getShards().get(0);
}
Expand Down
Loading

0 comments on commit b0c0c7c

Please sign in to comment.