Skip to content

Commit

Permalink
Skip allocating more shards when cluster limit breached
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jun 19, 2024
1 parent 7239ec7 commit 11cab8c
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.benchmark.routing.allocation;

import org.openjdk.jmh.annotations.*;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.logging.LogConfigurator;
import org.opensearch.common.settings.Settings;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * JMH benchmark that times a single {@code AllocationService#reroute} call against a cluster state
 * built from {@link #numIndices} indices (each with {@link #numShards} primaries and
 * {@link #numReplicas} replicas) and {@link #numNodes} nodes spread over three zone attribute values.
 *
 * <p>The index and node counts come from the {@link #indicesNodes} parameter, encoded as
 * {@code "<indices>|<nodes>"}.
 */
@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@SuppressWarnings("unused") // invoked by benchmarking framework
public class RerouteBenchmark {
    @Param({
        // indices| nodes
        " 5000| 500|",
    })
    public String indicesNodes = "1|1";
    public int numIndices;
    public int numNodes;
    public int numShards = 9;
    public int numReplicas = 5;

    private AllocationService allocationService;
    private ClusterState initialClusterState;

    /**
     * Parses {@link #indicesNodes}, creates the allocation service and builds the cluster state
     * that the benchmark method reroutes.
     *
     * @throws IllegalArgumentException if {@link #indicesNodes} does not contain both an index count and a node count
     * @throws NumberFormatException if either count is not a valid integer
     */
    @Setup
    public void setUp() throws Exception {
        LogConfigurator.setNodeName("test");
        final String[] params = indicesNodes.split("\\|");
        if (params.length < 2) {
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException further down.
            throw new IllegalArgumentException("expected \"<indices>|<nodes>\" but got [" + indicesNodes + "]");
        }
        numIndices = toInt(params[0]);
        numNodes = toInt(params[1]);

        allocationService = Allocators.createAllocationService(
            Settings.builder()
                .put("cluster.routing.allocation.awareness.attributes", "zone")
                .put("cluster.routing.allocation.load_awareness.provisioned_capacity", numNodes)
                .put("cluster.routing.allocation.load_awareness.skew_factor", "50")
                .put("cluster.routing.allocation.node_concurrent_recoveries", "2")
                .build()
        );

        // Register one index metadata entry per index: test_1 .. test_<numIndices>.
        Metadata.Builder mb = Metadata.builder();
        for (int i = 1; i <= numIndices; i++) {
            mb.put(
                IndexMetadata.builder("test_" + i)
                    .settings(Settings.builder().put("index.version.created", Version.CURRENT))
                    .numberOfShards(numShards)
                    .numberOfReplicas(numReplicas)
            );
        }
        Metadata metadata = mb.build();

        // Add every index to the routing table as a new index.
        RoutingTable.Builder rb = RoutingTable.builder();
        for (int i = 1; i <= numIndices; i++) {
            rb.addAsNew(metadata.index("test_" + i));
        }
        RoutingTable routingTable = rb.build();

        // No warm-up allocation is performed here: the state is handed to the benchmark exactly as
        // built, so the measured reroute call is the first allocation pass over it.
        initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
            .metadata(metadata)
            .routingTable(routingTable)
            .nodes(setUpClusterNodes(numNodes))
            .build();
    }

    /**
     * Runs one reroute over the cluster state prepared in {@link #setUp()}.
     *
     * @return the cluster state produced by the reroute, returned so the result is not discarded
     */
    @Benchmark
    public ClusterState measureShardAllocationEmptyCluster() throws Exception {
        return allocationService.reroute(initialClusterState, "reroute");
    }

    /**
     * Parses a whitespace-padded decimal integer.
     *
     * @param v the text to parse; leading and trailing whitespace is ignored
     * @return the parsed value
     */
    private int toInt(String v) {
        // parseInt yields a primitive directly; valueOf would box and immediately unbox.
        return Integer.parseInt(v.trim());
    }

    /**
     * Builds {@code nodes} discovery nodes named {@code node_0_<i>}, each tagged with a
     * {@code zone} attribute of {@code zone_0}, {@code zone_1} or {@code zone_2} chosen by {@code i % 3}.
     *
     * @param nodes number of nodes to create
     * @return a builder holding the created nodes
     */
    private DiscoveryNodes.Builder setUpClusterNodes(int nodes) {
        DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
        for (int i = 1; i <= nodes; i++) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("zone", "zone_" + (i % 3));
            nb.add(Allocators.newNode("node_0_" + i, attributes));
        }
        return nb;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ public int getRelocatingShardCount() {
return relocatingShards;
}

/**
* Returns the current value of the {@code inactiveShardCount} counter.
* NOTE(review): the method name says "initializing" while the backing field is named
* "inactive" — confirm that this counter tracks exactly the initializing shards before
* relying on the result as an initializing-shard count.
*/
public int getInitializingShardCount() {
return inactiveShardCount;
}

/**
* Returns all shards that are not in the state UNASSIGNED with the same shard
* ID as the given shard.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,13 @@ void allocateUnassigned() {
int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator);
do {
if (allocation.deciders().canAllocateAnyShard(allocation).type() == Decision.Type.THROTTLE) {
logger.info(
"Cannot allocate any shard in the cluster due to cluster concurrent recoveries getting breached"
+ ". Skipping shard iteration"
);
return;
}
for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i];
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} on whether any shard at all can currently be allocated in the
* cluster described by the given {@link RoutingAllocation}. This default implementation
* imposes no restriction and returns {@link Decision#ALWAYS}; a decider that enforces a
* cluster-wide limit can override it.
*
* @param allocation the routing allocation being evaluated (unused by this default implementation)
* @return {@link Decision#ALWAYS}
*/
public Decision canAllocateAnyShard(RoutingAllocation allocation) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether any shard on the given
* {@link RoutingNode} can be allocated. The default is {@link Decision#ALWAYS}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,24 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
return ret;
}

/**
 * Asks every configured decider whether any shard can be allocated and combines the answers.
 * <p>
 * When a decider's decision type reports {@code canPreemptivelyReturn()} and debug decisions
 * are disabled, that decision is returned immediately without consulting the remaining
 * deciders. With debug decisions enabled it is recorded and iteration continues. All other
 * decisions are passed to {@code addDecision}, which decides whether to record them.
 *
 * @param allocation the routing allocation being evaluated
 * @return the short-circuiting decision, or the accumulated {@link Decision.Multi}
 */
@Override
public Decision canAllocateAnyShard(RoutingAllocation allocation) {
    final Decision.Multi collected = new Decision.Multi();
    for (AllocationDecider decider : allocations) {
        final Decision verdict = decider.canAllocateAnyShard(allocation);
        if (verdict.type().canPreemptivelyReturn() == false) {
            addDecision(collected, verdict, allocation);
            continue;
        }
        if (allocation.debugDecision() == false) {
            // Nothing needs explaining, so stop at the first pre-emptive decision.
            return verdict;
        }
        collected.add(verdict);
    }
    return collected;
}

private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
if (decision != Decision.ALWAYS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,32 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
);
}

/**
 * Decides whether any shard may be allocated, based on the cluster-wide concurrent recoveries limit.
 * <ul>
 * <li>If the limit is {@code -1}, returns a YES decision without checking anything else.</li>
 * <li>If the initializing shard count reported by the routing nodes is below the limit, returns YES.</li>
 * <li>Otherwise returns THROTTLE, naming the count, the limit and the cluster setting key.</li>
 * </ul>
 *
 * @param allocation the routing allocation whose routing nodes supply the initializing shard count
 * @return a YES or THROTTLE decision attributed to this decider
 */
@Override
public Decision canAllocateAnyShard(RoutingAllocation allocation) {
    if (clusterConcurrentRecoveries == -1) {
        return allocation.decision(Decision.YES, NAME, "undefined cluster concurrent recoveries");
    }

    final int inFlight = allocation.routingNodes().getInitializingShardCount();
    if (inFlight < clusterConcurrentRecoveries) {
        return allocation.decision(
            Decision.YES,
            NAME,
            "below threshold [%d] for concurrent recoveries, current initializing shard count [%d]",
            clusterConcurrentRecoveries,
            inFlight
        );
    }

    // At or above the limit: ask the caller to back off.
    return allocation.decision(
        Decision.THROTTLE,
        NAME,
        "too many shards are concurrently initializing [%d], limit: [%d] cluster setting [%s=%d]",
        inFlight,
        clusterConcurrentRecoveries,
        CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING.getKey(),
        clusterConcurrentRecoveries
    );
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canMoveAnyShard(allocation);
Expand Down

0 comments on commit 11cab8c

Please sign in to comment.