Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RW Separation] Add polling segment replication for search replicas #15627

Merged
merged 8 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Reader Writer Separation] Add experimental search replica shard type to achieve reader writer separation ([#15237](https://github.com/opensearch-project/OpenSearch/pull/15237))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
54 changes: 53 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -629,6 +630,56 @@
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
return newIndexService(

Check warning on line 634 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L634

Added line #L634 was not covered by tests
indexCreationContext,
environment,
xContentRegistry,
shardStoreDeleter,
circuitBreakerService,
bigArrays,
threadPool,
scriptService,
clusterService,
client,
indicesQueryCache,
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry,
idFieldDataEnabled,
valuesSourceRegistry,
remoteDirectoryFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}

Check warning on line 656 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L656

Added line #L656 was not covered by tests
);
}

public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ScriptService scriptService,
ClusterService clusterService,
Client client,
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -689,7 +740,8 @@
recoverySettings,
remoteStoreSettings,
fileCache,
compositeIndexSettings
compositeIndexSettings,
replicator
);
success = true;
return indexService;
Expand Down
70 changes: 68 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING;
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;

/**
Expand Down Expand Up @@ -174,6 +175,7 @@
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncReplicationTask asyncReplicationTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand All @@ -194,6 +196,7 @@
private final RemoteStoreSettings remoteStoreSettings;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -231,7 +234,8 @@
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -306,11 +310,15 @@
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
this.asyncReplicationTask = new AsyncReplicationTask(this);
}
this.translogFactorySupplier = translogFactorySupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -386,7 +394,8 @@
recoverySettings,
remoteStoreSettings,
null,
null
null,
s -> {}

Check warning on line 398 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L398

Added line #L398 was not covered by tests
);
}

Expand All @@ -395,6 +404,11 @@
&& indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
}

// visible for tests
AsyncReplicationTask getReplicationTask() {
return asyncReplicationTask;
}

/**
* Context for index creation
*
Expand Down Expand Up @@ -1067,11 +1081,22 @@
}
onRefreshIntervalChange();
updateFsyncTaskIfNecessary();
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
updateReplicationTask();
}
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
}

private void updateReplicationTask() {
try {
asyncReplicationTask.close();
} finally {
asyncReplicationTask = new AsyncReplicationTask(this);
}
}

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
Expand Down Expand Up @@ -1336,6 +1361,47 @@
}
}

final class AsyncReplicationTask extends BaseAsyncTask {

AsyncReplicationTask(IndexService indexService) {
super(indexService, indexService.getRefreshInterval());
}

@Override
protected void runInternal() {
indexService.maybeSyncSegments(false);
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "replication";
}

@Override
protected boolean mustReschedule() {
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
}
}

private void maybeSyncSegments(boolean force) {
if (getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) {
replicator.accept(shard);

Check warning on line 1396 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1396

Added line #L1396 was not covered by tests
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {

Check warning on line 1398 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1398

Added line #L1398 was not covered by tests
// do nothing
}
}

Check warning on line 1401 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1400-L1401

Added lines #L1400 - L1401 were not covered by tests
}
}

final class AsyncTrimTranslogTask extends BaseAsyncTask {

AsyncTrimTranslogTask(IndexService indexService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,12 +1251,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private boolean isPrimaryRelocation(String allocationId) {
// skip any shard that is a relocating primary or search only replica (not tracked by primary)
private boolean shouldSkipReplicationTimer(String allocationId) {
Optional<ShardRouting> shardRouting = routingTable.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && shardRouting.get().primary();
return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly());
}

private void createReplicationLagTimers() {
Expand All @@ -1268,7 +1269,7 @@ private void createReplicationLagTimers() {
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(allocationId) == false
&& shouldSkipReplicationTimer(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& (indexSettings.isSegRepLocalEnabled() == true
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) {
Expand Down Expand Up @@ -1302,7 +1303,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(e.getKey()) == false
&& shouldSkipReplicationTimer(e.getKey()) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
Expand Down Expand Up @@ -1330,7 +1331,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
&& isPrimaryRelocation(entry.getKey()) == false
&& shouldSkipReplicationTimer(entry.getKey()) == false
/*Check if the current primary shard is migrating to remote and
all the other shard copies of the same index still hasn't completely moved over
to the remote enabled nodes. Ensures that:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final SearchRequestStats searchRequestStats;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;

@Override
protected void doStart() {
Expand Down Expand Up @@ -395,7 +396,8 @@ public IndicesService(
CacheService cacheService,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -504,6 +506,7 @@ protected void closeInternal() {
this.remoteStoreSettings = remoteStoreSettings;
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
}

public IndicesService(
Expand Down Expand Up @@ -564,6 +567,7 @@ public IndicesService(
cacheService,
remoteStoreSettings,
null,
null,
null
);
}
Expand Down Expand Up @@ -980,7 +984,8 @@ private synchronized IndexService createIndexService(
translogFactorySupplier,
this::getClusterDefaultRefreshInterval,
this.recoverySettings,
this.remoteStoreSettings
this.remoteStoreSettings,
replicator
);
}

Expand Down
Loading
Loading