Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Introducing mixed mode support for remote store migration #11986

Merged
merged 4 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to go in the [Unreleased 2.x] section if this PR is intended to be backported. Can you please do the manually backport and fix up this CHANGELOG entry on main as well?

Copy link
Collaborator Author

@gbbafna gbbafna Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure Andrew, will fix it up .


### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class DocRepMigrationTestCase extends MigrationBaseTestCase {

public void testMixedModeAddDocRep() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);

Client client = internalCluster().client(cmNodes.get(0));
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
addRemote = false;
internalCluster().startNode();
String[] allNodes = internalCluster().getNodeNames();
assertBusy(() -> { assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), allNodes.length); });
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;

boolean addRemote = false;

protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
if (addRemote) {
logger.info("Adding remote store node");
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put("discovery.initial_state_timeout", "500ms")
.build();
} else {
logger.info("Adding docrep node");
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build();
}
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemoteStoreMigrationTestCase extends MigrationBaseTestCase {
public void testMixedModeAddRemoteNodes() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Client client = internalCluster().client(cmNodes.get(0));
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// add remote node in mixed mode cluster
addRemote = true;
internalCluster().startNode();
internalCluster().startNode();
internalCluster().validateClusterFormed();

// assert repo gets registered
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
assertEquals(1, getRepositoriesResponse.repositories().size());

// add docrep mode in mixed mode cluster
addRemote = true;
internalCluster().startNode();
assertBusy(() -> {
assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), internalCluster().getNodeNames().length);
});

// add incompatible remote node in remote mixed cluster
Settings.Builder badSettings = Settings.builder()
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, "REPOSITORY_2_NAME", translogRepoPath))
.put("discovery.initial_state_timeout", "500ms");
String badNode = internalCluster().startNode(badSettings);
assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length);
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(badNode));
}

public void testMigrationDirections() {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
// add remote node in docrep cluster
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "docrep"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "random"));
assertThrows(IllegalArgumentException.class, () -> client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -176,12 +177,13 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

// TODO: We are using one of the existing node to build the repository metadata, this will need to be updated
// once we start supporting mixed compatibility mode. An optimization can be done as this will get invoked
// An optimization can be done as this will get invoked
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
(currentNodes.getNodes().values()).stream().findFirst().get(),
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);

Expand Down Expand Up @@ -212,6 +214,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);

if (remoteDN.isEmpty()) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
}

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -495,36 +507,46 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod

assert existingNodes.isEmpty() == false;

// TODO: The below check is valid till we don't support migration, once we start supporting migration a remote
// store node will be able to join a non remote store cluster and vice versa. #7986
CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
if (STRICT.equals(remoteStoreCompatibilityMode)) {

DiscoveryNode existingNode = existingNodes.get(0);
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
} else {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
} else {
throw new IllegalStateException(
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
);
}
} else {
if (existingNode.isRemoteStoreNode()) {
}
} else {
if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) {
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
} else {
throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ protected FeatureFlagSettings(
FeatureFlags.TELEMETRY_SETTING,
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING,
FeatureFlags.DOC_ID_FUZZY_SET_SETTING
FeatureFlags.DOC_ID_FUZZY_SET_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
* @opensearch.internal
*/
public class FeatureFlags {
/**
* Gates the visibility of the remote store migration support from docrep .
*/
public static final String REMOTE_STORE_MIGRATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.migration.enabled";

/**
* Gates the ability for Searchable Snapshots to read snapshots that are older than the
* guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis.
Expand Down Expand Up @@ -98,6 +103,12 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
}
}

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Property.NodeScope
);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);

public static final Setting<Boolean> IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope);
Expand Down
Loading
Loading