Skip to content

Commit

Permalink
Update Shallow Snapshot flows to support remote path type & hash algo
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 3, 2024
1 parent fb5d036 commit 6b53314
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -284,7 +286,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {

indexDocuments(client, indexName1, randomIntBetween(5, 10));
ensureGreen(indexName1);
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1)));
Expand All @@ -301,7 +303,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(restoredIndexName1version1);
validatePathType(restoredIndexName1version1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(restoredIndexName1version1, PathType.FIXED);

client(clusterManagerNode).admin()
.cluster()
Expand All @@ -327,16 +329,22 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);

// Validating that custom data has not changed for indexes which were created before the cluster setting got updated
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);
}

private void validatePathType(String index, PathType pathType, PathHashAlgorithm pathHashAlgorithm) {
private void validatePathType(String index, PathType pathType) {
validatePathType(index, pathType, null);
}

private void validatePathType(String index, PathType pathType, @Nullable PathHashAlgorithm pathHashAlgorithm) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
// Validate that the remote_store custom data is present in index metadata for the created index.
Map<String, String> remoteCustomData = state.metadata().index(index).getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assertNotNull(remoteCustomData);
assertEquals(pathType.name(), remoteCustomData.get(PathType.NAME));
assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
if (Objects.nonNull(pathHashAlgorithm)) {
assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
}
}

