Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize primary shard movement during shard relocation #1445

Merged
merged 12 commits into from
Jan 20, 2022
Merged
130 changes: 103 additions & 27 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;

Expand All @@ -48,63 +49,140 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
* that are hosted on that nodes. Each {@link RoutingNode} has a unique node id that can be used to identify the node.
*/
public class RoutingNode implements Iterable<ShardRouting> {

static class BucketedShards implements Iterable<ShardRouting> {
private final Tuple<LinkedHashMap<ShardId, ShardRouting>, LinkedHashMap<ShardId, ShardRouting>> shardTuple; // LinkedHashMap to
// preserve order

BucketedShards(LinkedHashMap<ShardId, ShardRouting> primaryShards, LinkedHashMap<ShardId, ShardRouting> replicaShards) {
this.shardTuple = new Tuple(primaryShards, replicaShards);
}

public boolean isEmpty() {
return this.shardTuple.v1().isEmpty() && this.shardTuple.v2().isEmpty();
}

public int size() {
return this.shardTuple.v1().size() + this.shardTuple.v2().size();
}

public boolean containsKey(ShardId shardId) {
return this.shardTuple.v1().containsKey(shardId) || this.shardTuple.v2().containsKey(shardId);
}

public ShardRouting get(ShardId shardId) {
if (this.shardTuple.v1().containsKey(shardId)) {
return this.shardTuple.v1().get(shardId);
}
return this.shardTuple.v2().get(shardId);
}

public ShardRouting add(ShardRouting shardRouting) {
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
return put(shardRouting.shardId(), shardRouting);
}

public ShardRouting put(ShardId shardId, ShardRouting shardRouting) {
ShardRouting ret;
if (shardRouting.primary()) {
ret = this.shardTuple.v1().put(shardId, shardRouting);
if (this.shardTuple.v2().containsKey(shardId)) {
ret = this.shardTuple.v2().remove(shardId);
}
} else {
ret = this.shardTuple.v2().put(shardId, shardRouting);
if (this.shardTuple.v1().containsKey(shardId)) {
ret = this.shardTuple.v1().remove(shardId);
}
}

return ret;
}

public ShardRouting remove(ShardId shardId) {
if (this.shardTuple.v1().containsKey(shardId)) {
return this.shardTuple.v1().remove(shardId);
}
return this.shardTuple.v2().remove(shardId);
}

@Override
public Iterator<ShardRouting> iterator() {
final Iterator<ShardRouting> primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator();
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
final Iterator<ShardRouting> replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator();
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
return primaryIterator.hasNext() || replicaIterator.hasNext();
}

@Override
public ShardRouting next() {
if (primaryIterator.hasNext()) {
return primaryIterator.next();
}
return replicaIterator.next();
}
};
}
}

private final String nodeId;

private final DiscoveryNode node;

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order
private final BucketedShards shards;

private final LinkedHashSet<ShardRouting> initializingShards;

private final LinkedHashSet<ShardRouting> relocatingShards;

private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}

RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
final LinkedHashMap<ShardId, ShardRouting> primaryShards = new LinkedHashMap<>();
final LinkedHashMap<ShardId, ShardRouting> replicaShards = new LinkedHashMap<>();
this.shards = new BucketedShards(primaryShards, replicaShards);
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
this.shardsByIndex = new LinkedHashMap<>();
for (ShardRouting shardRouting : shards.values()) {

for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
}
assert invariant();
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
final LinkedHashMap<ShardId, ShardRouting> shards = new LinkedHashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting);
ShardRouting previousValue;
if (shardRouting.primary()) {
previousValue = primaryShards.put(shardRouting.shardId(), shardRouting);
} else {
previousValue = replicaShards.put(shardRouting.shardId(), shardRouting);
}

if (previousValue != null) {
throw new IllegalArgumentException(
"Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node "
);
}
}
return shards;

assert invariant();
}

@Override
public Iterator<ShardRouting> iterator() {
return Collections.unmodifiableCollection(shards.values()).iterator();
return shards.iterator();
}

/**
Expand Down Expand Up @@ -139,7 +217,7 @@ public int size() {
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.containsKey(shard.shardId())) {
if (shards.add(shard) != null) {
throw new IllegalStateException(
"Trying to add a shard "
+ shard.shardId()
Expand All @@ -152,7 +230,6 @@ void add(ShardRouting shard) {
+ "]"
);
}
shards.put(shard.shardId(), shard);

if (shard.initializing()) {
initializingShards.add(shard);
Expand Down Expand Up @@ -322,7 +399,7 @@ public int numberOfOwningShardsForIndex(final Index index) {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
for (ShardRouting entry : shards.values()) {
for (ShardRouting entry : shards) {
sb.append("--------").append(entry.shortSummary()).append('\n');
}
return sb.toString();
Expand All @@ -345,7 +422,9 @@ public String toString() {
}

public List<ShardRouting> copyShards() {
return new ArrayList<>(shards.values());
List<ShardRouting> result = new ArrayList<>();
shards.forEach(result::add);
return result;
}

public boolean isEmpty() {
Expand All @@ -355,23 +434,20 @@ public boolean isEmpty() {
private boolean invariant() {

// initializingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsInitializing = StreamSupport.stream(shards.spliterator(), false)
.filter(ShardRouting::initializing)
.collect(Collectors.toList());
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);

// relocatingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false)
.filter(ShardRouting::relocating)
.collect(Collectors.toList());
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);

final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = shards.values()
.stream()
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false)
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert shardRoutingsByIndex.equals(shardsByIndex);

Expand Down
Loading