Skip to content

Commit

Permalink
Add timeout setting for batch allocator
Browse files Browse the repository at this point in the history
Fix minor bug

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jul 7, 2024
1 parent d51f7e0 commit 7a760e1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -633,10 +633,11 @@ private void allocateAllUnassignedShards(RoutingAllocation allocation) {
}

private void processWorkItemQueue(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
long startTime = System.nanoTime();
//TODO replace with max time
while (workQueue.isEmpty() == false) {
if (System.nanoTime() - startTime > TimeValue.timeValueSeconds(30).nanos()) {
if (System.nanoTime() - startTime > allocator.getAllocatorTimeout().nanos()) {
logger.info("Timed out while running process work item queue");
return;
}
Expand Down Expand Up @@ -827,6 +828,11 @@ public void allocateUnassigned(
unassignedAllocationHandler.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes());
}

@Override
public TimeValue getAllocatorTimeout() {
return null;
}

@Override
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation allocation) {
assert unassignedShard.unassigned();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;

Expand Down Expand Up @@ -102,6 +103,10 @@ void allocateUnassigned(
UnassignedAllocationHandler unassignedAllocationHandler
);

default TimeValue getAllocatorTimeout() {
return null;
}

/**
* Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible.
* Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.BATCH_ALLOCATOR_TIMEOUT_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -60,6 +62,9 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class);
private final long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;
private static final String BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.allocator_timeout";

private TimeValue shardsBatchGatewayAllocatorTimeout;

/**
* Number of shards we send in one batch to data nodes for fetching metadata
Expand All @@ -72,6 +77,13 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.timeValueSeconds(60),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final RerouteService rerouteService;
private final PrimaryShardBatchAllocator primaryShardBatchAllocator;
private final ReplicaShardBatchAllocator replicaShardBatchAllocator;
Expand All @@ -90,14 +102,20 @@ public ShardsBatchGatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShardsBatch batchStartedAction,
TransportNodesListShardStoreMetadataBatch batchStoreAction,
Settings settings
Settings settings,
ClusterSettings clusterSettings
) {
this.rerouteService = rerouteService;
this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator();
this.replicaShardBatchAllocator = new InternalReplicaBatchShardAllocator();
this.batchStartedAction = batchStartedAction;
this.batchStoreAction = batchStoreAction;
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
this.shardsBatchGatewayAllocatorTimeout = BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
BATCH_ALLOCATOR_TIMEOUT_SETTING,
this::setAllocatorTimeout
);
}

@Override
Expand All @@ -120,6 +138,7 @@ protected ShardsBatchGatewayAllocator(long batchSize) {
this.batchStoreAction = null;
this.replicaShardBatchAllocator = null;
this.maxBatchSize = batchSize;
this.shardsBatchGatewayAllocatorTimeout = null;
}
// for tests

Expand Down Expand Up @@ -179,6 +198,15 @@ public void allocateUnassigned(
throw new UnsupportedOperationException("ShardsBatchGatewayAllocator does not support allocating unassigned shards");
}

@Override
public TimeValue getAllocatorTimeout() {
return this.shardsBatchGatewayAllocatorTimeout;
}

public void setAllocatorTimeout(TimeValue shardsBatchGatewayAllocatorTimeout) {
this.shardsBatchGatewayAllocatorTimeout = shardsBatchGatewayAllocatorTimeout;
}

@Override
public List<Runnable> allocateAllUnassignedShards(final RoutingAllocation allocation, boolean primary) {

Expand Down

0 comments on commit 7a760e1

Please sign in to comment.