Skip to content

Commit

Permalink
remove separate interval setting and rely on refresh interval
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Sep 3, 2024
1 parent 16c6326 commit aef15f9
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING;
Expand Down Expand Up @@ -69,8 +68,6 @@ public Settings indexSettings() {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.refresh_interval", -1)
// TODO: Remove this once metrics are fixed for polling based segrep
.put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand All @@ -34,11 +33,9 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -52,9 +49,6 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
Expand All @@ -63,39 +57,20 @@ public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

private boolean enablePolling;

@Before
public void randomizePollingEnabled() {
this.enablePolling = randomBoolean();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, enablePolling)
.build();
return asList(MockTransportService.TestPlugin.class);
}

@Override
public Settings indexSettings() {
Settings.Builder builder = Settings.builder()
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
// randomly use polling based replication
if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(featureFlagSettings())) {
builder.put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), 1, TimeUnit.SECONDS);
}
return builder.build();
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand All @@ -74,7 +72,6 @@
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
Expand Down Expand Up @@ -914,21 +911,13 @@ public void testDropPrimaryDuringReplication() throws Exception {
@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testReplicaHasDiffFilesThanPrimary() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.build()
);
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
IndexWriterConfig iwc = newIndexWriterConfig().setUseCompoundFile(true).setOpenMode(IndexWriterConfig.OpenMode.APPEND);
IndexWriterConfig iwc = newIndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.APPEND);

// create a doc to index
int numDocs = 2 + random().nextInt(10);
Expand Down Expand Up @@ -958,39 +947,19 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {

final SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(replicaShard.store().directory());
replicaShard.finalizeReplication(segmentInfos);

ensureYellow(INDEX_NAME);

// set a single doc and refresh, this ensures we have segment 0 present as compound
client().prepareIndex(INDEX_NAME).setId("doc").setSource("field", "value").execute().get();
refresh(INDEX_NAME);
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);

// validate conflicting files are in the store of each shard
Map<String, StoreFileMetadata> replicaMetadata = replicaShard.getSegmentMetadataMap();
Map<String, StoreFileMetadata> primaryMetadata = primaryShard.getSegmentMetadataMap();
final String expectedFileName = "_0.cfe";
assertTrue(replicaMetadata.containsKey(expectedFileName));
assertTrue(primaryMetadata.containsKey(expectedFileName));
assertNotEquals(primaryMetadata.get(expectedFileName).checksum(), replicaMetadata.get(expectedFileName).checksum());

// index some more to ensure replication rounds run for both pull & push variations
final int docCount = 5;
final int docCount = scaledRandomIntBetween(10, 20);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
// Refresh, this should trigger round of segment replication
refresh(INDEX_NAME);
}
// wait until the replica dies and comes back
assertBusy(() -> {
final IndexShard newReplicaShard = getIndexShard(replicaNode, INDEX_NAME);
// shard can be null when cluster is yellow
assertNotNull(newReplicaShard);
assertNotEquals(replicaShard.routingEntry().allocationId().getId(), newReplicaShard.routingEntry().allocationId().getId());
});
ensureGreen(INDEX_NAME);
waitForSearchableDocs(6, primaryNode, replicaNode);
waitForSearchableDocs(docCount, primaryNode, replicaNode);
verifyStoreContent();
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}

public void testPressureServiceStats() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,14 @@
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
// TODO: Remove this once metrics are fixed for polling based segrep
.put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS)
.build();
return Settings.builder().put(super.indexSettings()).build();
}

public void testSegmentReplicationStatsResponse() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.indices.settings;

import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -23,12 +21,10 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -179,46 +175,6 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

public void testDefaultReplicationSettings() throws ExecutionException, InterruptedException {
internalCluster().startNodes(2);
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 0)
.build()
);
ensureGreen(TEST_INDEX);
// assert settings
Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata();
IndexMetadata indexMetadata = metadata.index(TEST_INDEX);
Settings settings = indexMetadata.getSettings();
int numSearchReplicas = Integer.parseInt(settings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS));
assertEquals(0, numSearchReplicas);

GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(TEST_INDEX).includeDefaults(true))
.get();
assertEquals("0ms", settingsResponse.getSetting(TEST_INDEX, INDEX_REPLICATION_INTERVAL_SETTING.getKey()));

// assert cluster state & routing table
assertActiveSearchShards(0);

// Add a search replica
client().admin()
.indices()
.prepareUpdateSettings(TEST_INDEX)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
ensureGreen(TEST_INDEX);
assertActiveSearchShards(1);

settingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(TEST_INDEX).includeDefaults(true)).get();
assertEquals("5s", settingsResponse.getSetting(TEST_INDEX, INDEX_REPLICATION_INTERVAL_SETTING.getKey()));
}

/**
* Helper to assert counts of active shards for each type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FeatureFlags.TIERED_REMOTE_INDEX,
List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING, IndexModule.INDEX_TIERING_STATE),
FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL,
List.of(IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING, IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING)
List.of(IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
Expand Down
12 changes: 2 additions & 10 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1095,14 +1095,6 @@ private void updateReplicationInterval() {
}
}

/**
* Gets the replication interval seen by the index service.
* @return the replication interval.
*/
private TimeValue getReplicationInterval() {
return getIndexSettings().getReplicationInterval();
}

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
Expand Down Expand Up @@ -1370,7 +1362,7 @@ public String toString() {
final class AsyncReplicationTask extends BaseAsyncTask {

AsyncReplicationTask(IndexService indexService) {
super(indexService, indexService.getReplicationInterval());
super(indexService, indexService.getRefreshInterval());
}

@Override
Expand All @@ -1395,7 +1387,7 @@ protected boolean mustReschedule() {
}

private void maybeSyncSegments(boolean force) {
if (getReplicationInterval().millis() > 0 || force) {
if (getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().primary() == false && shard.routingEntry().active()) {
Expand Down
Loading

0 comments on commit aef15f9

Please sign in to comment.