From 09276b372269b48187976ddc2c39bb95dd862544 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 30 Jul 2024 23:15:34 +0530 Subject: [PATCH] Add lower limit for primary and replica batch allocators timeout (#14979) * Add lower limit for primary and replica batch allocators Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../gateway/RecoveryFromGatewayIT.java | 6 +- .../gateway/ShardsBatchGatewayAllocator.java | 39 ++++++++++++- .../gateway/GatewayAllocatorTests.java | 55 +++++++++++++++++++ 4 files changed, 96 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f619b6b85c649..a5a3e9c60b664 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `actions/github-script` from 6 to 7 ([#14997](https://github.com/opensearch-project/OpenSearch/pull/14997)) ### Changed +- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index eccc903dfac82..bcf23a37c0010 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -886,7 +886,7 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches()); } - public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception { + public void testBatchModeEnabledWithDisabledTimeoutAndClusterGreen() throws Exception { internalCluster().startClusterManagerOnlyNodes( 1, @@ -920,8 +920,8 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws .put("node.name", clusterManagerName) .put(clusterManagerDataPathSettings) .put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5) - .put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms") - .put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms") + .put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1") + .put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1") .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true) .build() ); diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 673ed8dbaa1c3..6c6b1126a78d6 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -73,13 +73,14 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private final long maxBatchSize; private static final short DEFAULT_SHARD_BATCH_SIZE = 2000; - private static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = + public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.primary_allocator_timeout"; - private static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = + public static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.replica_allocator_timeout"; private TimeValue primaryShardsBatchGatewayAllocatorTimeout; private TimeValue replicaShardsBatchGatewayAllocatorTimeout; + public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); /** * Number of shards we send in one batch to data nodes for fetching metadata @@ -92,16 +93,50 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { Setting.Property.NodeScope ); + /** + * Timeout for existing primary shards batch allocator. + * Timeout value must be greater than or equal to 20s or -1ms to effectively disable timeout + */ public static final Setting PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, TimeValue.MINUS_ONE, + TimeValue.MINUS_ONE, + new Setting.Validator<>() { + @Override + public void validate(TimeValue timeValue) { + if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) { + throw new IllegalArgumentException( + "Setting [" + + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + + "] should be more than 20s or -1ms to disable timeout" + ); + } + } + }, Setting.Property.NodeScope, Setting.Property.Dynamic ); + /** + * Timeout for existing replica shards batch allocator. + * Timeout value must be greater than or equal to 20s or -1ms to effectively disable timeout + */ public static final Setting REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, TimeValue.MINUS_ONE, + TimeValue.MINUS_ONE, + new Setting.Validator<>() { + @Override + public void validate(TimeValue timeValue) { + if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) { + throw new IllegalArgumentException( + "Setting [" + + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + + "] should be more than 20s or -1ms to disable timeout" + ); + } + } + }, Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index bd56123f6df1f..1596a0b566b28 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -47,6 +47,11 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; +import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY; +import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING; +import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY; + public class GatewayAllocatorTests extends OpenSearchAllocationTestCase { private final Logger logger = LogManager.getLogger(GatewayAllocatorTests.class); @@ -368,6 +373,56 @@ public void testCreatePrimaryAndReplicaExecutorOfSizeTwo() { assertEquals(executor.getTimeoutAwareRunnables().size(), 2); } + public void testPrimaryAllocatorTimeout() { + // Valid setting with timeout = 20s + Settings build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build(); + assertEquals(20, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Valid setting with timeout > 20s + build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build(); + assertEquals(30, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Invalid setting with timeout < 20s + Settings lessThan20sSetting = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting) + ); + assertEquals( + "Setting [" + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout", + iae.getMessage() + ); + + // Valid setting with timeout = -1 + build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build(); + assertEquals(-1, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); + } + + public void testReplicaAllocatorTimeout() { + // Valid setting with timeout = 20s + Settings build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build(); + assertEquals(20, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Valid setting with timeout > 20s + build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build(); + assertEquals(30, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Invalid setting with timeout < 20s + Settings lessThan20sSetting = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting) + ); + assertEquals( + "Setting [" + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout", + iae.getMessage() + ); + + // Valid setting with timeout = -1 + build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build(); + assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); + } + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { if (count == 0) return; Metadata.Builder metadata = Metadata.builder();