Skip to content

Commit

Permalink
Implement collection of min/max values in DynamicFilterSourceOperator
Browse files Browse the repository at this point in the history
Cherry-picked from trinodb/trino#3871
as well as the relevant code from the following commits:

- trinodb/trino@f401bfe
- trinodb/trino@f7dd1de
- trinodb/trino@c224b48
- trinodb/trino@e0d1846
- trinodb/trino@b87425e

Co-authored-by: Raunaq Morarka <raunaqmorarka@gmail.com>
Co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com>
Co-authored-by: Michael Chirico <michael.chirico@grabtaxi.com>
  • Loading branch information
4 people committed Jul 7, 2021
1 parent 109d9a0 commit 59c0073
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;

Expand Down Expand Up @@ -129,6 +132,20 @@ public static long hashPosition(Type type, Block block, int position)
return type.hash(block, position);
}

/**
 * Tells whether {@code value} is a floating point NaN for the given {@code type}.
 * Only {@code DOUBLE} and {@code REAL} can yield {@code true}; every other type yields {@code false}.
 *
 * @param type the type of the value, must not be null
 * @param value the value to inspect, must not be null; cast to {@code double} for DOUBLE and to {@code long} for REAL
 * @return {@code true} if the value is NaN, {@code false} otherwise
 */
public static boolean isFloatingPointNaN(Type type, Object value)
{
    requireNonNull(type, "type is null");
    requireNonNull(value, "value is null");

    if (type == REAL) {
        // The REAL value arrives as a long whose int bits encode the float
        int floatBits = toIntExact((long) value);
        return Float.isNaN(intBitsToFloat(floatBits));
    }
    return type == DOUBLE && Double.isNaN((double) value);
}

static void checkElementNotNull(boolean isNull, String errorMsg)
{
if (isNull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public final class SystemSessionProperties
public static final String ENABLE_DYNAMIC_FILTERING = "enable_dynamic_filtering";
public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_ROW_COUNT = "dynamic_filtering_max_per_driver_row_count";
public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE = "dynamic_filtering_max_per_driver_size";
public static final String DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER = "dynamic_filtering_range_row_limit_per_driver";
public static final String FRAGMENT_RESULT_CACHING_ENABLED = "fragment_result_caching_enabled";
public static final String LEGACY_TYPE_COERCION_WARNING_ENABLED = "legacy_type_coercion_warning_enabled";
public static final String INLINE_SQL_FUNCTIONS = "inline_sql_functions";
Expand Down Expand Up @@ -936,6 +937,11 @@ public SystemSessionProperties(
false,
value -> DataSize.valueOf((String) value),
DataSize::toString),
integerProperty(
DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER,
"Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering",
featuresConfig.getDynamicFilteringRangeRowLimitPerDriver(),
false),
booleanProperty(
FRAGMENT_RESULT_CACHING_ENABLED,
"Enable fragment result caching and read/write leaf fragment result pages from/to cache when applicable",
Expand Down Expand Up @@ -1683,6 +1689,11 @@ public static DataSize getDynamicFilteringMaxPerDriverSize(Session session)
return session.getSystemProperty(DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE, DataSize.class);
}

/**
 * Reads the {@code dynamic_filtering_range_row_limit_per_driver} system property from the given session.
 *
 * @param session the session to read the property from
 * @return the configured row limit
 */
public static int getDynamicFilteringRangeRowLimitPerDriver(Session session)
{
    Integer rangeRowLimit = session.getSystemProperty(DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER, Integer.class);
    return rangeRowLimit;
}

public static boolean isFragmentResultCachingEnabled(Session session)
{
return session.getSystemProperty(FRAGMENT_RESULT_CACHING_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeUtils;
import com.facebook.presto.operator.aggregation.TypedSet;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.google.common.collect.ImmutableList;
Expand All @@ -33,6 +32,11 @@
import java.util.Optional;
import java.util.function.Consumer;

import static com.facebook.presto.common.predicate.Range.range;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.TypeUtils.isFloatingPointNaN;
import static com.facebook.presto.common.type.TypeUtils.readNativeValue;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
Expand All @@ -41,7 +45,8 @@
/**
* This operator acts as a simple "pass-through" pipe, while saving its input pages.
* The collected pages' values are used for creating a run-time filtering constraint (for probe-side table scan in an inner join).
* We support only small build-side pages (which should be the case when using "broadcast" join).
* We record all values for the run-time filter only for small build-side pages (which should be the case when using "broadcast" join).
* For large inputs on build side, we can optionally record the min and max values per channel for orderable types (except Double and Real).
*/
public class DynamicFilterSourceOperator
implements Operator
Expand Down Expand Up @@ -86,6 +91,7 @@ public static class DynamicFilterSourceOperatorFactory
private final List<Channel> channels;
private final int maxFilterPositionsCount;
private final DataSize maxFilterSize;
private final int minMaxCollectionLimit;

private boolean closed;

Expand All @@ -95,7 +101,8 @@ public DynamicFilterSourceOperatorFactory(
Consumer<TupleDomain<String>> dynamicPredicateConsumer,
List<Channel> channels,
int maxFilterPositionsCount,
DataSize maxFilterSize)
DataSize maxFilterSize,
int minMaxCollectionLimit)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -109,6 +116,7 @@ public DynamicFilterSourceOperatorFactory(
"duplicate channel indices are not allowed");
this.maxFilterPositionsCount = maxFilterPositionsCount;
this.maxFilterSize = maxFilterSize;
this.minMaxCollectionLimit = minMaxCollectionLimit;
}

@Override
Expand All @@ -121,7 +129,8 @@ public Operator createOperator(DriverContext driverContext)
channels,
planNodeId,
maxFilterPositionsCount,
maxFilterSize);
maxFilterSize,
minMaxCollectionLimit);
}

@Override
Expand All @@ -143,6 +152,7 @@ public OperatorFactory duplicate()
private final int maxFilterPositionsCount;
private final long maxFilterSizeInBytes;
private final List<Channel> channels;
private final List<Integer> minMaxChannels;

private boolean finished;
private Page current;
Expand All @@ -153,13 +163,20 @@ public OperatorFactory duplicate()
@Nullable
private TypedSet[] valueSets;

private int minMaxCollectionLimit;
@Nullable
private Block[] minValues;
@Nullable
private Block[] maxValues;

private DynamicFilterSourceOperator(
OperatorContext context,
Consumer<TupleDomain<String>> dynamicPredicateConsumer,
List<Channel> channels,
PlanNodeId planNodeId,
int maxFilterPositionsCount,
DataSize maxFilterSize)
DataSize maxFilterSize,
int minMaxCollectionLimit)
{
this.context = requireNonNull(context, "context is null");
this.maxFilterPositionsCount = maxFilterPositionsCount;
Expand All @@ -170,8 +187,13 @@ private DynamicFilterSourceOperator(

this.blockBuilders = new BlockBuilder[channels.size()];
this.valueSets = new TypedSet[channels.size()];
ImmutableList.Builder<Integer> minMaxChannelsBuilder = ImmutableList.builder();
for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) {
Type type = channels.get(channelIndex).getType();
// Skipping DOUBLE and REAL in collectMinMaxValues to avoid dealing with NaN values
if (minMaxCollectionLimit > 0 && type.isOrderable() && type != DOUBLE && type != REAL) {
minMaxChannelsBuilder.add(channelIndex);
}
this.blockBuilders[channelIndex] = type.createBlockBuilder(null, EXPECTED_BLOCK_BUILDER_SIZE);
this.valueSets[channelIndex] = new TypedSet(
type,
Expand All @@ -180,6 +202,12 @@ private DynamicFilterSourceOperator(
String.format("DynamicFilterSourceOperator_%s_%d", planNodeId, channelIndex),
Optional.empty() /* maxBlockMemory */);
}
this.minMaxCollectionLimit = minMaxCollectionLimit;
minMaxChannels = minMaxChannelsBuilder.build();
if (!minMaxChannels.isEmpty()) {
minValues = new Block[channels.size()];
maxValues = new Block[channels.size()];
}
}

@Override
Expand All @@ -200,9 +228,24 @@ public void addInput(Page page)
verify(!finished, "DynamicFilterSourceOperator: addInput() shouldn't not be called after finish()");
current = page;
if (valueSets == null) {
return; // the predicate became too large.
// the exact predicate became too large.
if (minValues == null) {
// there are too many rows to collect min/max range
return;
}
minMaxCollectionLimit -= page.getPositionCount();
if (minMaxCollectionLimit < 0) {
handleMinMaxCollectionLimitExceeded();
return;
}
// the predicate became too large, record only min and max values for each orderable channel
for (Integer channelIndex : minMaxChannels) {
Block block = page.getBlock(channels.get(channelIndex).index);
updateMinMaxValues(block, channelIndex);
}
return;
}

minMaxCollectionLimit -= page.getPositionCount();
// TODO: we should account for the memory used for collecting build-side values using MemoryContext
long filterSizeInBytes = 0;
int filterPositionsCount = 0;
Expand All @@ -224,13 +267,82 @@ public void addInput(Page page)

/**
 * Handles the case where the resulting exact-value predicate is too large.
 * <p>
 * If no channel is eligible for min/max collection, the consumer is told to allow all probe-side values.
 * Otherwise, either the min/max row limit is already exceeded (the consumer is again notified with 'all'),
 * or the values collected so far are folded into the per-channel min/max state.
 * In every case the references to the exact collected values are dropped afterwards.
 */
private void handleTooLargePredicate()
{
    // The resulting predicate is too large
    if (minMaxChannels.isEmpty()) {
        // allow all probe-side values to be read.
        dynamicPredicateConsumer.accept(TupleDomain.all());
    }
    else if (minMaxCollectionLimit < 0) {
        // the consumer is notified with 'all' there
        handleMinMaxCollectionLimitExceeded();
    }
    else {
        // convert to min/max per column for orderable types
        for (Integer channelIndex : minMaxChannels) {
            Block block = blockBuilders[channelIndex].build();
            updateMinMaxValues(block, channelIndex);
        }
    }

    // Drop references to collected values.
    valueSets = null;
    blockBuilders = null;
}

/**
 * Gives up on min/max collection: notifies the consumer that all probe-side values
 * must be read and releases the min/max state.
 */
private void handleMinMaxCollectionLimitExceeded()
{
    // No constraint can be produced, so every probe-side value is allowed.
    TupleDomain<String> unconstrained = TupleDomain.all();
    dynamicPredicateConsumer.accept(unconstrained);

    // Release the per-channel min/max blocks.
    minValues = null;
    maxValues = null;
}

/**
 * Folds the non-null values of {@code block} into the min/max values tracked for the given channel.
 * A block containing only nulls leaves the tracked values untouched.
 *
 * @param block the block whose values are examined
 * @param channelIndex index into {@code channels}, {@code minValues} and {@code maxValues}
 */
private void updateMinMaxValues(Block block, int channelIndex)
{
    checkState(minValues != null && maxValues != null);
    Type type = channels.get(channelIndex).type;

    // Locate the positions of the smallest and largest non-null values within this block
    int lowest = -1;
    int highest = -1;
    for (int i = 0; i < block.getPositionCount(); i++) {
        if (block.isNull(i)) {
            continue;
        }
        if (lowest == -1) {
            // first non-null value seeds both extremes
            lowest = i;
            highest = i;
        }
        else if (type.compareTo(block, i, block, lowest) < 0) {
            lowest = i;
        }
        else if (type.compareTo(block, i, block, highest) > 0) {
            highest = i;
        }
    }
    if (lowest == -1) {
        // nothing but nulls in this block
        return;
    }

    // A null tracked minimum means no non-null value has been recorded for this channel yet,
    // in which case both extremes are taken from this block unconditionally.
    Block knownMin = minValues[channelIndex];
    Block knownMax = maxValues[channelIndex];
    boolean nothingRecorded = knownMin == null;
    if (nothingRecorded || type.compareTo(block, lowest, knownMin, 0) < 0) {
        minValues[channelIndex] = block.getSingleValueBlock(lowest);
    }
    if (nothingRecorded || type.compareTo(block, highest, knownMax, 0) > 0) {
        maxValues[channelIndex] = block.getSingleValueBlock(highest);
    }
}

@Override
public Page getOutput()
{
Expand All @@ -247,12 +359,36 @@ public void finish()
return;
}
finished = true;
ImmutableMap.Builder<String, Domain> domainsBuilder = ImmutableMap.builder();
if (valueSets == null) {
return; // the predicate became too large.
if (minValues == null) {
// there were too many rows to collect min/max range
// dynamicPredicateConsumer was notified with 'all' in handleTooLargePredicate if there are no orderable types,
// else it was notified with 'all' in handleMinMaxCollectionLimitExceeded
return;
}
// valueSets became too large, create TupleDomain from min/max values
for (Integer channelIndex : minMaxChannels) {
Type type = channels.get(channelIndex).type;
if (minValues[channelIndex] == null) {
// all values were null
domainsBuilder.put(channels.get(channelIndex).filterId, Domain.none(type));
continue;
}
Object min = readNativeValue(type, minValues[channelIndex], 0);
Object max = readNativeValue(type, maxValues[channelIndex], 0);
Domain domain = Domain.create(
ValueSet.ofRanges(range(type, min, true, max, true)),
false);
domainsBuilder.put(channels.get(channelIndex).filterId, domain);
}
minValues = null;
maxValues = null;
dynamicPredicateConsumer.accept(TupleDomain.withColumnDomains(domainsBuilder.build()));
return;
}

verify(blockBuilders != null, "blockBuilders is null when finish is called in DynamicFilterSourceOperator");
ImmutableMap.Builder<String, Domain> domainsBuilder = ImmutableMap.builder();
for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) {
Block block = blockBuilders[channelIndex].build();
Type type = channels.get(channelIndex).getType();
Expand All @@ -267,9 +403,12 @@ private Domain convertToDomain(Type type, Block block)
{
ImmutableList.Builder<Object> values = ImmutableList.builder();
for (int position = 0; position < block.getPositionCount(); ++position) {
Object value = TypeUtils.readNativeValue(type, block, position);
Object value = readNativeValue(type, block, position);
if (value != null) {
values.add(value);
// join doesn't match rows with NaN values.
if (!isFloatingPointNaN(type, value)) {
values.add(value);
}
}
}
// Inner and right joins don't match rows with null key column values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public class FeaturesConfig
private boolean enableDynamicFiltering;
private int dynamicFilteringMaxPerDriverRowCount = 100;
private DataSize dynamicFilteringMaxPerDriverSize = new DataSize(10, KILOBYTE);
private int dynamicFilteringRangeRowLimitPerDriver;

private boolean fragmentResultCachingEnabled;

Expand Down Expand Up @@ -1150,6 +1151,19 @@ public FeaturesConfig setDynamicFilteringMaxPerDriverSize(DataSize dynamicFilter
return this;
}

/**
 * Returns the configured maximum number of build-side rows per driver up to which
 * min and max values will be collected for dynamic filtering.
 */
public int getDynamicFilteringRangeRowLimitPerDriver()
{
    return this.dynamicFilteringRangeRowLimitPerDriver;
}

/**
 * Sets the maximum number of build-side rows per driver up to which min and max values
 * will be collected for dynamic filtering.
 *
 * @param dynamicFilteringRangeRowLimitPerDriver the new row limit
 * @return this config instance, to allow call chaining
 */
@Config("experimental.dynamic-filtering-range-row-limit-per-driver")
@ConfigDescription("Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering")
public FeaturesConfig setDynamicFilteringRangeRowLimitPerDriver(int dynamicFilteringRangeRowLimitPerDriver)
{
this.dynamicFilteringRangeRowLimitPerDriver = dynamicFilteringRangeRowLimitPerDriver;
return this;
}

public boolean isFragmentResultCachingEnabled()
{
return fragmentResultCachingEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@
import static com.facebook.presto.SystemSessionProperties.getAggregationOperatorUnspillMemoryLimit;
import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverRowCount;
import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverSize;
import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringRangeRowLimitPerDriver;
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
import static com.facebook.presto.SystemSessionProperties.getIndexLoaderTimeout;
Expand Down Expand Up @@ -2323,7 +2324,8 @@ private DynamicFilterSourceOperator.DynamicFilterSourceOperatorFactory createDyn
dynamicFilter.getTupleDomainConsumer(),
filterBuildChannels,
getDynamicFilteringMaxPerDriverRowCount(context.getSession()),
getDynamicFilteringMaxPerDriverSize(context.getSession()));
getDynamicFilteringMaxPerDriverSize(context.getSession()),
getDynamicFilteringRangeRowLimitPerDriver(context.getSession()));
}

private Optional<LocalDynamicFilter> createDynamicFilter(PhysicalOperation buildSource, AbstractJoinNode node, LocalExecutionPlanContext context, int partitionCount)
Expand Down
Loading

0 comments on commit 59c0073

Please sign in to comment.