Skip to content

Commit

Permalink
[Snapshot Interop] Add changes for overriding remote store and replic…
Browse files Browse the repository at this point in the history
…ation settings during snapshot restore.

Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
  • Loading branch information
Harish Bhakuni committed Jan 12, 2024
1 parent 5c82ab8 commit 28aaf7b
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -44,12 +46,17 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -58,6 +65,8 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase {
private static final String BASE_REMOTE_REPO = "test-rs-repo" + TEST_REMOTE_STORE_REPO_SUFFIX;

private static final String SNAP_REPO = "snap_repo";
private Path remoteRepoPath;

@Before
Expand Down Expand Up @@ -656,4 +665,167 @@ public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionExcep
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreRemoteToNonRemote_WithReplicationTypeRestriction() throws Exception {
testRestoreRemoteToNonRemote(true);
}

public void testRestoreRemoteToNonRemote_WithoutReplicationTypeRestriction() throws Exception {
testRestoreRemoteToNonRemote(false);
}

// create snapshot in remote store enabled node and restore in non-remote store node,
// it should have the replication type as cluster default if `cluster.index.restrict.replication.type` is enabled.
// if `cluster.index.restrict.replication.type` is not enabled, it should be restored as seg-rep enabled.
private void testRestoreRemoteToNonRemote(boolean restrictReplicationType) throws Exception {
String indexName = "test-index";
String restoredIndexName = indexName + "-restored";
String testSnapName = "test-snap";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
String originalClusterManagerNode = internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath));
String primaryNode = internalCluster().startDataOnlyNode(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath));

prepareCreate(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
).get();
ensureYellowAndNoInitializingShards(indexName);
String replicaNode = internalCluster().startDataOnlyNode(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath));
ensureGreen(indexName);

int numDocsInIndex = 10;
for (int i = 0; i < numDocsInIndex; i++) {
client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("foo", "bar").get();
}

createRepository(SNAP_REPO, "fs", getRepositorySettings(remoteRepoPath, false));
createSnapshot(SNAP_REPO, testSnapName, Collections.singletonList(indexName));

// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));

// Start new set of nodes with cluster level replication type setting
// and restrict replication type setting if enabled.
Settings.Builder settingsBuilder = Settings.builder()
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT);

if (restrictReplicationType) {
settingsBuilder.put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true);
}
Settings settings = settingsBuilder.build();

// Start new cluster manager node
String newClusterManagerNode = internalCluster().startClusterManagerOnlyNode(settings);

// Remove older nodes
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(originalClusterManagerNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));

String newPrimaryNode = internalCluster().startDataOnlyNode(settings);
String newReplicaNode = internalCluster().startDataOnlyNode(settings);

// Perform snapshot restore
logger.info("--> Performing snapshot restore to target index");

createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
.cluster()
.prepareRestoreSnapshot(SNAP_REPO, testSnapName)
.setWaitForCompletion(true)
.setIndices(indexName)
.setRenamePattern(indexName)
.setRenameReplacement(restoredIndexName)
.setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated)
.get();

assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
assertTrue(restoreSnapshotResponse.getRestoreInfo().failedShards() == 0);
ensureGreen(restoredIndexName);
assertDocsPresentInIndex(client(), restoredIndexName, numDocsInIndex);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(restoredIndexName).includeDefaults(true))
.get();

String expectedReplicationType = ReplicationType.SEGMENT.toString();
if (restrictReplicationType) {
expectedReplicationType = ReplicationType.DOCUMENT.toString();
}
assertEquals("expected: " + expectedReplicationType + " actual: " + settingsResponse.getSetting(restoredIndexName, SETTING_REPLICATION_TYPE), expectedReplicationType, settingsResponse.getSetting(restoredIndexName, SETTING_REPLICATION_TYPE));
assertEquals(settingsResponse.getSetting(restoredIndexName, SETTING_REMOTE_STORE_ENABLED), Boolean.FALSE.toString());
assertNull(settingsResponse.getSetting(restoredIndexName, SETTING_REMOTE_SEGMENT_STORE_REPOSITORY));
assertNull(settingsResponse.getSetting(restoredIndexName, SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY));
}

public void testRestoreNonRemoteToRemote() throws Exception {
String indexName = "test-index";
String restoredIndexName = indexName + "-restored";
String testSnapName = "test-snap";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
String originalClusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primaryNode = internalCluster().startDataOnlyNode();

prepareCreate(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
).get();
ensureYellowAndNoInitializingShards(indexName);
String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(indexName);

int numDocsInIndex = 10;
for (int i = 0; i < numDocsInIndex; i++) {
client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("foo", "bar").get();
}

createRepository(SNAP_REPO, "fs", getRepositorySettings(remoteRepoPath, false));
createSnapshot(SNAP_REPO, testSnapName, Collections.singletonList(indexName));

// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));

// Start new cluster manager node
String newClusterManagerNode = internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath));

