Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Sep 4, 2024
1 parent d7d138f commit 3bcdf47
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1251,13 +1251,12 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

// skip any shard that is a relocating primary or search only replica (not tracked by primary)
private boolean shouldSkipReplicationTimer(String allocationId) {
private boolean isPrimaryRelocation(String allocationId) {
Optional<ShardRouting> shardRouting = routingTable.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly());
return shardRouting.isPresent() && shardRouting.get().primary();
}

private void createReplicationLagTimers() {
Expand All @@ -1269,7 +1268,7 @@ private void createReplicationLagTimers() {
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& shouldSkipReplicationTimer(allocationId) == false
&& isPrimaryRelocation(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& (indexSettings.isSegRepLocalEnabled() == true
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) {
Expand Down Expand Up @@ -1303,7 +1302,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& shouldSkipReplicationTimer(e.getKey()) == false
&& isPrimaryRelocation(e.getKey()) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
Expand Down Expand Up @@ -1331,7 +1330,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
&& shouldSkipReplicationTimer(entry.getKey()) == false
&& isPrimaryRelocation(entry.getKey()) == false
/*Check if the current primary shard is migrating to remote and
all the other shard copies of the same index still hasn't completely moved over
to the remote enabled nodes. Ensures that:
Expand Down

0 comments on commit 3bcdf47

Please sign in to comment.