diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 846a6a0792ab..4d52d5fdf9f4 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -546,6 +546,12 @@ in performance due to a context switching. The number of active threads is avail via the ``RunningSplits`` property of the ``com.facebook.presto.execution.executor:name=TaskExecutor.RunningSplits`` JXM object. +The number of threads can be configured using either an absolute value (for example, ``10``) +or a value relative to the number of available CPU cores (for example, ``1.5C``). When +using a relative value, the number of threads is calculated based on the available CPU +cores multiplied by the specified factor (for example, ``1.5``) and rounded to the +nearest integer. + ``task.min-drivers`` ^^^^^^^^^^^^^^^^^^^^ diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java index c61f069de399..ea34e1d4cc36 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java @@ -281,9 +281,9 @@ public int getMaxWorkerThreads() @LegacyConfig("task.shard.max-threads") @Config("task.max-worker-threads") - public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads) + public TaskManagerConfig setMaxWorkerThreads(String maxWorkerThreads) { - this.maxWorkerThreads = maxWorkerThreads; + this.maxWorkerThreads = ThreadCountParser.parse(maxWorkerThreads); return this; } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/ThreadCountParser.java b/presto-main/src/main/java/com/facebook/presto/execution/ThreadCountParser.java new file mode 100644 index 000000000000..d8ae80314153 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/execution/ThreadCountParser.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution; + +import com.google.common.annotations.VisibleForTesting; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.Double.parseDouble; +import static java.lang.Integer.parseInt; +import static java.lang.Math.max; +import static java.lang.Math.round; +import static java.lang.Math.toIntExact; + +public class ThreadCountParser +{ + private static final String PER_CORE_SUFFIX = "C"; + + private ThreadCountParser() {} + + public static int parse(String value) + { + return parse(value, Runtime.getRuntime().availableProcessors()); + } + + @VisibleForTesting + static int parse(String value, int availableProcessors) + { + int threads; + if (value.endsWith(PER_CORE_SUFFIX)) { + double multiplier = parseDouble(value.substring(0, value.length() - PER_CORE_SUFFIX.length()).trim()); + checkArgument(multiplier > 0, "Thread multiplier must be positive: %s", multiplier); + threads = toIntExact(round(multiplier * availableProcessors)); + threads = max(threads, 1); + } + else { + threads = parseInt(value); + } + + checkArgument(threads > 0, "Thread count must be positive: %s", threads); + return threads; + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java index 497b709d18fa..59a42ad38fa7 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestTaskManagerConfig.java @@ -46,7 +46,7 @@ public void testDefaults() .setTaskCpuTimerEnabled(true) .setPerOperatorAllocationTrackingEnabled(false) .setTaskAllocationTrackingEnabled(false) - .setMaxWorkerThreads(Runtime.getRuntime().availableProcessors() * 2) + .setMaxWorkerThreads("2C") .setMinDrivers(Runtime.getRuntime().availableProcessors() * 2 * 2) .setMinDriversPerTask(3) .setMaxDriversPerTask(Integer.MAX_VALUE) @@ -141,7 +141,7 @@ public void testExplicitPropertyMappings() .setShareIndexLoading(true) .setMaxPartialAggregationMemoryUsage(new DataSize(32, Unit.MEGABYTE)) .setMaxLocalExchangeBufferSize(new DataSize(33, Unit.MEGABYTE)) - .setMaxWorkerThreads(3) + .setMaxWorkerThreads("3") .setMinDrivers(2) .setMinDriversPerTask(5) .setMaxDriversPerTask(13) diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestThreadCountParser.java b/presto-main/src/test/java/com/facebook/presto/execution/TestThreadCountParser.java new file mode 100644 index 000000000000..2bb4aad67174 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestThreadCountParser.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution; + +import org.testng.annotations.Test; + +import static com.facebook.presto.execution.ThreadCountParser.parse; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +public class TestThreadCountParser +{ + @Test + public void testParse() + { + assertEquals(parse("1"), 1); + assertEquals(parse("100"), 100); + assertEquals(parse("1", 1), 1); + assertEquals(parse("1", 2), 1); + assertEquals(parse("100", 4), 100); + assertEquals(parse("1C", 4), 4); + assertEquals(parse("2C", 8), 16); + assertEquals(parse("1.5C", 6), 9); + assertEquals(parse("0.1C", 1), 1); + assertThatThrownBy(() -> parse("0")) + .hasMessageContaining("Thread count must be positive"); + assertThatThrownBy(() -> parse("-1")) + .hasMessageContaining("Thread count must be positive"); + assertThatThrownBy(() -> parse("0C")) + .hasMessageContaining("Thread multiplier must be positive"); + assertThatThrownBy(() -> parse("-1C")) + .hasMessageContaining("Thread multiplier must be positive"); + assertThatThrownBy(() -> parse("abc")) + .isInstanceOf(NumberFormatException.class); + assertThatThrownBy(() -> parse("abcC")) + .isInstanceOf(NumberFormatException.class); + } +}