// Remove older nodes
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(originalClusterManagerNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));

String newPrimaryNode = internalCluster().startDataOnlyNode(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath));
String newReplicaNode = internalCluster().startDataOnlyNode(remoteStoreClusterSettings(BASE_REMOTE_REPO, remoteRepoPath));

// Perform snapshot restore
logger.info("--> Performing snapshot restore to target index");

createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
.cluster()
.prepareRestoreSnapshot(SNAP_REPO, testSnapName)
.setWaitForCompletion(true)
.setIndices(indexName)
.setRenamePattern(indexName)
.setRenameReplacement(restoredIndexName)
.setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated)
.get();

assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
assertTrue(restoreSnapshotResponse.getRestoreInfo().failedShards() == 0);
ensureGreen(restoredIndexName);
assertDocsPresentInIndex(client(), restoredIndexName, numDocsInIndex);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(restoredIndexName).includeDefaults(true))
.get();
assertEquals(settingsResponse.getSetting(restoredIndexName, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());
assertEquals(settingsResponse.getSetting(restoredIndexName, SETTING_REMOTE_STORE_ENABLED), Boolean.TRUE.toString());
assertEquals(settingsResponse.getSetting(restoredIndexName, SETTING_REMOTE_SEGMENT_STORE_REPOSITORY), BASE_REMOTE_REPO);
assertEquals(settingsResponse.getSetting(restoredIndexName, SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY), BASE_REMOTE_REPO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ public void createSnapshot() {
}

public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) {
return restoreSnapshotWithSettings(indexSettings, RESTORED_INDEX_NAME);
}

public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings, String restoredIndexName) {
RestoreSnapshotRequestBuilder builder = client().admin()
.cluster()
.prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME)
.setWaitForCompletion(false)
.setRenamePattern(INDEX_NAME)
.setRenameReplacement(RESTORED_INDEX_NAME);
.setRenameReplacement(restoredIndexName);
if (indexSettings != null) {
builder.setIndexSettings(indexSettings);
}
Expand Down Expand Up @@ -311,7 +315,8 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio
* 2. Snapshot index
* 3. Add new set of nodes with `cluster.indices.replication.strategy` set to SEGMENT and `cluster.index.restrict.replication.type`
* set to true.
* 4. Perform restore on new set of nodes to validate restored index has `DOCUMENT` replication.
* 4. Perform restore on new set of nodes to validate restored index has `SEGMENT` replication.
* 5. Validate that if replication type is passed as DOCUMENT as request parameter, restore operation fails
*/
public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {
final int documentCount = scaledRandomIntBetween(1, 10);
Expand Down Expand Up @@ -361,7 +366,24 @@ public void testSnapshotRestoreOnRestrictReplicationSetting() throws Exception {
// Perform snapshot restore
logger.info("--> Performing snapshot restore to target index");

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> restoreSnapshotWithSettings(null));
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());

// Verify index setting isSegRepEnabled.
Index index = resolveIndex(RESTORED_INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);

// Perform Snapshot Restore with different index name
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> restoreSnapshotWithSettings(null, RESTORED_INDEX_NAME + "2"));
assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ static Settings aggregateIndexSettings(
* @param clusterSettings cluster level settings
* @param combinedTemplateSettings combined template settings which satisfy the index
*/
private static void updateReplicationStrategy(
public static void updateReplicationStrategy(
Settings.Builder settingsBuilder,
Settings requestSettings,
Settings clusterSettings,
Expand All @@ -953,14 +953,14 @@ private static void updateReplicationStrategy(
// 4. Default cluster level setting

final ReplicationType indexReplicationType;
if (INDEX_REPLICATION_TYPE_SETTING.exists(requestSettings)) {
if (isRemoteStoreAttributePresent(clusterSettings)) {
indexReplicationType = ReplicationType.SEGMENT;
} else if (INDEX_REPLICATION_TYPE_SETTING.exists(requestSettings)) {
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(requestSettings);
} else if (INDEX_REPLICATION_TYPE_SETTING.exists(combinedTemplateSettings)) {
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings);
} else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings);
} else if (isRemoteStoreAttributePresent(clusterSettings)) {
indexReplicationType = ReplicationType.SEGMENT;
} else {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings);
}
Expand All @@ -970,20 +970,20 @@ private static void updateReplicationStrategy(
/**
* Updates index settings to enable remote store by default based on node attributes
* @param settingsBuilder index settings builder to be updated with relevant settings
* @param clusterSettings cluster level settings
* @param nodeSettings node settings
*/
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
if (isRemoteStoreAttributePresent(clusterSettings)) {
public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings nodeSettings) {
if (isRemoteStoreAttributePresent(nodeSettings)) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
clusterSettings.get(
nodeSettings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
)
)
.put(
SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
clusterSettings.get(
nodeSettings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
)
);
Expand Down
Loading

0 comments on commit 28aaf7b

Please sign in to comment.