Commit d15da00

Remove unnecessary tests and fix tests.
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Rishikesh1159 committed Feb 10, 2023
1 parent 138c56b commit d15da00
Showing 3 changed files with 21 additions and 69 deletions.
@@ -17,15 +17,10 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
-import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
-import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.FeatureFlags;
-import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -51,11 +46,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

-    @Override
-    protected boolean addMockInternalEngine() {
-        return false;
-    }
-
public void testNodesInfoTimeout() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
@@ -157,55 +147,6 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

-    public void testSegmentReplicationStatsWithTimeout() {
-        internalCluster().startClusterManagerOnlyNode(
-            Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
-        );
-        String dataNode = internalCluster().startDataOnlyNode(
-            Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
-        );
-        String anotherDataNode = internalCluster().startDataOnlyNode(
-            Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
-        );
-
-        int numShards = 4;
-        assertAcked(
-            prepareCreate(
-                "test-index",
-                0,
-                Settings.builder()
-                    .put("number_of_shards", numShards)
-                    .put("number_of_replicas", 1)
-                    .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
-            )
-        );
-        ensureGreen();
-        final long numDocs = scaledRandomIntBetween(50, 100);
-        for (int i = 0; i < numDocs; i++) {
-            index("test-index", "doc", Integer.toString(i));
-        }
-        refresh("test-index");
-        ensureSearchable("test-index");
-
-        // Happy case
-        SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
-            .indices()
-            .prepareSegmentReplicationStats()
-            .get();
-        assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
-        assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));
-
-        // simulate timeout on bad node.
-        simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);
-
-        // verify response with bad node.
-        segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
-        assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
-        assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
-        assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
-        assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
-    }
-
public void testStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
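Both surviving timeout tests (testRecoveriesWithTimeout, testStatsWithTimeout) and the removed one above rely on a shared simulateTimeoutAtTransport helper whose body is not part of this diff. For orientation, here is a minimal sketch of how such a helper is typically built on MockTransportService; the body below is an assumption for illustration, not code from this commit:

    // Sketch: intercept requests sent from dataNode to anotherDataNode and, for the
    // targeted transport action, surface a timeout instead of delivering the request.
    private void simulateTimeoutAtTransport(String dataNode, String anotherDataNode, String actionName) {
        MockTransportService mockTransportService = (MockTransportService) internalCluster()
            .getInstance(TransportService.class, dataNode);
        TransportService targetService = internalCluster().getInstance(TransportService.class, anotherDataNode);
        mockTransportService.addSendBehavior(targetService, (connection, requestId, action, request, options) -> {
            if (action.startsWith(actionName)) {
                // Drop the request and fail with the exception the tests assert on.
                throw new ReceiveTimeoutTransportException(connection.getNode(), action, "simulated timeout");
            }
            connection.sendRequest(requestId, action, request, options);
        });
    }

This is why each test asserts containsString("ReceiveTimeoutTransportException") on the first shard failure: the shard-level request to the bad node never completes normally.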
@@ -140,7 +140,7 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest
singleIndexWithSegmentReplicationDisabled = shardRouting.getIndexName();
return null;
}
-        if (indexShard.indexSettings().isSegRepEnabled() == false) {
+        if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) {
return null;
}

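The one-line change above tightens which shards report segment replication stats: with segment replication the primary shard is the source of segments and only replicas track replication progress, so the shard-level operation (the enclosing transport action's class name is not shown in this diff) now returns null for primaries as well as for indices without segment replication. A self-contained toy illustration of the guard's effect; the Shard record and sample values are hypothetical, not types from this commit:

    import java.util.List;

    public class SegRepStatsGuardDemo {
        // Hypothetical stand-in for a shard routing plus its index settings.
        record Shard(String index, boolean primary, boolean segRepEnabled) {}

        public static void main(String[] args) {
            List<Shard> shards = List.of(
                new Shard("test-index", true, true),     // segrep primary  -> skipped
                new Shard("test-index", false, true),    // segrep replica  -> reported
                new Shard("docrep-index", false, false)  // docrep shard    -> skipped
            );
            // Mirrors the new guard: only segrep-enabled, non-primary shards report stats.
            shards.stream()
                .filter(s -> s.segRepEnabled() && s.primary() == false)
                .forEach(s -> System.out.println("stats collected for " + s));
        }
    }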
SegmentReplicationState.java
@@ -22,8 +22,8 @@
import org.opensearch.indices.replication.common.ReplicationTimer;

import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;

/**
* ReplicationState implementation to track Segment Replication events.
@@ -76,10 +76,9 @@ public static Stage fromId(byte id) {

private Stage stage;
private ReplicationLuceneIndex index;
-    private final ReplicationTimer overallTimer;

-    // Timing data will have as many entries as stages, plus one
-    private final Map<String, Long> timingData = new ConcurrentHashMap<>(Stage.values().length + 1);
+    private final ReplicationTimer overallTimer;
+    private final Map<String, Long> timingData;
private final ReplicationTimer stageTimer;
private long replicationId;
private final ShardRouting shardRouting;
@@ -153,6 +152,8 @@ public SegmentReplicationState(
this.replicationId = replicationId;
this.sourceDescription = sourceDescription;
this.targetNode = targetNode;
+        // Timing data will have as many entries as stages, plus one
+        timingData = new HashMap<>(Stage.values().length + 1);
overallTimer = new ReplicationTimer();
stageTimer = new ReplicationTimer();
setStage(Stage.INIT);
@@ -166,6 +167,7 @@ public SegmentReplicationState(StreamInput in) throws IOException {
replicationId = in.readLong();
overallTimer = new ReplicationTimer(in);
stageTimer = new ReplicationTimer(in);
+        timingData = in.readMap(StreamInput::readString, StreamInput::readLong);
sourceDescription = in.readString();
targetNode = new DiscoveryNode(in);
}
@@ -178,6 +180,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(replicationId);
overallTimer.writeTo(out);
stageTimer.writeTo(out);

+        // Copy of timingData is created to avoid concurrent modification of timingData map.
+        Map<String, Long> timingDataCopy = new HashMap<>();
+        for (Map.Entry<String, Long> entry : timingData.entrySet()) {
+            timingDataCopy.put(entry.getKey(), entry.getValue());
+        }
+        out.writeMap(timingDataCopy, StreamOutput::writeString, StreamOutput::writeLong);
out.writeString(sourceDescription);
targetNode.writeTo(out);
}
@@ -270,11 +279,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.startObject(SegmentReplicationState.Fields.INDEX);
index.toXContent(builder, params);
builder.endObject();
-        builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
-        builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
-        builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
-        builder.field(Fields.GET_FILES_STAGE, getGetFileStageTime());
-        builder.field(Fields.FINALIZE_REPLICATION_STAGE, getFinalizeReplicationStageTime());
+        if (timingData.isEmpty() == false) {
+            builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
+            builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
+            builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
+            builder.field(Fields.GET_FILES_STAGE, getGetFileStageTime());
+            builder.field(Fields.FINALIZE_REPLICATION_STAGE, getFinalizeReplicationStageTime());
+        }

return builder;
}
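The SegmentReplicationState changes above do two things: timingData is now serialized over the wire (written between stageTimer and sourceDescription in writeTo, and read back at the same position in the StreamInput constructor), and toXContent emits the per-stage timing fields only when the map is non-empty. Wire serialization here is positional, so writeTo and the StreamInput constructor must stay symmetric or every field after a mismatch is misread. A hedged round-trip sketch, assuming a populated SegmentReplicationState named state (BytesStreamOutput is the codebase's in-memory StreamOutput):

    // Serialize, then deserialize through the StreamInput constructor shown above.
    BytesStreamOutput out = new BytesStreamOutput();
    state.writeTo(out);
    try (StreamInput in = out.bytes().streamInput()) {
        SegmentReplicationState copy = new SegmentReplicationState(in);
        // With this commit, timingData survives the round trip, so copy.toXContent(...)
        // emits the stage timing fields whenever any stage timings were recorded.
    }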