public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ public MetadataCreateIndexService(

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings())
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings(), minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -575,22 +576,18 @@ public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdB
if (remoteStorePathStrategyResolver != null) {
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
Map<String, String> remoteCustomData = existingRemoteCustomData == null
? new HashMap<>()
: new HashMap<>(existingRemoteCustomData);
Map<String, String> existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path strategy to use via the remoteStorePathStrategyResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
String oldPathType = remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
String oldHashAlgorithm = remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
assert !assertNullOldType || (Objects.isNull(oldPathType) && Objects.isNull(oldHashAlgorithm));
Map<String, String> remoteCustomData = new HashMap<>();
remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
}
logger.trace(
() -> new ParameterizedMessage(
"Added newPathStrategy={}, replaced oldPathType={} oldHashAlgorithm={}",
newPathStrategy,
oldPathType,
oldHashAlgorithm
)
() -> new ParameterizedMessage("Added newStrategy={}, replaced oldStrategy={}", remoteCustomData, existingCustomData)
);
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1910,12 +1911,11 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
if (remoteCustomData != null
&& remoteCustomData.containsKey(PathType.NAME)
&& remoteCustomData.containsKey(PathHashAlgorithm.NAME)) {
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
PathHashAlgorithm pathHashAlgorithm = PathHashAlgorithm.parseString(remoteCustomData.get(PathHashAlgorithm.NAME));
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
}
return new RemoteStorePathStrategy(PathType.FIXED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
import org.opensearch.common.hash.FNV1a;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;

Expand Down Expand Up @@ -78,9 +82,10 @@ public String getName() {
*/
@PublicApi(since = "2.14.0")
public enum PathType {
FIXED {
FIXED(0) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type";
// Hash algorithm is not used in FIXED path type
return pathInput.basePath()
.add(pathInput.indexUUID())
Expand All @@ -94,7 +99,7 @@ boolean requiresHashAlgorithm() {
return false;
}
},
HASHED_PREFIX {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
Expand All @@ -112,6 +117,33 @@ boolean requiresHashAlgorithm() {
}
};

/** Numeric code assigned to this path type; {@link #fromCode(int)} maps it back to the constant. */
private final int code;

PathType(int code) {
    this.code = code;
}

/**
 * @return the numeric code assigned to this path type
 */
public int getCode() {
    return this.code;
}

private static final Map<Integer, PathType> CODE_TO_ENUM;
static {
PathType[] values = values();
Map<Integer, PathType> codeToStatus = new HashMap<>(values.length);
for (PathType value : values) {
codeToStatus.put(value.code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathType}.
*/
public static PathType fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

/**
* This method generates the path for the given path input which constitutes multiple fields and characteristics
* of the data.
Expand All @@ -131,7 +163,7 @@ public BlobPath path(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
return generatePath(pathInput, hashAlgorithm);
}

abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);
protected abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);

abstract boolean requiresHashAlgorithm();

Expand All @@ -158,7 +190,7 @@ public static PathType parseString(String pathType) {
@PublicApi(since = "2.14.0")
public enum PathHashAlgorithm {

FNV_1A {
FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
Expand All @@ -167,6 +199,33 @@ long hash(PathInput pathInput) {
}
};

/** Numeric code assigned to this hash algorithm; {@link #fromCode(int)} maps it back to the constant. */
private final int code;

PathHashAlgorithm(int code) {
    this.code = code;
}

/**
 * @return the numeric code assigned to this hash algorithm
 */
public int getCode() {
    return this.code;
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;
static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
for (PathHashAlgorithm value : values) {
codeToStatus.put(value.code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathHashAlgorithm}.
*/
public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);

Check warning on line 226 in server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java#L226

Added line #L226 was not covered by tests
}

abstract long hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public RemoteStorePathStrategy(PathType type) {
}

/**
 * Creates a path strategy from a path type and an optional hash algorithm.
 *
 * @param type          the path type; must not be {@code null}
 * @param hashAlgorithm the hash algorithm; expected to be non-null exactly when {@code type} requires one
 * @throws NullPointerException if {@code type} is {@code null}
 */
public RemoteStorePathStrategy(PathType type, PathHashAlgorithm hashAlgorithm) {
    // Null-check the type first so the assertion below never dereferences a null type.
    this.type = Objects.requireNonNull(type);
    // The previous form "(!requires && isNull) || nonNull" reduces to "!requires || nonNull", which
    // accepts a hash algorithm for a type that does not use one. Require presence to match the need.
    assert type.requiresHashAlgorithm() == Objects.nonNull(hashAlgorithm)
        : "pathType=" + type + " and pathHashAlgorithm=" + hashAlgorithm + " are incompatible";
    this.hashAlgorithm = hashAlgorithm;
}
Expand All @@ -55,7 +55,7 @@ public String toString() {
}

/**
 * Generates the blob path for the given input using this strategy's path type and hash algorithm.
 * <p>
 * Delegates to {@code PathType#path}, which in turn calls the type-specific {@code generatePath}.
 *
 * @param pathInput the fields describing the data for which the path is generated
 * @return the generated blob path
 */
public BlobPath generatePath(PathInput pathInput) {
    return type.path(pathInput, hashAlgorithm);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.IndicesService;

import java.util.function.Supplier;

/**
* Determines the {@link RemoteStorePathStrategy} at the time of index metadata creation.
*
Expand All @@ -22,13 +25,22 @@ public class RemoteStorePathStrategyResolver {

private volatile PathType type;

/** Supplies the minimum node version of the cluster; read on every {@link #get()} call. */
private final Supplier<Version> minNodeVersionSupplier;

/**
 * Creates the resolver, reads the initial path type from the cluster settings and registers a consumer
 * so that later updates of the path type setting are picked up.
 *
 * @param clusterSettings        cluster settings holding the remote store path type setting
 * @param minNodeVersionSupplier supplier of the minimum node version in the cluster
 */
public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings, Supplier<Version> minNodeVersionSupplier) {
    this.minNodeVersionSupplier = minNodeVersionSupplier;
    type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
    clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, this::setType);
}

/**
 * Resolves the {@link RemoteStorePathStrategy} to use for a newly created index.
 * <p>
 * The configured path type is used only when {@code Version.CURRENT} is not newer than the minimum node
 * version of the cluster; otherwise {@link PathType#FIXED} is used. A FIXED path type carries no hash
 * algorithm; any other path type uses {@link PathHashAlgorithm#FNV_1A}.
 *
 * @return the path strategy resolved from the current setting and the minimum node version
 */
public RemoteStorePathStrategy get() {
    // Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
    PathType pathType = Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 ? type : PathType.FIXED;
    // If the path type is fixed, hash algorithm is not applicable.
    PathHashAlgorithm pathHashAlgorithm = pathType == PathType.FIXED ? null : PathHashAlgorithm.FNV_1A;
    return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

private void setType(PathType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
Expand Down Expand Up @@ -412,8 +410,7 @@ void recoverFromSnapshotAndRemoteStore(
remoteStoreRepository,
indexUUID,
shardId,
new RemoteStorePathStrategy(PathType.FIXED)
// TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
shallowCopyShardMetadata.getRemoteStorePathStrategy()

Check warning on line 413 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L413

Added line #L413 was not covered by tests
);
sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
Expand Down
Loading

0 comments on commit 6b53314

Please sign in to comment.