Skip to content

Commit

Permalink
Make number of preferred nodes configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Apr 26, 2024
1 parent 62d9641 commit 12f1ae6
Show file tree
Hide file tree
Showing 15 changed files with 50 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
// SOFT_AFFINITY node selection strategy scheduler would choose 2 workers (preferred, secondary preferred)
// for scheduling
return nodeProvider.get(filePath, 2);
// SOFT_AFFINITY node selection strategy scheduler would choose preferred nodes for scheduling
return nodeProvider.get(filePath);
}
return ImmutableList.of(); // empty list indicates no preference.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public List<HostAddress> getAddresses()
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
return nodeProvider.get(fileSplit.getPath() + "#" + fileSplit.getAffinitySchedulingFileSectionIndex(), 2);
return nodeProvider.get(fileSplit.getPath() + "#" + fileSplit.getAffinitySchedulingFileSectionIndex());
}
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testAffinitySchedulingKey()
private static String getAffinitySchedulingKey(HiveSplit split)
{
AtomicReference<String> reference = new AtomicReference<>();
split.getPreferredNodes((key, count) -> {
split.getPreferredNodes((key) -> {
reference.set(key);
return ImmutableList.of();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
return baseFile.map(file -> nodeProvider.get(file.getPath(), 2)).orElse(addresses);
return baseFile.map(file -> nodeProvider.get(file.getPath())).orElse(addresses);
}
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
return nodeProvider.get(path, 2);
return nodeProvider.get(path);
}
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashFunction;

Expand All @@ -34,7 +33,6 @@
import static java.util.Objects.requireNonNull;

public class ConsistentHashingNodeProvider
implements NodeProvider
{
private static final HashFunction HASH_FUNCTION = murmur3_32();
private final NavigableMap<Integer, InternalNode> candidates;
Expand All @@ -57,7 +55,6 @@ private ConsistentHashingNodeProvider(NavigableMap<Integer, InternalNode> candid
this.nodeCount = nodeCount;
}

@Override
public List<HostAddress> get(String key, int count)
{
if (count > nodeCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.PrestoException;
import com.google.common.hash.HashFunction;

Expand All @@ -29,7 +28,6 @@
import static java.util.Collections.unmodifiableList;

public class ModularHashingNodeProvider
implements NodeProvider
{
private static final HashFunction HASH_FUNCTION = murmur3_32();

Expand All @@ -43,7 +41,6 @@ public ModularHashingNodeProvider(List<InternalNode> sortedCandidates)
this.sortedCandidates = sortedCandidates;
}

@Override
public List<HostAddress> get(String identifier, int count)
{
int size = sortedCandidates.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.Optional;
import java.util.Set;

import static java.lang.String.format;

public class NodeMap
{
private final Map<String, InternalNode> activeNodesByNodeId;
Expand Down Expand Up @@ -92,15 +90,12 @@ public SetMultimap<HostAddress, InternalNode> getAllNodesByHostAndPort()
return allNodesByHostAndPort;
}

public NodeProvider getActiveNodeProvider(NodeSelectionHashStrategy nodeSelectionHashStrategy)
public NodeProvider getNodeProvider(int nodeCount)
{
switch (nodeSelectionHashStrategy) {
case MODULAR_HASHING:
return new ModularHashingNodeProvider(activeNodes);
case CONSISTENT_HASHING:
return consistentHashingNodeProvider.get();
default:
throw new IllegalArgumentException(format("Unknown NodeSelectionHashStrategy: %s", nodeSelectionHashStrategy));
if (consistentHashingNodeProvider.isPresent()) {
return (key) -> consistentHashingNodeProvider.get().get(key, nodeCount);
}
ModularHashingNodeProvider modularHashingNodeProvider = new ModularHashingNodeProvider(allNodes);
return (key) -> modularHashingNodeProvider.get(key, nodeCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class NodeScheduler
private final SimpleTtlNodeSelectorConfig simpleTtlNodeSelectorConfig;
private final NodeSelectionHashStrategy nodeSelectionHashStrategy;
private final int minVirtualNodeCount;
private final int maxPreferredNodes;

@Inject
public NodeScheduler(
Expand Down Expand Up @@ -167,6 +168,7 @@ public NodeScheduler(
this.simpleTtlNodeSelectorConfig = requireNonNull(simpleTtlNodeSelectorConfig, "simpleTtlNodeSelectorConfig is null");
this.nodeSelectionHashStrategy = config.getNodeSelectionHashStrategy();
this.minVirtualNodeCount = config.getMinVirtualNodeCount();
this.maxPreferredNodes = config.getMaxPreferredNodes();
}

@PreDestroy
Expand Down Expand Up @@ -222,7 +224,7 @@ public NodeSelector createNodeSelector(Session session, ConnectorId connectorId,
topologicalSplitCounters,
networkLocationSegmentNames,
networkLocationCache,
nodeSelectionHashStrategy);
maxPreferredNodes);
}

SimpleNodeSelector simpleNodeSelector = new SimpleNodeSelector(
Expand All @@ -236,7 +238,7 @@ public NodeSelector createNodeSelector(Session session, ConnectorId connectorId,
maxPendingSplitsWeightPerTask,
maxUnacknowledgedSplitsPerTask,
maxTasksPerStage,
nodeSelectionHashStrategy);
maxPreferredNodes);

if (resourceAwareSchedulingStrategy == TTL) {
return new SimpleTtlNodeSelector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static class NetworkTopologyType
private NodeSelectionHashStrategy nodeSelectionHashStrategy = NodeSelectionHashStrategy.MODULAR_HASHING;
private int minVirtualNodeCount = 1000;
private ResourceAwareSchedulingStrategy resourceAwareSchedulingStrategy = ResourceAwareSchedulingStrategy.RANDOM;
private int maxPreferredNodes = 2;

@NotNull
public String getNetworkTopology()
Expand Down Expand Up @@ -160,6 +161,19 @@ public NodeSchedulerConfig setResourceAwareSchedulingStrategy(ResourceAwareSched
return this;
}

@Min(1)
public int getMaxPreferredNodes()
{
return maxPreferredNodes;
}

@Config("node-scheduler.max-preferred-nodes")
public NodeSchedulerConfig setMaxPreferredNodes(int maxPreferredNodes)
{
this.maxPreferredNodes = maxPreferredNodes;
return this;
}

public enum ResourceAwareSchedulingStrategy
{
RANDOM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.scheduler.BucketNodeMap;
import com.facebook.presto.execution.scheduler.InternalNodeInfo;
import com.facebook.presto.execution.scheduler.ModularHashingNodeProvider;
import com.facebook.presto.execution.scheduler.NodeAssignmentStats;
import com.facebook.presto.execution.scheduler.NodeMap;
import com.facebook.presto.execution.scheduler.NodeSelectionHashStrategy;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
Expand Down Expand Up @@ -53,7 +51,6 @@
import static com.facebook.presto.execution.scheduler.NodeScheduler.selectExactNodes;
import static com.facebook.presto.execution.scheduler.NodeScheduler.selectNodes;
import static com.facebook.presto.execution.scheduler.NodeScheduler.toWhenHasSplitQueueSpaceFuture;
import static com.facebook.presto.execution.scheduler.NodeSelectionHashStrategy.MODULAR_HASHING;
import static com.facebook.presto.metadata.InternalNode.NodeStatus.DEAD;
import static com.facebook.presto.spi.StandardErrorCode.NODE_SELECTION_NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
Expand All @@ -80,7 +77,7 @@ public class SimpleNodeSelector
private final long maxPendingSplitsWeightPerTask;
private final int maxUnacknowledgedSplitsPerTask;
private final int maxTasksPerStage;
private final NodeSelectionHashStrategy nodeSelectionHashStrategy;
private final int maxPreferredNodes;

public SimpleNodeSelector(
InternalNodeManager nodeManager,
Expand All @@ -93,7 +90,7 @@ public SimpleNodeSelector(
long maxPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
int maxTasksPerStage,
NodeSelectionHashStrategy nodeSelectionHashStrategy)
int maxPreferredNodes)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.nodeSelectionStats = requireNonNull(nodeSelectionStats, "nodeSelectionStats is null");
Expand All @@ -106,7 +103,7 @@ public SimpleNodeSelector(
this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
this.maxTasksPerStage = maxTasksPerStage;
this.nodeSelectionHashStrategy = requireNonNull(nodeSelectionHashStrategy, "nodeSelectionHashStrategy is null");
this.maxPreferredNodes = maxPreferredNodes;
}

@Override
Expand Down Expand Up @@ -152,8 +149,7 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
Set<InternalNode> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;

NodeProvider nodeProvider = nodeMap.getActiveNodeProvider(nodeSelectionHashStrategy);

NodeProvider nodeProvider = nodeMap.getNodeProvider(maxPreferredNodes);
OptionalInt preferredNodeCount = OptionalInt.empty();
for (Split split : splits) {
List<InternalNode> candidateNodes;
Expand All @@ -163,10 +159,6 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
preferredNodeCount = OptionalInt.of(candidateNodes.size());
break;
case SOFT_AFFINITY:
// Using all nodes for soft affinity scheduling with modular hashing because otherwise temporarily down nodes would trigger too much rehashing
if (nodeSelectionHashStrategy == MODULAR_HASHING) {
nodeProvider = new ModularHashingNodeProvider(nodeMap.getAllNodes());
}
candidateNodes = selectExactNodes(nodeMap, split.getPreferredNodes(nodeProvider), includeCoordinator);
preferredNodeCount = OptionalInt.of(candidateNodes.size());
candidateNodes = ImmutableList.<InternalNode>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.facebook.presto.execution.scheduler.NetworkLocationCache;
import com.facebook.presto.execution.scheduler.NodeAssignmentStats;
import com.facebook.presto.execution.scheduler.NodeMap;
import com.facebook.presto.execution.scheduler.NodeSelectionHashStrategy;
import com.facebook.presto.execution.scheduler.ResettableRandomizedIterator;
import com.facebook.presto.execution.scheduler.SplitPlacementResult;
import com.facebook.presto.metadata.InternalNode;
Expand Down Expand Up @@ -77,7 +76,7 @@ public class TopologyAwareNodeSelector
private final List<CounterStat> topologicalSplitCounters;
private final List<String> networkLocationSegmentNames;
private final NetworkLocationCache networkLocationCache;
private final NodeSelectionHashStrategy nodeSelectionHashStrategy;
private final int maxPreferredNodes;

public TopologyAwareNodeSelector(
InternalNodeManager nodeManager,
Expand All @@ -92,7 +91,7 @@ public TopologyAwareNodeSelector(
List<CounterStat> topologicalSplitCounters,
List<String> networkLocationSegmentNames,
NetworkLocationCache networkLocationCache,
NodeSelectionHashStrategy nodeSelectionHashStrategy)
int maxPreferredNodes)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.nodeSelectionStats = requireNonNull(nodeSelectionStats, "nodeSelectionStats is null");
Expand All @@ -107,7 +106,7 @@ public TopologyAwareNodeSelector(
this.topologicalSplitCounters = requireNonNull(topologicalSplitCounters, "topologicalSplitCounters is null");
this.networkLocationSegmentNames = requireNonNull(networkLocationSegmentNames, "networkLocationSegmentNames is null");
this.networkLocationCache = requireNonNull(networkLocationCache, "networkLocationCache is null");
this.nodeSelectionHashStrategy = requireNonNull(nodeSelectionHashStrategy, "nodeSelectionHashStrategy is null");
this.maxPreferredNodes = maxPreferredNodes;
}

@Override
Expand Down Expand Up @@ -153,8 +152,7 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
Set<InternalNode> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;

NodeProvider nodeProvider = nodeMap.getActiveNodeProvider(nodeSelectionHashStrategy);

NodeProvider nodeProvider = nodeMap.getNodeProvider(maxPreferredNodes);
for (Split split : splits) {
SplitWeight splitWeight = split.getSplitWeight();
if (split.getNodeSelectionStrategy() == HARD_AFFINITY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public void testScheduleLocal()
Set<Split> splits = ImmutableSet.of(split);

Map.Entry<InternalNode, Split> assignment = Iterables.getOnlyElement(nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments().entries());
assertEquals(assignment.getKey().getHostAndPort(), split.getPreferredNodes(new ModularHashingNodeProvider(nodeSelector.getAllNodes())).get(0));
ModularHashingNodeProvider modularHashingNodeProvider = new ModularHashingNodeProvider(nodeSelector.getAllNodes());
assertEquals(assignment.getKey().getHostAndPort(), split.getPreferredNodes((key) -> modularHashingNodeProvider.get(key, 3)).get(0));
assertEquals(assignment.getValue(), split);
}

Expand Down Expand Up @@ -361,10 +362,11 @@ public NetworkLocation get(HostAddress host)
}
unassigned = Sets.difference(unassigned, new HashSet<>(assignments.values()));
assertEquals(unassigned.size(), 3);
ModularHashingNodeProvider modularHashingNodeProvider = new ModularHashingNodeProvider(nodeSelector.getAllNodes());
int rack1 = 0;
int rack2 = 0;
for (Split split : unassigned) {
String rack = topology.locate(split.getPreferredNodes(new ModularHashingNodeProvider(nodeSelector.getAllNodes())).get(0)).getSegments().get(0);
String rack = topology.locate(split.getPreferredNodes((key) -> modularHashingNodeProvider.get(key, 2)).get(0)).getSegments().get(0);
switch (rack) {
case "rack1":
rack1++;
Expand Down Expand Up @@ -1353,7 +1355,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
@Override
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return nodeProvider.get(format("split%d", scheduleIdentifierId), 1);
return nodeProvider.get(format("split%d", scheduleIdentifierId));
}

@Override
Expand Down Expand Up @@ -1393,7 +1395,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
@Override
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return nodeProvider.get(String.valueOf(new Random().nextInt()), 1);
return nodeProvider.get(String.valueOf(new Random().nextInt()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void testDefaults()
.setIncludeCoordinator(true)
.setNodeSelectionHashStrategy(MODULAR_HASHING)
.setMinVirtualNodeCount(1000)
.setResourceAwareSchedulingStrategy(RANDOM));
.setResourceAwareSchedulingStrategy(RANDOM)
.setMaxPreferredNodes(2));
}

@Test
Expand All @@ -56,6 +57,7 @@ public void testExplicitPropertyMappings()
.put("node-scheduler.node-selection-hash-strategy", "CONSISTENT_HASHING")
.put("node-scheduler.consistent-hashing-min-virtual-node-count", "2000")
.put("experimental.resource-aware-scheduling-strategy", "TTL")
.put("node-scheduler.max-preferred-nodes", "5")
.build();

NodeSchedulerConfig expected = new NodeSchedulerConfig()
Expand All @@ -67,7 +69,8 @@ public void testExplicitPropertyMappings()
.setMinCandidates(11)
.setNodeSelectionHashStrategy(CONSISTENT_HASHING)
.setMinVirtualNodeCount(2000)
.setResourceAwareSchedulingStrategy(TTL);
.setResourceAwareSchedulingStrategy(TTL)
.setMaxPreferredNodes(5);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ public interface NodeProvider
{
/**
* @param identifier an unique identifier used to obtain the nodes
* @param count how many desirable nodes to be returned
* @return a list of the chosen nodes by specific hash function
* @return a list of the chosen nodes
*/
List<HostAddress> get(String identifier, int count);
List<HostAddress> get(String identifier);
}

0 comments on commit 12f1ae6

Please sign in to comment.