Skip to content

Commit

Permalink
Allow configuring driver threads based on the number of cores
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed May 30, 2024
1 parent 93b44ae commit 3cb1e6f
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 4 deletions.
6 changes: 6 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,12 @@ in performance due to a context switching. The number of active threads is avail
via the ``RunningSplits`` property of the
``com.facebook.presto.execution.executor:name=TaskExecutor.RunningSplits`` JXM object.

The number of threads can be configured using either an absolute value (for example, ``10``)
or a value relative to the number of available CPU cores (for example, ``1.5C``). When
using a relative value, the number of threads is calculated based on the available CPU
cores multiplied by the specified factor (for example, ``1.5``) and rounded to the
nearest integer.

``task.min-drivers``
^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ public int getMaxWorkerThreads()

@LegacyConfig("task.shard.max-threads")
@Config("task.max-worker-threads")
public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads)
public TaskManagerConfig setMaxWorkerThreads(String maxWorkerThreads)
{
this.maxWorkerThreads = maxWorkerThreads;
this.maxWorkerThreads = ThreadCountParser.parse(maxWorkerThreads);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import com.google.common.annotations.VisibleForTesting;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Double.parseDouble;
import static java.lang.Integer.parseInt;
import static java.lang.Math.max;
import static java.lang.Math.round;
import static java.lang.Math.toIntExact;

public class ThreadCountParser
{
private static final String PER_CORE_SUFFIX = "C";

private ThreadCountParser() {}

public static int parse(String value)
{
return parse(value, Runtime.getRuntime().availableProcessors());
}

@VisibleForTesting
static int parse(String value, int availableProcessors)
{
int threads;
if (value.endsWith(PER_CORE_SUFFIX)) {
double multiplier = parseDouble(value.substring(0, value.length() - PER_CORE_SUFFIX.length()).trim());
checkArgument(multiplier > 0, "Thread multiplier must be positive: %s", multiplier);
threads = toIntExact(round(multiplier * availableProcessors));
threads = max(threads, 1);
}
else {
threads = parseInt(value);
}

checkArgument(threads > 0, "Thread count must be positive: %s", threads);
return threads;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testDefaults()
.setTaskCpuTimerEnabled(true)
.setPerOperatorAllocationTrackingEnabled(false)
.setTaskAllocationTrackingEnabled(false)
.setMaxWorkerThreads(Runtime.getRuntime().availableProcessors() * 2)
.setMaxWorkerThreads("2C")
.setMinDrivers(Runtime.getRuntime().availableProcessors() * 2 * 2)
.setMinDriversPerTask(3)
.setMaxDriversPerTask(Integer.MAX_VALUE)
Expand Down Expand Up @@ -141,7 +141,7 @@ public void testExplicitPropertyMappings()
.setShareIndexLoading(true)
.setMaxPartialAggregationMemoryUsage(new DataSize(32, Unit.MEGABYTE))
.setMaxLocalExchangeBufferSize(new DataSize(33, Unit.MEGABYTE))
.setMaxWorkerThreads(3)
.setMaxWorkerThreads("3")
.setMinDrivers(2)
.setMinDriversPerTask(5)
.setMaxDriversPerTask(13)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import org.testng.annotations.Test;

import static com.facebook.presto.execution.ThreadCountParser.parse;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestThreadCountParser
{
@Test
public void testParse()
{
assertEquals(parse("1"), 1);
assertEquals(parse("100"), 100);
assertEquals(parse("1", 1), 1);
assertEquals(parse("1", 2), 1);
assertEquals(parse("100", 4), 100);
assertEquals(parse("1C", 4), 4);
assertEquals(parse("2C", 8), 16);
assertEquals(parse("1.5C", 6), 9);
assertEquals(parse("0.1C", 1), 1);
assertThatThrownBy(() -> parse("0"))
.hasMessageContaining("Thread count must be positive");
assertThatThrownBy(() -> parse("-1"))
.hasMessageContaining("Thread count must be positive");
assertThatThrownBy(() -> parse("0C"))
.hasMessageContaining("Thread multiplier must be positive");
assertThatThrownBy(() -> parse("-1C"))
.hasMessageContaining("Thread multiplier must be positive");
assertThatThrownBy(() -> parse("abc"))
.isInstanceOf(NumberFormatException.class);
assertThatThrownBy(() -> parse("abcC"))
.isInstanceOf(NumberFormatException.class);
}
}

0 comments on commit 3cb1e6f

Please sign in to comment.