From aef15f9474970089c21aa250c2a9d74dbb2c1d67 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 3 Sep 2024 16:59:47 -0700 Subject: [PATCH] remove separate interval setting and rely on refresh interval Signed-off-by: Marc Handalian --- .../index/SegmentReplicationPressureIT.java | 3 - .../SearchReplicaReplicationIT.java | 85 +++++++++++++++++++ .../replication/SegmentReplicationBaseIT.java | 33 +------ .../replication/SegmentReplicationIT.java | 43 ++-------- .../SegmentReplicationStatsIT.java | 7 +- .../indices/settings/SearchOnlyReplicaIT.java | 44 ---------- .../common/settings/IndexScopedSettings.java | 2 +- .../org/opensearch/index/IndexService.java | 12 +-- .../org/opensearch/index/IndexSettings.java | 36 -------- .../index/seqno/ReplicationTracker.java | 11 +-- .../RemoteStoreReplicationSource.java | 4 +- .../checkpoint/PublishCheckpointAction.java | 2 +- ...SegmentReplicationCheckpointPublisher.java | 1 - .../opensearch/index/IndexServiceTests.java | 18 ++-- 14 files changed, 116 insertions(+), 185 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index f1a5247f33b78..033ea75b68958 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -36,7 +36,6 @@ import static java.util.Arrays.asList; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; -import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING; import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING; import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING; @@ -69,8 +68,6 @@ public Settings indexSettings() { .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("index.refresh_interval", -1) - // TODO: Remove this once metrics are fixed for polling based segrep - .put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java new file mode 100644 index 0000000000000..a1b512c326ac5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationIT.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Path; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected Path absolutePath; + + private Boolean useRemoteStore; + + @Before + public void randomizeRemoteStoreEnabled() { + useRemoteStore = randomBoolean(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (useRemoteStore) { + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); + } + return super.nodeSettings(nodeOrdinal); + } + + @After + public void teardown() { + if (useRemoteStore) { + clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); + } + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build(); + } + + public void testReplication() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + final int docCount = 10; + for (int i = 0; i < docCount; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(docCount, primary, replica); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 348957005591e..93ce21933dad5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -21,7 +21,6 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; @@ -34,11 +33,9 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; -import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; -import org.junit.Before; import java.io.IOException; import java.util.Arrays; @@ -52,9 +49,6 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; -import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING; -import static org.opensearch.test.OpenSearchIntegTestCase.client; -import static org.opensearch.test.OpenSearchTestCase.assertBusy; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase { @@ -63,39 +57,20 @@ public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase { protected static final int SHARD_COUNT = 1; protected static final int REPLICA_COUNT = 1; - private boolean enablePolling; - - @Before - public void randomizePollingEnabled() { - this.enablePolling = randomBoolean(); - } - @Override protected Collection> nodePlugins() { - return asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); - } - - @Override - protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, enablePolling) - .build(); + return asList(MockTransportService.TestPlugin.class); } @Override public Settings indexSettings() { - Settings.Builder builder = Settings.builder() + return Settings.builder() .put(super.indexSettings()) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); - // randomly use polling based replication - if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(featureFlagSettings())) { - builder.put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), 1, TimeUnit.SECONDS); - } - return builder.build(); + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); } @Nullable diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 2b967b4f710c9..2421a1a507372 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -63,8 +63,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService; @@ -74,7 +72,6 @@ import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; @@ -914,21 +911,13 @@ public void testDropPrimaryDuringReplication() throws Exception { @TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE") public void testReplicaHasDiffFilesThanPrimary() throws Exception { final String primaryNode = internalCluster().startDataOnlyNode(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") - .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) - .build() - ); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); ensureYellow(INDEX_NAME); final String replicaNode = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); - IndexWriterConfig iwc = newIndexWriterConfig().setUseCompoundFile(true).setOpenMode(IndexWriterConfig.OpenMode.APPEND); + IndexWriterConfig iwc = newIndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.APPEND); // create a doc to index int numDocs = 2 + random().nextInt(10); @@ -958,39 +947,19 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { final SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(replicaShard.store().directory()); replicaShard.finalizeReplication(segmentInfos); - ensureYellow(INDEX_NAME); - // set a single doc and refresh, this ensures we have segment 0 present as compound - client().prepareIndex(INDEX_NAME).setId("doc").setSource("field", "value").execute().get(); - refresh(INDEX_NAME); - IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); - - // validate conflicting files are in the store of each shard - Map replicaMetadata = replicaShard.getSegmentMetadataMap(); - Map primaryMetadata = primaryShard.getSegmentMetadataMap(); - final String expectedFileName = "_0.cfe"; - assertTrue(replicaMetadata.containsKey(expectedFileName)); - assertTrue(primaryMetadata.containsKey(expectedFileName)); - assertNotEquals(primaryMetadata.get(expectedFileName).checksum(), replicaMetadata.get(expectedFileName).checksum()); - - // index some more to ensure replication rounds run for both pull & push variations - final int docCount = 5; + final int docCount = scaledRandomIntBetween(10, 20); for (int i = 0; i < docCount; i++) { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get(); // Refresh, this should trigger round of segment replication refresh(INDEX_NAME); } - // wait until the replica dies and comes back - assertBusy(() -> { - final IndexShard newReplicaShard = getIndexShard(replicaNode, INDEX_NAME); - // shard can be null when cluster is yellow - assertNotNull(newReplicaShard); - assertNotEquals(replicaShard.routingEntry().allocationId().getId(), newReplicaShard.routingEntry().allocationId().getId()); - }); ensureGreen(INDEX_NAME); - waitForSearchableDocs(6, primaryNode, replicaNode); + waitForSearchableDocs(docCount, primaryNode, replicaNode); verifyStoreContent(); + final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); + assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } public void testPressureServiceStats() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index bfe82d0efa3a1..d0adec056195a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; -import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -41,11 +40,7 @@ public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT { @Override public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - // TODO: Remove this once metrics are fixed for polling based segrep - .put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS) - .build(); + return Settings.builder().put(super.indexSettings()).build(); } public void testSegmentReplicationStatsResponse() throws Exception { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java index 4157d7c51b31c..891230f2cf41b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java @@ -8,8 +8,6 @@ package org.opensearch.indices.settings; -import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; -import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -23,12 +21,10 @@ import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; -import java.util.concurrent.ExecutionException; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; -import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -179,46 +175,6 @@ public void testSearchReplicaScaling() { assertActiveSearchShards(0); } - public void testDefaultReplicationSettings() throws ExecutionException, InterruptedException { - internalCluster().startNodes(2); - createIndex( - TEST_INDEX, - Settings.builder() - .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) - .build() - ); - ensureGreen(TEST_INDEX); - // assert settings - Metadata metadata = client().admin().cluster().prepareState().get().getState().metadata(); - IndexMetadata indexMetadata = metadata.index(TEST_INDEX); - Settings settings = indexMetadata.getSettings(); - int numSearchReplicas = Integer.parseInt(settings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS)); - assertEquals(0, numSearchReplicas); - - GetSettingsResponse settingsResponse = client().admin() - .indices() - .getSettings(new GetSettingsRequest().indices(TEST_INDEX).includeDefaults(true)) - .get(); - assertEquals("0ms", settingsResponse.getSetting(TEST_INDEX, INDEX_REPLICATION_INTERVAL_SETTING.getKey())); - - // assert cluster state & routing table - assertActiveSearchShards(0); - - // Add a search replica - client().admin() - .indices() - .prepareUpdateSettings(TEST_INDEX) - .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)) - .get(); - ensureGreen(TEST_INDEX); - assertActiveSearchShards(1); - - settingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(TEST_INDEX).includeDefaults(true)).get(); - assertEquals("5s", settingsResponse.getSetting(TEST_INDEX, INDEX_REPLICATION_INTERVAL_SETTING.getKey())); - } - /** * Helper to assert counts of active shards for each type. */ diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 778cd2dc5c137..00c73de1c1780 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -281,7 +281,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { FeatureFlags.TIERED_REMOTE_INDEX, List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING, IndexModule.INDEX_TIERING_STATE), FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, - List.of(IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING, IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING) + List.of(IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING) ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index af9027309de5c..b9fdd6574afeb 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -1095,14 +1095,6 @@ private void updateReplicationInterval() { } } - /** - * Gets the replication interval seen by the index service. - * @return the replication interval. - */ - private TimeValue getReplicationInterval() { - return getIndexSettings().getReplicationInterval(); - } - /** * Called whenever the refresh interval changes. This can happen in 2 cases - * 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for @@ -1370,7 +1362,7 @@ public String toString() { final class AsyncReplicationTask extends BaseAsyncTask { AsyncReplicationTask(IndexService indexService) { - super(indexService, indexService.getReplicationInterval()); + super(indexService, indexService.getRefreshInterval()); } @Override @@ -1395,7 +1387,7 @@ protected boolean mustReschedule() { } private void maybeSyncSegments(boolean force) { - if (getReplicationInterval().millis() > 0 || force) { + if (getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { try { if (shard.routingEntry().primary() == false && shard.routingEntry().active()) { diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index be62af930cbeb..c55d7bfec2b74 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -70,7 +70,6 @@ import java.util.function.UnaryOperator; import static org.opensearch.Version.V_2_7_0; -import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING; import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; import static org.opensearch.index.codec.fuzzy.FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING; @@ -372,20 +371,6 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); - /** - * Setting to control the rate at which replicas poll for new segments. By default the value is zero indicating - * pull based replication is disabled. - */ - public static final TimeValue DEFAULT_REPLICATION_INTERVAL = TimeValue.ZERO; - public static final TimeValue DEFAULT_SEARCH_REPLICA_INTERVAL = TimeValue.timeValueSeconds(5); - public static final TimeValue MINIMUM_REPLICATION_INTERVAL = TimeValue.ZERO; - public static final Setting INDEX_REPLICATION_INTERVAL_SETTING = Setting.timeSetting("index.replication_interval", (s) -> { - if (INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(s) > 0) { - return DEFAULT_SEARCH_REPLICA_INTERVAL; - } - return DEFAULT_REPLICATION_INTERVAL; - }, MINIMUM_REPLICATION_INTERVAL, Property.Dynamic, Property.IndexScope); - public static final Setting INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING = Setting.byteSizeSetting( "index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), @@ -802,7 +787,6 @@ public static IndexMergePolicy fromString(String text) { private volatile Translog.Durability durability; private volatile TimeValue syncInterval; private volatile TimeValue refreshInterval; - private volatile TimeValue replicationInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; private volatile ByteSizeValue translogRetentionSize; @@ -1195,15 +1179,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, this::setRemoteStoreTranslogRepository ); - - if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(nodeSettings)) { - // Move setting registration to IndexScopedSettings when feature flag is removed. - scopedSettings.registerSetting(INDEX_REPLICATION_INTERVAL_SETTING); - setReplicationInterval(scopedSettings.get(INDEX_REPLICATION_INTERVAL_SETTING)); - scopedSettings.addSettingsUpdateConsumer(INDEX_REPLICATION_INTERVAL_SETTING, this::setReplicationInterval); - } else { - replicationInterval = DEFAULT_REPLICATION_INTERVAL; - } } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1254,10 +1229,6 @@ private void setRefreshInterval(TimeValue timeValue) { this.refreshInterval = timeValue; } - private void setReplicationInterval(TimeValue timeValue) { - this.replicationInterval = timeValue; - } - /** * Returns the settings for this index. These settings contain the node and index level settings where * settings that are specified on both index and node level are overwritten by the index settings. @@ -1527,13 +1498,6 @@ public TimeValue getRefreshInterval() { return refreshInterval; } - /** - * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. - */ - public TimeValue getReplicationInterval() { - return replicationInterval; - } - /** * Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log. */ diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index b9cb5e92d0ed1..1e43827afeb47 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1251,12 +1251,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { return this.latestReplicationCheckpoint; } - private boolean isPrimaryRelocation(String allocationId) { + // skip any shard that is a relocating primary or search only replica (not tracked by primary) + private boolean shouldSkipReplicationTimer(String allocationId) { Optional shardRouting = routingTable.shards() .stream() .filter(routing -> routing.allocationId().getId().equals(allocationId)) .findAny(); - return shardRouting.isPresent() && shardRouting.get().primary(); + return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly()); } private void createReplicationLagTimers() { @@ -1268,7 +1269,7 @@ private void createReplicationLagTimers() { // it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event. if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false - && isPrimaryRelocation(allocationId) == false + && shouldSkipReplicationTimer(allocationId) == false && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) && (indexSettings.isSegRepLocalEnabled() == true || isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) { @@ -1302,7 +1303,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo final CheckpointState cps = e.getValue(); if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false - && isPrimaryRelocation(e.getKey()) == false + && shouldSkipReplicationTimer(e.getKey()) == false && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint) && cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) { cps.checkpointTimers.get(latestReplicationCheckpoint).start(); @@ -1330,7 +1331,7 @@ public synchronized Set getSegmentReplicationStats entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false - && isPrimaryRelocation(entry.getKey()) == false + && shouldSkipReplicationTimer(entry.getKey()) == false /*Check if the current primary shard is migrating to remote and all the other shard copies of the same index still hasn't completely moved over to the remote enabled nodes. Ensures that: diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index c0f379d3d1380..b06b3e0497cf7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -69,9 +69,7 @@ public void getCheckpointMetadata( final RemoteSegmentMetadata mdFile = getRemoteSegmentMetadata(); // During initial recovery flow, the remote store might not // have metadata as primary hasn't uploaded anything yet. - if (mdFile == null - && (indexShard.state().equals(IndexShardState.STARTED) == false - || indexShard.indexSettings().getReplicationInterval().millis() > 0)) { + if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); return; } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 6ad23cd434cf5..d1e2884956f5c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -99,7 +99,7 @@ protected void doExecute(Task task, PublishCheckpointRequest request, ActionList @Override public ReplicationMode getReplicationMode(IndexShard indexShard) { - if (indexShard.indexSettings().isAssignedOnRemoteNode() && indexShard.indexSettings().getReplicationInterval().millis() <= 0) { + if (indexShard.indexSettings().isAssignedOnRemoteNode()) { return ReplicationMode.FULL_REPLICATION; } return super.getReplicationMode(indexShard); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 3c3d5c745b2bd..a35d6fd103dc0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -35,7 +35,6 @@ public SegmentReplicationCheckpointPublisher(PublishAction publishAction) { } public void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { - if (indexShard.indexSettings().getReplicationInterval().millis() > 0) return; publishAction.publish(indexShard, checkpoint); indexShard.onCheckpointPublished(checkpoint); } diff --git a/server/src/test/java/org/opensearch/index/IndexServiceTests.java b/server/src/test/java/org/opensearch/index/IndexServiceTests.java index 844228e42f932..b2713538711ea 100644 --- a/server/src/test/java/org/opensearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/opensearch/index/IndexServiceTests.java @@ -67,7 +67,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; -import static org.opensearch.index.IndexSettings.INDEX_REPLICATION_INTERVAL_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING; import static org.opensearch.index.shard.IndexShardTestCase.getEngine; import static org.opensearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -185,7 +185,7 @@ public void testRefreshTaskIsUpdated() throws Exception { client().admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)) .get(); assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); @@ -195,7 +195,7 @@ public void testRefreshTaskIsUpdated() throws Exception { client().admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "100ms")) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), "100ms")) .get(); assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); @@ -209,7 +209,7 @@ public void testRefreshTaskIsUpdated() throws Exception { client().admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "200ms")) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), "200ms")) .get(); assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); @@ -223,7 +223,7 @@ public void testRefreshTaskIsUpdated() throws Exception { client().admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "200ms")) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), "200ms")) .get(); assertSame(refreshTask, indexService.getRefreshTask()); assertTrue(indexService.getRefreshTask().mustReschedule()); @@ -311,7 +311,7 @@ public void testRefreshActuallyWorks() throws Exception { client().admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)) .get(); // when we update we reschedule the existing task AND fire off an async refresh to make sure we make everything visible // before that this is why we need to wait for the refresh task to be unscheduled and the first doc to be visible @@ -331,7 +331,7 @@ public void testRefreshActuallyWorks() throws Exception { client().admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1ms")) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1ms")) .get(); assertTrue(refreshTask.isClosed()); assertBusy(() -> { @@ -612,7 +612,7 @@ public void testReplicationTask() throws Exception { "segrep_index", Settings.builder() .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), "5s") + .put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), "5s") .build() ); final Index srIndex = indexService.index(); @@ -626,7 +626,7 @@ public void testReplicationTask() throws Exception { client().admin() .indices() .prepareUpdateSettings("segrep_index") - .setSettings(Settings.builder().put(INDEX_REPLICATION_INTERVAL_SETTING.getKey(), "1s")) + .setSettings(Settings.builder().put(INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")) .get(); IndexService.AsyncReplicationTask updatedTask = indexService.getReplicationTask();