Skip to content

Commit

Permalink
Add test for failure case and refactor some code.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Dec 2, 2022
1 parent d50a9e0 commit db2532c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
Expand All @@ -23,10 +24,12 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -70,7 +73,7 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
assumeTrue("Segment replication Feature flag is enabled", true);
}

@Override
Expand Down Expand Up @@ -198,16 +201,14 @@ public void testCancelPrimaryAllocation() throws Exception {
}

/**
* This test adds a new replica shard to an existing cluster which already has few docs inserted before adding replica.
* We don't perform any refresh on index and assert new replica shard on doc hit count.
* This test makes sure that when a new replica is added to an existing cluster it gets all latest segments from primary even without a refresh.
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*/
public void testAddNewReplica() throws Exception {
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primary = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(INDEX_NAME, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get();
prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
Expand All @@ -226,17 +227,36 @@ public void testAddNewReplica() throws Exception {

logger.info("--> start replica node");
final String replica = internalCluster().startNode();

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primary
));
mockTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replica),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
throw new OpenSearchCorruptionException("expected");
}
connection.sendRequest(requestId, action, request, options);
}
);

// Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
waitForReplicaUpdate();
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
assertThat(clusterHealthResponse.isTimedOut(), equalTo(true));
ensureYellow(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica);
assertEquals(false, indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
Expand Down Expand Up @@ -497,18 +517,14 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
refresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

IndexShard primaryShard = getIndexShard(primaryNode);
IndexShard replicaShard = getIndexShard(replicaNode);
assertEquals(
primaryShard.translogStats().estimatedNumberOfOperations(),
replicaShard.translogStats().estimatedNumberOfOperations()
);
assertSegmentStats(REPLICA_COUNT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
// For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before it
// is marked as Started.
if (indexShard.indexSettings().isSegRepEnabled()
if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
Expand Down Expand Up @@ -829,6 +829,7 @@ public void onReplicationFailure(
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
);
Expand Down

0 comments on commit db2532c

Please sign in to comment.