From db2532c6a86e21fcace7ad1efefade94094b0ee7 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 2 Dec 2022 15:58:45 +0000 Subject: [PATCH] Add test for failure case and refactor some code. Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationIT.java | 50 ++++++++++++------- .../cluster/IndicesClusterStateService.java | 3 +- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 16c6c3e441c8e..d2f1238d060a2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.BeforeClass; +import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; @@ -23,10 +24,12 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; @@ -70,7 +73,7 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { @BeforeClass public static void assumeFeatureFlag() { - 
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + assumeTrue("Segment replication Feature flag is enabled", true); } @Override @@ -198,16 +201,14 @@ public void testCancelPrimaryAllocation() throws Exception { } /** - * This test adds a new replica shard to an existing cluster which already has few docs inserted before adding replica. - * We don't perform any refresh on index and assert new replica shard on doc hit count. - * This test makes sure that when a new replica is added to an existing cluster it gets all latest segments from primary even without a refresh. + * This test verifies that a replica shard is not added to the cluster when a round of segment replication fails during peer recovery. */ - public void testAddNewReplica() throws Exception { + public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); final String primary = internalCluster().startNode(); logger.info("--> creating test index ..."); - prepareCreate(INDEX_NAME, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get(); + prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get(); logger.info("--> index 10 docs"); for (int i = 0; i < 10; i++) { @@ -226,17 +227,36 @@ public void testAddNewReplica() throws Exception { logger.info("--> start replica node"); final String replica = internalCluster().startNode(); + + // Mock transport service to add behaviour of throwing corruption exception during segment replication process.
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( TransportService.class, primary )); + mockTransportService.addSendBehavior( internalCluster().getInstance(TransportService.class, replica), (connection, requestId, action, request, options) -> { if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { throw new OpenSearchCorruptionException("expected"); } connection.sendRequest(requestId, action, request, options); } ); + + // Verify that the cluster state is not green and that the replica shard which failed during a round of segment replication is not added to the cluster ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() .setWaitForEvents(Priority.LANGUID) .setWaitForNodes("2") .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(2)) .execute() .actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - waitForReplicaUpdate(); - assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(true)); + ensureYellow(INDEX_NAME); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica); + assertEquals(false, indicesService.hasIndex(resolveIndex(INDEX_NAME))); } public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { @@ -497,18 +517,14 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String replicaNode = internalCluster().startNode(); ensureGreen(INDEX_NAME); - client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); +
client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); + refresh(INDEX_NAME); waitForReplicaUpdate(); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - - IndexShard primaryShard = getIndexShard(primaryNode); - IndexShard replicaShard = getIndexShard(replicaNode); - assertEquals( - primaryShard.translogStats().estimatedNumberOfOperations(), - replicaShard.translogStats().estimatedNumberOfOperations() - ); assertSegmentStats(REPLICA_COUNT); } diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index ccf73c6e5efbf..b5587400ff9cf 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -785,7 +785,7 @@ public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id()); // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before it // is marked as Started. - if (indexShard.indexSettings().isSegRepEnabled() + if (indexShard != null && indexShard.indexSettings().isSegRepEnabled() && shardRouting.primary() == false && shardRouting.state() == ShardRoutingState.INITIALIZING && indexShard.state() == IndexShardState.POST_RECOVERY) { @@ -829,6 +829,7 @@ public void onReplicationFailure( logger.error("replication failure", e); indexShard.failShard("replication failure", e); } + handleRecoveryFailure(shardRouting, sendShardFailure, e); } } );