Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement collection of min/max values in DynamicFilterSourceOperator #16314

Merged
merged 1 commit into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;

Expand Down Expand Up @@ -129,6 +132,20 @@ public static long hashPosition(Type type, Block block, int position)
return type.hash(block, position);
}

public static boolean isFloatingPointNaN(Type type, Object value)
{
requireNonNull(type, "type is null");
requireNonNull(value, "value is null");

if (type == DOUBLE) {
return Double.isNaN((double) value);
}
if (type == REAL) {
return Float.isNaN(intBitsToFloat(toIntExact((long) value)));
}
return false;
}

static void checkElementNotNull(boolean isNull, String errorMsg)
{
if (isNull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public final class SystemSessionProperties
public static final String ENABLE_DYNAMIC_FILTERING = "enable_dynamic_filtering";
public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_ROW_COUNT = "dynamic_filtering_max_per_driver_row_count";
public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE = "dynamic_filtering_max_per_driver_size";
public static final String DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER = "dynamic_filtering_range_row_limit_per_driver";
public static final String FRAGMENT_RESULT_CACHING_ENABLED = "fragment_result_caching_enabled";
public static final String LEGACY_TYPE_COERCION_WARNING_ENABLED = "legacy_type_coercion_warning_enabled";
public static final String INLINE_SQL_FUNCTIONS = "inline_sql_functions";
Expand Down Expand Up @@ -936,6 +937,11 @@ public SystemSessionProperties(
false,
value -> DataSize.valueOf((String) value),
DataSize::toString),
integerProperty(
DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER,
"Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering",
featuresConfig.getDynamicFilteringRangeRowLimitPerDriver(),
false),
booleanProperty(
FRAGMENT_RESULT_CACHING_ENABLED,
"Enable fragment result caching and read/write leaf fragment result pages from/to cache when applicable",
Expand Down Expand Up @@ -1683,6 +1689,11 @@ public static DataSize getDynamicFilteringMaxPerDriverSize(Session session)
return session.getSystemProperty(DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE, DataSize.class);
}

public static int getDynamicFilteringRangeRowLimitPerDriver(Session session)
{
return session.getSystemProperty(DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER, Integer.class);
}

public static boolean isFragmentResultCachingEnabled(Session session)
{
return session.getSystemProperty(FRAGMENT_RESULT_CACHING_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeUtils;
import com.facebook.presto.operator.aggregation.TypedSet;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.google.common.collect.ImmutableList;
Expand All @@ -33,6 +32,11 @@
import java.util.Optional;
import java.util.function.Consumer;

import static com.facebook.presto.common.predicate.Range.range;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.TypeUtils.isFloatingPointNaN;
import static com.facebook.presto.common.type.TypeUtils.readNativeValue;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
Expand All @@ -41,7 +45,8 @@
/**
* This operator acts as a simple "pass-through" pipe, while saving its input pages.
* The collected pages' value are used for creating a run-time filtering constraint (for probe-side table scan in an inner join).
* We support only small build-side pages (which should be the case when using "broadcast" join).
* We record all values for the run-time filter only for small build-side pages (which should be the case when using "broadcast" join).
* For large inputs on build side, we can optionally record the min and max values per channel for orderable types (except Double and Real).
*/
public class DynamicFilterSourceOperator
implements Operator
Expand Down Expand Up @@ -86,6 +91,7 @@ public static class DynamicFilterSourceOperatorFactory
private final List<Channel> channels;
private final int maxFilterPositionsCount;
private final DataSize maxFilterSize;
private final int minMaxCollectionLimit;

private boolean closed;

Expand All @@ -95,7 +101,8 @@ public DynamicFilterSourceOperatorFactory(
Consumer<TupleDomain<String>> dynamicPredicateConsumer,
List<Channel> channels,
int maxFilterPositionsCount,
DataSize maxFilterSize)
DataSize maxFilterSize,
int minMaxCollectionLimit)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -109,6 +116,7 @@ public DynamicFilterSourceOperatorFactory(
"duplicate channel indices are not allowed");
this.maxFilterPositionsCount = maxFilterPositionsCount;
this.maxFilterSize = maxFilterSize;
this.minMaxCollectionLimit = minMaxCollectionLimit;
}

@Override
Expand All @@ -121,7 +129,8 @@ public Operator createOperator(DriverContext driverContext)
channels,
planNodeId,
maxFilterPositionsCount,
maxFilterSize);
maxFilterSize,
minMaxCollectionLimit);
}

@Override
Expand All @@ -143,6 +152,7 @@ public OperatorFactory duplicate()
private final int maxFilterPositionsCount;
private final long maxFilterSizeInBytes;
private final List<Channel> channels;
private final List<Integer> minMaxChannels;

private boolean finished;
private Page current;
Expand All @@ -153,13 +163,20 @@ public OperatorFactory duplicate()
@Nullable
private TypedSet[] valueSets;

private int minMaxCollectionLimit;
@Nullable
private Block[] minValues;
@Nullable
private Block[] maxValues;

private DynamicFilterSourceOperator(
OperatorContext context,
Consumer<TupleDomain<String>> dynamicPredicateConsumer,
List<Channel> channels,
PlanNodeId planNodeId,
int maxFilterPositionsCount,
DataSize maxFilterSize)
DataSize maxFilterSize,
int minMaxCollectionLimit)
{
this.context = requireNonNull(context, "context is null");
this.maxFilterPositionsCount = maxFilterPositionsCount;
Expand All @@ -170,8 +187,13 @@ private DynamicFilterSourceOperator(

this.blockBuilders = new BlockBuilder[channels.size()];
this.valueSets = new TypedSet[channels.size()];
ImmutableList.Builder<Integer> minMaxChannelsBuilder = ImmutableList.builder();
for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) {
Type type = channels.get(channelIndex).getType();
// Skipping DOUBLE and REAL in collectMinMaxValues to avoid dealing with NaN values
rzeyde-varada marked this conversation as resolved.
Show resolved Hide resolved
if (minMaxCollectionLimit > 0 && type.isOrderable() && type != DOUBLE && type != REAL) {
minMaxChannelsBuilder.add(channelIndex);
}
this.blockBuilders[channelIndex] = type.createBlockBuilder(null, EXPECTED_BLOCK_BUILDER_SIZE);
this.valueSets[channelIndex] = new TypedSet(
type,
Expand All @@ -180,6 +202,12 @@ private DynamicFilterSourceOperator(
String.format("DynamicFilterSourceOperator_%s_%d", planNodeId, channelIndex),
Optional.empty() /* maxBlockMemory */);
}
this.minMaxCollectionLimit = minMaxCollectionLimit;
minMaxChannels = minMaxChannelsBuilder.build();
if (!minMaxChannels.isEmpty()) {
minValues = new Block[channels.size()];
maxValues = new Block[channels.size()];
}
}

@Override
Expand All @@ -200,9 +228,24 @@ public void addInput(Page page)
verify(!finished, "DynamicFilterSourceOperator: addInput() shouldn't not be called after finish()");
current = page;
if (valueSets == null) {
return; // the predicate became too large.
// the exact predicate became too large.
if (minValues == null) {
// there are too many rows to collect min/max range
return;
}
minMaxCollectionLimit -= page.getPositionCount();
if (minMaxCollectionLimit < 0) {
handleMinMaxCollectionLimitExceeded();
return;
}
// the predicate became too large, record only min and max values for each orderable channel
for (Integer channelIndex : minMaxChannels) {
Block block = page.getBlock(channels.get(channelIndex).index);
updateMinMaxValues(block, channelIndex);
}
return;
}

minMaxCollectionLimit -= page.getPositionCount();
// TODO: we should account for the memory used for collecting build-side values using MemoryContext
long filterSizeInBytes = 0;
int filterPositionsCount = 0;
Expand All @@ -224,13 +267,82 @@ public void addInput(Page page)

private void handleTooLargePredicate()
{
// The resulting predicate is too large, allow all probe-side values to be read.
dynamicPredicateConsumer.accept(TupleDomain.all());
// The resulting predicate is too large
if (minMaxChannels.isEmpty()) {
// allow all probe-side values to be read.
dynamicPredicateConsumer.accept(TupleDomain.all());
}
else {
if (minMaxCollectionLimit < 0) {
handleMinMaxCollectionLimitExceeded();
}
else {
// convert to min/max per column for orderable types
for (Integer channelIndex : minMaxChannels) {
Block block = blockBuilders[channelIndex].build();
updateMinMaxValues(block, channelIndex);
}
}
}

// Drop references to collected values.
valueSets = null;
blockBuilders = null;
}

private void handleMinMaxCollectionLimitExceeded()
{
// allow all probe-side values to be read.
dynamicPredicateConsumer.accept(TupleDomain.all());
// Drop references to collected values.
minValues = null;
maxValues = null;
}

private void updateMinMaxValues(Block block, int channelIndex)
{
checkState(minValues != null && maxValues != null);
Type type = channels.get(channelIndex).type;
int minValuePosition = -1;
int maxValuePosition = -1;
for (int position = 0; position < block.getPositionCount(); ++position) {
if (block.isNull(position)) {
continue;
}
if (minValuePosition == -1) {
// First non-null value
minValuePosition = position;
maxValuePosition = position;
continue;
}
if (type.compareTo(block, position, block, minValuePosition) < 0) {
minValuePosition = position;
}
else if (type.compareTo(block, position, block, maxValuePosition) > 0) {
maxValuePosition = position;
}
}
if (minValuePosition == -1) {
// all block values are nulls
return;
}
if (minValues[channelIndex] == null) {
// First Page with non-null value for this block
minValues[channelIndex] = block.getSingleValueBlock(minValuePosition);
maxValues[channelIndex] = block.getSingleValueBlock(maxValuePosition);
return;
}
// Compare with min/max values from previous Pages
Block currentMin = minValues[channelIndex];
Block currentMax = maxValues[channelIndex];
if (type.compareTo(block, minValuePosition, currentMin, 0) < 0) {
minValues[channelIndex] = block.getSingleValueBlock(minValuePosition);
}
if (type.compareTo(block, maxValuePosition, currentMax, 0) > 0) {
maxValues[channelIndex] = block.getSingleValueBlock(maxValuePosition);
}
}

@Override
public Page getOutput()
{
Expand All @@ -247,12 +359,36 @@ public void finish()
return;
}
finished = true;
ImmutableMap.Builder<String, Domain> domainsBuilder = ImmutableMap.builder();
if (valueSets == null) {
return; // the predicate became too large.
if (minValues == null) {
// there were too many rows to collect collect min/max range
// dynamicPredicateConsumer was notified with 'all' in handleTooLargePredicate if there are no orderable types,
// else it was notified with 'all' in handleMinMaxCollectionLimitExceeded
return;
}
// valueSets became too large, create TupleDomain from min/max values
for (Integer channelIndex : minMaxChannels) {
Type type = channels.get(channelIndex).type;
if (minValues[channelIndex] == null) {
// all values were null
domainsBuilder.put(channels.get(channelIndex).filterId, Domain.none(type));
continue;
}
Object min = readNativeValue(type, minValues[channelIndex], 0);
Object max = readNativeValue(type, maxValues[channelIndex], 0);
Domain domain = Domain.create(
ValueSet.ofRanges(range(type, min, true, max, true)),
false);
domainsBuilder.put(channels.get(channelIndex).filterId, domain);
}
minValues = null;
maxValues = null;
dynamicPredicateConsumer.accept(TupleDomain.withColumnDomains(domainsBuilder.build()));
return;
}

verify(blockBuilders != null, "blockBuilders is null when finish is called in DynamicFilterSourceOperator");
ImmutableMap.Builder<String, Domain> domainsBuilder = ImmutableMap.builder();
for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) {
Block block = blockBuilders[channelIndex].build();
Type type = channels.get(channelIndex).getType();
Expand All @@ -267,9 +403,12 @@ private Domain convertToDomain(Type type, Block block)
{
ImmutableList.Builder<Object> values = ImmutableList.builder();
for (int position = 0; position < block.getPositionCount(); ++position) {
Object value = TypeUtils.readNativeValue(type, block, position);
Object value = readNativeValue(type, block, position);
if (value != null) {
values.add(value);
// join doesn't match rows with NaN values.
if (!isFloatingPointNaN(type, value)) {
values.add(value);
}
}
}
// Inner and right join doesn't match rows with null key column values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public class FeaturesConfig
private boolean enableDynamicFiltering;
private int dynamicFilteringMaxPerDriverRowCount = 100;
private DataSize dynamicFilteringMaxPerDriverSize = new DataSize(10, KILOBYTE);
private int dynamicFilteringRangeRowLimitPerDriver;

private boolean fragmentResultCachingEnabled;

Expand Down Expand Up @@ -1150,6 +1151,19 @@ public FeaturesConfig setDynamicFilteringMaxPerDriverSize(DataSize dynamicFilter
return this;
}

public int getDynamicFilteringRangeRowLimitPerDriver()
{
return dynamicFilteringRangeRowLimitPerDriver;
}

@Config("experimental.dynamic-filtering-range-row-limit-per-driver")
@ConfigDescription("Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering")
public FeaturesConfig setDynamicFilteringRangeRowLimitPerDriver(int dynamicFilteringRangeRowLimitPerDriver)
kewang1024 marked this conversation as resolved.
Show resolved Hide resolved
{
this.dynamicFilteringRangeRowLimitPerDriver = dynamicFilteringRangeRowLimitPerDriver;
return this;
}

public boolean isFragmentResultCachingEnabled()
{
return fragmentResultCachingEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@
import static com.facebook.presto.SystemSessionProperties.getAggregationOperatorUnspillMemoryLimit;
import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverRowCount;
import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverSize;
import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringRangeRowLimitPerDriver;
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
import static com.facebook.presto.SystemSessionProperties.getIndexLoaderTimeout;
Expand Down Expand Up @@ -2323,7 +2324,8 @@ private DynamicFilterSourceOperator.DynamicFilterSourceOperatorFactory createDyn
dynamicFilter.getTupleDomainConsumer(),
filterBuildChannels,
getDynamicFilteringMaxPerDriverRowCount(context.getSession()),
getDynamicFilteringMaxPerDriverSize(context.getSession()));
getDynamicFilteringMaxPerDriverSize(context.getSession()),
getDynamicFilteringRangeRowLimitPerDriver(context.getSession()));
}

private Optional<LocalDynamicFilter> createDynamicFilter(PhysicalOperation buildSource, AbstractJoinNode node, LocalExecutionPlanContext context, int partitionCount)
rzeyde-varada marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading