diff --git a/presto-common/src/main/java/com/facebook/presto/common/type/TypeUtils.java b/presto-common/src/main/java/com/facebook/presto/common/type/TypeUtils.java index 0c4e9d0bf422..4a8612cdce74 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/type/TypeUtils.java +++ b/presto-common/src/main/java/com/facebook/presto/common/type/TypeUtils.java @@ -29,7 +29,10 @@ import static com.facebook.presto.common.type.RealType.REAL; import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static java.lang.Float.intBitsToFloat; +import static java.lang.Math.toIntExact; import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; @@ -129,6 +132,20 @@ public static long hashPosition(Type type, Block block, int position) return type.hash(block, position); } + public static boolean isFloatingPointNaN(Type type, Object value) + { + requireNonNull(type, "type is null"); + requireNonNull(value, "value is null"); + + if (type == DOUBLE) { + return Double.isNaN((double) value); + } + if (type == REAL) { + return Float.isNaN(intBitsToFloat(toIntExact((long) value))); + } + return false; + } + static void checkElementNotNull(boolean isNull, String errorMsg) { if (isNull) { diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 846d92381d6f..7bd981a7e027 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -176,6 +176,7 @@ public final class SystemSessionProperties public static final String ENABLE_DYNAMIC_FILTERING = "enable_dynamic_filtering"; public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_ROW_COUNT = 
"dynamic_filtering_max_per_driver_row_count"; public static final String DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE = "dynamic_filtering_max_per_driver_size"; + public static final String DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER = "dynamic_filtering_range_row_limit_per_driver"; public static final String FRAGMENT_RESULT_CACHING_ENABLED = "fragment_result_caching_enabled"; public static final String LEGACY_TYPE_COERCION_WARNING_ENABLED = "legacy_type_coercion_warning_enabled"; public static final String INLINE_SQL_FUNCTIONS = "inline_sql_functions"; @@ -936,6 +937,11 @@ public SystemSessionProperties( false, value -> DataSize.valueOf((String) value), DataSize::toString), + integerProperty( + DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER, + "Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering", + featuresConfig.getDynamicFilteringRangeRowLimitPerDriver(), + false), booleanProperty( FRAGMENT_RESULT_CACHING_ENABLED, "Enable fragment result caching and read/write leaf fragment result pages from/to cache when applicable", @@ -1683,6 +1689,11 @@ public static DataSize getDynamicFilteringMaxPerDriverSize(Session session) return session.getSystemProperty(DYNAMIC_FILTERING_MAX_PER_DRIVER_SIZE, DataSize.class); } + public static int getDynamicFilteringRangeRowLimitPerDriver(Session session) + { + return session.getSystemProperty(DYNAMIC_FILTERING_RANGE_ROW_LIMIT_PER_DRIVER, Integer.class); + } + public static boolean isFragmentResultCachingEnabled(Session session) { return session.getSystemProperty(FRAGMENT_RESULT_CACHING_ENABLED, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/DynamicFilterSourceOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/DynamicFilterSourceOperator.java index d0b820f42441..a8f41061e42e 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/DynamicFilterSourceOperator.java +++ 
b/presto-main/src/main/java/com/facebook/presto/operator/DynamicFilterSourceOperator.java @@ -20,7 +20,6 @@ import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.predicate.ValueSet; import com.facebook.presto.common.type.Type; -import com.facebook.presto.common.type.TypeUtils; import com.facebook.presto.operator.aggregation.TypedSet; import com.facebook.presto.spi.plan.PlanNodeId; import com.google.common.collect.ImmutableList; @@ -33,6 +32,11 @@ import java.util.Optional; import java.util.function.Consumer; +import static com.facebook.presto.common.predicate.Range.range; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; +import static com.facebook.presto.common.type.RealType.REAL; +import static com.facebook.presto.common.type.TypeUtils.isFloatingPointNaN; +import static com.facebook.presto.common.type.TypeUtils.readNativeValue; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; @@ -41,7 +45,8 @@ /** * This operator acts as a simple "pass-through" pipe, while saving its input pages. * The collected pages' value are used for creating a run-time filtering constraint (for probe-side table scan in an inner join). - * We support only small build-side pages (which should be the case when using "broadcast" join). + * We record all values for the run-time filter only for small build-side pages (which should be the case when using "broadcast" join). + * For large inputs on build side, we can optionally record the min and max values per channel for orderable types (except Double and Real). 
*/ public class DynamicFilterSourceOperator implements Operator @@ -86,6 +91,7 @@ public static class DynamicFilterSourceOperatorFactory private final List channels; private final int maxFilterPositionsCount; private final DataSize maxFilterSize; + private final int minMaxCollectionLimit; private boolean closed; @@ -95,7 +101,8 @@ public DynamicFilterSourceOperatorFactory( Consumer> dynamicPredicateConsumer, List channels, int maxFilterPositionsCount, - DataSize maxFilterSize) + DataSize maxFilterSize, + int minMaxCollectionLimit) { this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); @@ -109,6 +116,7 @@ public DynamicFilterSourceOperatorFactory( "duplicate channel indices are not allowed"); this.maxFilterPositionsCount = maxFilterPositionsCount; this.maxFilterSize = maxFilterSize; + this.minMaxCollectionLimit = minMaxCollectionLimit; } @Override @@ -121,7 +129,8 @@ public Operator createOperator(DriverContext driverContext) channels, planNodeId, maxFilterPositionsCount, - maxFilterSize); + maxFilterSize, + minMaxCollectionLimit); } @Override @@ -143,6 +152,7 @@ public OperatorFactory duplicate() private final int maxFilterPositionsCount; private final long maxFilterSizeInBytes; private final List channels; + private final List minMaxChannels; private boolean finished; private Page current; @@ -153,13 +163,20 @@ public OperatorFactory duplicate() @Nullable private TypedSet[] valueSets; + private int minMaxCollectionLimit; + @Nullable + private Block[] minValues; + @Nullable + private Block[] maxValues; + private DynamicFilterSourceOperator( OperatorContext context, Consumer> dynamicPredicateConsumer, List channels, PlanNodeId planNodeId, int maxFilterPositionsCount, - DataSize maxFilterSize) + DataSize maxFilterSize, + int minMaxCollectionLimit) { this.context = requireNonNull(context, "context is null"); this.maxFilterPositionsCount = maxFilterPositionsCount; @@ -170,8 +187,13 @@ private DynamicFilterSourceOperator( 
this.blockBuilders = new BlockBuilder[channels.size()]; this.valueSets = new TypedSet[channels.size()]; + ImmutableList.Builder minMaxChannelsBuilder = ImmutableList.builder(); for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) { Type type = channels.get(channelIndex).getType(); + // Skipping DOUBLE and REAL when collecting min/max values to avoid dealing with NaN values + if (minMaxCollectionLimit > 0 && type.isOrderable() && type != DOUBLE && type != REAL) { + minMaxChannelsBuilder.add(channelIndex); + } this.blockBuilders[channelIndex] = type.createBlockBuilder(null, EXPECTED_BLOCK_BUILDER_SIZE); this.valueSets[channelIndex] = new TypedSet( type, @@ -180,6 +202,12 @@ private DynamicFilterSourceOperator( String.format("DynamicFilterSourceOperator_%s_%d", planNodeId, channelIndex), Optional.empty() /* maxBlockMemory */); } + this.minMaxCollectionLimit = minMaxCollectionLimit; + minMaxChannels = minMaxChannelsBuilder.build(); + if (!minMaxChannels.isEmpty()) { + minValues = new Block[channels.size()]; + maxValues = new Block[channels.size()]; + } } @Override @@ -200,9 +228,24 @@ public void addInput(Page page) verify(!finished, "DynamicFilterSourceOperator: addInput() shouldn't not be called after finish()"); current = page; if (valueSets == null) { - return; // the predicate became too large. + // the exact predicate became too large. 
+ if (minValues == null) { + // there are too many rows to collect min/max range + return; + } + minMaxCollectionLimit -= page.getPositionCount(); + if (minMaxCollectionLimit < 0) { + handleMinMaxCollectionLimitExceeded(); + return; + } + // the predicate became too large, record only min and max values for each orderable channel + for (Integer channelIndex : minMaxChannels) { + Block block = page.getBlock(channels.get(channelIndex).index); + updateMinMaxValues(block, channelIndex); + } + return; } - + minMaxCollectionLimit -= page.getPositionCount(); // TODO: we should account for the memory used for collecting build-side values using MemoryContext long filterSizeInBytes = 0; int filterPositionsCount = 0; @@ -224,13 +267,82 @@ public void addInput(Page page) private void handleTooLargePredicate() { - // The resulting predicate is too large, allow all probe-side values to be read. - dynamicPredicateConsumer.accept(TupleDomain.all()); + // The resulting predicate is too large + if (minMaxChannels.isEmpty()) { + // allow all probe-side values to be read. + dynamicPredicateConsumer.accept(TupleDomain.all()); + } + else { + if (minMaxCollectionLimit < 0) { + handleMinMaxCollectionLimitExceeded(); + } + else { + // convert to min/max per column for orderable types + for (Integer channelIndex : minMaxChannels) { + Block block = blockBuilders[channelIndex].build(); + updateMinMaxValues(block, channelIndex); + } + } + } + // Drop references to collected values. valueSets = null; blockBuilders = null; } + private void handleMinMaxCollectionLimitExceeded() + { + // allow all probe-side values to be read. + dynamicPredicateConsumer.accept(TupleDomain.all()); + // Drop references to collected values. 
+ minValues = null; + maxValues = null; + } + + private void updateMinMaxValues(Block block, int channelIndex) + { + checkState(minValues != null && maxValues != null); + Type type = channels.get(channelIndex).type; + int minValuePosition = -1; + int maxValuePosition = -1; + for (int position = 0; position < block.getPositionCount(); ++position) { + if (block.isNull(position)) { + continue; + } + if (minValuePosition == -1) { + // First non-null value + minValuePosition = position; + maxValuePosition = position; + continue; + } + if (type.compareTo(block, position, block, minValuePosition) < 0) { + minValuePosition = position; + } + else if (type.compareTo(block, position, block, maxValuePosition) > 0) { + maxValuePosition = position; + } + } + if (minValuePosition == -1) { + // all block values are nulls + return; + } + if (minValues[channelIndex] == null) { + // First Page with non-null value for this block + minValues[channelIndex] = block.getSingleValueBlock(minValuePosition); + maxValues[channelIndex] = block.getSingleValueBlock(maxValuePosition); + return; + } + // Compare with min/max values from previous Pages + Block currentMin = minValues[channelIndex]; + Block currentMax = maxValues[channelIndex]; + if (type.compareTo(block, minValuePosition, currentMin, 0) < 0) { + minValues[channelIndex] = block.getSingleValueBlock(minValuePosition); + } + if (type.compareTo(block, maxValuePosition, currentMax, 0) > 0) { + maxValues[channelIndex] = block.getSingleValueBlock(maxValuePosition); + } + } + @Override public Page getOutput() { @@ -247,12 +359,36 @@ public void finish() return; } finished = true; + ImmutableMap.Builder domainsBuilder = ImmutableMap.builder(); if (valueSets == null) { - return; // the predicate became too large. 
+ if (minValues == null) { + // there were too many rows to collect min/max range + // dynamicPredicateConsumer was notified with 'all' in handleTooLargePredicate if there are no orderable types, + // else it was notified with 'all' in handleMinMaxCollectionLimitExceeded + return; + } + // valueSets became too large, create TupleDomain from min/max values + for (Integer channelIndex : minMaxChannels) { + Type type = channels.get(channelIndex).type; + if (minValues[channelIndex] == null) { + // all values were null + domainsBuilder.put(channels.get(channelIndex).filterId, Domain.none(type)); + continue; + } + Object min = readNativeValue(type, minValues[channelIndex], 0); + Object max = readNativeValue(type, maxValues[channelIndex], 0); + Domain domain = Domain.create( + ValueSet.ofRanges(range(type, min, true, max, true)), + false); + domainsBuilder.put(channels.get(channelIndex).filterId, domain); + } + minValues = null; + maxValues = null; + dynamicPredicateConsumer.accept(TupleDomain.withColumnDomains(domainsBuilder.build())); + return; } verify(blockBuilders != null, "blockBuilders is null when finish is called in DynamicFilterSourceOperator"); - ImmutableMap.Builder domainsBuilder = ImmutableMap.builder(); for (int channelIndex = 0; channelIndex < channels.size(); ++channelIndex) { Block block = blockBuilders[channelIndex].build(); Type type = channels.get(channelIndex).getType(); @@ -267,9 +403,12 @@ private Domain convertToDomain(Type type, Block block) { ImmutableList.Builder values = ImmutableList.builder(); for (int position = 0; position < block.getPositionCount(); ++position) { - Object value = TypeUtils.readNativeValue(type, block, position); + Object value = readNativeValue(type, block, position); if (value != null) { - values.add(value); + // join doesn't match rows with NaN values. + if (!isFloatingPointNaN(type, value)) { + values.add(value); + } } } // Inner and right join doesn't match rows with null key column values. 
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 6431fb5c1bcc..64fafb886f71 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -152,6 +152,7 @@ public class FeaturesConfig private boolean enableDynamicFiltering; private int dynamicFilteringMaxPerDriverRowCount = 100; private DataSize dynamicFilteringMaxPerDriverSize = new DataSize(10, KILOBYTE); + private int dynamicFilteringRangeRowLimitPerDriver; private boolean fragmentResultCachingEnabled; @@ -1150,6 +1151,19 @@ public FeaturesConfig setDynamicFilteringMaxPerDriverSize(DataSize dynamicFilter return this; } + public int getDynamicFilteringRangeRowLimitPerDriver() + { + return dynamicFilteringRangeRowLimitPerDriver; + } + + @Config("experimental.dynamic-filtering-range-row-limit-per-driver") + @ConfigDescription("Maximum number of build-side rows per driver up to which min and max values will be collected for dynamic filtering") + public FeaturesConfig setDynamicFilteringRangeRowLimitPerDriver(int dynamicFilteringRangeRowLimitPerDriver) + { + this.dynamicFilteringRangeRowLimitPerDriver = dynamicFilteringRangeRowLimitPerDriver; + return this; + } + public boolean isFragmentResultCachingEnabled() { return fragmentResultCachingEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index eaddaf878411..cf62ac40d443 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -246,6 +246,7 @@ import static com.facebook.presto.SystemSessionProperties.getAggregationOperatorUnspillMemoryLimit; import 
static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverRowCount; import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverSize; +import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringRangeRowLimitPerDriver; import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount; import static com.facebook.presto.SystemSessionProperties.getFilterAndProjectMinOutputPageSize; import static com.facebook.presto.SystemSessionProperties.getIndexLoaderTimeout; @@ -2323,7 +2324,8 @@ private DynamicFilterSourceOperator.DynamicFilterSourceOperatorFactory createDyn dynamicFilter.getTupleDomainConsumer(), filterBuildChannels, getDynamicFilteringMaxPerDriverRowCount(context.getSession()), - getDynamicFilteringMaxPerDriverSize(context.getSession())); + getDynamicFilteringMaxPerDriverSize(context.getSession()), + getDynamicFilteringRangeRowLimitPerDriver(context.getSession())); } private Optional createDynamicFilter(PhysicalOperation buildSource, AbstractJoinNode node, LocalExecutionPlanContext context, int partitionCount) diff --git a/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java b/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java index 0dfbd520c8cc..1e010d68b9da 100644 --- a/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java +++ b/presto-main/src/test/java/com/facebook/presto/block/BlockAssertions.java @@ -62,6 +62,7 @@ import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.testing.TestingConnectorSession.SESSION; import static com.facebook.presto.testing.TestingEnvironment.getOperatorMethodHandle; +import static com.facebook.presto.type.ColorType.COLOR; import static com.facebook.presto.util.StructuralTestUtil.appendToBlockBuilder; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.Slices.utf8Slice; @@ -535,6 
+536,15 @@ public static Block createLongRepeatBlock(int value, int length) return builder.build(); } + public static Block createDoubleRepeatBlock(double value, int length) + { + BlockBuilder builder = DOUBLE.createFixedSizeBlockBuilder(length); + for (int i = 0; i < length; i++) { + DOUBLE.writeDouble(builder, value); + } + return builder.build(); + } + public static Block createTimestampsWithTimezoneBlock(Long... values) { BlockBuilder builder = TIMESTAMP_WITH_TIME_ZONE.createFixedSizeBlockBuilder(values.length); @@ -562,7 +572,7 @@ public static Block createBlockOfReals(Float... values) return createBlockOfReals(Arrays.asList(values)); } - private static Block createBlockOfReals(Iterable values) + public static Block createBlockOfReals(Iterable values) { BlockBuilder builder = REAL.createBlockBuilder(null, 100); for (Float value : values) { @@ -684,6 +694,24 @@ public static Block createLongDecimalSequenceBlock(int start, int end, DecimalTy return builder.build(); } + public static Block createColorRepeatBlock(int value, int length) + { + BlockBuilder builder = COLOR.createFixedSizeBlockBuilder(length); + for (int i = 0; i < length; i++) { + COLOR.writeLong(builder, value); + } + return builder.build(); + } + + public static Block createColorSequenceBlock(int start, int end) + { + BlockBuilder builder = COLOR.createBlockBuilder(null, end - start); + for (int i = start; i < end; ++i) { + COLOR.writeLong(builder, i); + } + return builder.build(); + } + public static RunLengthEncodedBlock createRLEBlock(double value, int positionCount) { BlockBuilder blockBuilder = DOUBLE.createBlockBuilder(null, 1); diff --git a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkDynamicFilterSourceOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkDynamicFilterSourceOperator.java index 47e369359400..da10ae019e50 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkDynamicFilterSourceOperator.java +++ 
b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkDynamicFilterSourceOperator.java @@ -48,6 +48,7 @@ import static com.facebook.presto.SessionTestUtils.TEST_SESSION; import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverRowCount; import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverSize; +import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringRangeRowLimitPerDriver; import static com.facebook.presto.common.type.BigintType.BIGINT; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static java.util.concurrent.Executors.newCachedThreadPool; @@ -88,7 +89,8 @@ public void setup() (tupleDomain -> {}), ImmutableList.of(new DynamicFilterSourceOperator.Channel("0", BIGINT, 0)), getDynamicFilteringMaxPerDriverRowCount(TEST_SESSION), - getDynamicFilteringMaxPerDriverSize(TEST_SESSION)); + getDynamicFilteringMaxPerDriverSize(TEST_SESSION), + getDynamicFilteringRangeRowLimitPerDriver(TEST_SESSION)); } @TearDown diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestDynamicFilterSourceOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestDynamicFilterSourceOperator.java index b34a820ea252..390a5796532f 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestDynamicFilterSourceOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestDynamicFilterSourceOperator.java @@ -15,6 +15,7 @@ import com.facebook.presto.common.Page; import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.BlockBuilder; import com.facebook.presto.common.predicate.Domain; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.predicate.ValueSet; @@ -23,38 +24,51 @@ import com.facebook.presto.testing.MaterializedResult; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; 
import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.IntStream; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; -import static com.facebook.presto.SequencePageBuilder.createSequencePage; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; -import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverRowCount; -import static com.facebook.presto.SystemSessionProperties.getDynamicFilteringMaxPerDriverSize; +import static com.facebook.presto.block.BlockAssertions.createBlockOfReals; import static com.facebook.presto.block.BlockAssertions.createBooleansBlock; +import static com.facebook.presto.block.BlockAssertions.createColorSequenceBlock; +import static com.facebook.presto.block.BlockAssertions.createDoubleRepeatBlock; +import static com.facebook.presto.block.BlockAssertions.createDoubleSequenceBlock; import static com.facebook.presto.block.BlockAssertions.createDoublesBlock; import static com.facebook.presto.block.BlockAssertions.createLongRepeatBlock; +import static com.facebook.presto.block.BlockAssertions.createLongSequenceBlock; import static com.facebook.presto.block.BlockAssertions.createLongsBlock; +import static com.facebook.presto.block.BlockAssertions.createSequenceBlockOfReal; import static com.facebook.presto.block.BlockAssertions.createStringsBlock; +import static com.facebook.presto.common.predicate.Range.range; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.BooleanType.BOOLEAN; import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static 
com.facebook.presto.common.type.RealType.REAL; +import static com.facebook.presto.common.type.TypeUtils.readNativeValue; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.operator.OperatorAssertion.toMaterializedResult; import static com.facebook.presto.operator.OperatorAssertion.toPages; import static com.facebook.presto.testing.TestingTaskContext.createTaskContext; import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static com.facebook.presto.type.ColorType.COLOR; import static com.google.common.base.Strings.repeat; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.Slices.utf8Slice; +import static io.airlift.units.DataSize.Unit.KILOBYTE; +import static java.lang.Float.floatToRawIntBits; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; -import static java.util.stream.Collectors.toList; @Test(singleThreaded = true) public class TestDynamicFilterSourceOperator @@ -83,7 +97,12 @@ public void tearDown() scheduledExecutor.shutdownNow(); } - private void verifyPassThrough(Operator operator, List types, Page... pages) + private void verifyPassthrough(Operator operator, List types, Page... pages) + { + verifyPassthrough(operator, types, Arrays.asList(pages)); + } + + private void verifyPassthrough(Operator operator, List types, List pages) { List inputPages = ImmutableList.copyOf(pages); List outputPages = toPages(operator, inputPages.iterator()); @@ -93,14 +112,24 @@ private void verifyPassThrough(Operator operator, List types, Page... page } private OperatorFactory createOperatorFactory(DynamicFilterSourceOperator.Channel... 
buildChannels) + { + return createOperatorFactory(100, new DataSize(10, KILOBYTE), 1_000_000, Arrays.asList(buildChannels)); + } + + private OperatorFactory createOperatorFactory( + int maxFilterPositionsCount, + DataSize maxFilterSize, + int minMaxCollectionLimit, + Iterable buildChannels) { return new DynamicFilterSourceOperator.DynamicFilterSourceOperatorFactory( 0, new PlanNodeId("PLAN_NODE_ID"), this::consumePredicate, - Arrays.stream(buildChannels).collect(toList()), - getDynamicFilteringMaxPerDriverRowCount(TEST_SESSION), - getDynamicFilteringMaxPerDriverSize(TEST_SESSION)); + ImmutableList.copyOf(buildChannels), + maxFilterPositionsCount, + maxFilterSize, + minMaxCollectionLimit); } private void consumePredicate(TupleDomain partitionPredicate) @@ -118,49 +147,69 @@ private static DynamicFilterSourceOperator.Channel channel(int index, Type type) return new DynamicFilterSourceOperator.Channel(Integer.toString(index), type, index); } + private void assertDynamicFilters(int maxFilterPositionsCount, List types, List pages, List> expectedTupleDomains) + { + assertDynamicFilters(maxFilterPositionsCount, new DataSize(10, KILOBYTE), 1_000_000, types, pages, expectedTupleDomains); + } + + private void assertDynamicFilters( + int maxFilterPositionsCount, + DataSize maxFilterSize, + int minMaxCollectionLimit, + List types, + List pages, + List> expectedTupleDomains) + { + List buildChannels = IntStream.range(0, types.size()) + .mapToObj(i -> channel(i, types.get(i))) + .collect(toImmutableList()); + OperatorFactory operatorFactory = createOperatorFactory(maxFilterPositionsCount, maxFilterSize, minMaxCollectionLimit, buildChannels); + verifyPassthrough(createOperator(operatorFactory), types, pages); + operatorFactory.noMoreOperators(); + assertEquals(partitions.build(), expectedTupleDomains); + } + @Test public void testCollectMultipleOperators() { OperatorFactory operatorFactory = createOperatorFactory(channel(0, BIGINT)); - Operator operator1 = 
createOperator(operatorFactory); // will finish before noMoreOperators() - verifyPassThrough(operator1, - ImmutableList.of(BIGINT), - new Page(createLongsBlock(1, 2)), - new Page(createLongsBlock(3, 5))); + Operator op1 = createOperator(operatorFactory); // will finish before noMoreOperators() + verifyPassthrough(op1, + ImmutableList.of(BIGINT), + new Page(createLongsBlock(1, 2)), + new Page(createLongsBlock(3, 5))); - Operator operator2 = createOperator(operatorFactory); // will finish after noMoreOperators() + Operator op2 = createOperator(operatorFactory); // will finish after noMoreOperators() operatorFactory.noMoreOperators(); - assertEquals( - partitions.build(), - ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of("0", Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L, 5L)))))); + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L, 5L)))))); - verifyPassThrough( - operator2, + verifyPassthrough(op2, ImmutableList.of(BIGINT), new Page(createLongsBlock(2, 3)), new Page(createLongsBlock(1, 4))); - assertEquals( - partitions.build(), - ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of("0", Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L, 5L)))), - TupleDomain.withColumnDomains(ImmutableMap.of("0", Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L, 4L)))))); + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L, 5L)))), + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L, 4L)))))); } @Test public void testCollectMultipleColumns() { OperatorFactory operatorFactory = createOperatorFactory(channel(0, BOOLEAN), channel(1, DOUBLE)); - verifyPassThrough( - createOperator(operatorFactory), + 
verifyPassthrough(createOperator(operatorFactory), ImmutableList.of(BOOLEAN, DOUBLE), new Page(createBooleansBlock(true, 2), createDoublesBlock(1.5, 3.0)), new Page(createBooleansBlock(false, 1), createDoublesBlock(4.5))); operatorFactory.noMoreOperators(); - assertEquals( - partitions.build(), - ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( "0", Domain.multipleValues(BOOLEAN, ImmutableList.of(true, false)), "1", Domain.multipleValues(DOUBLE, ImmutableList.of(1.5, 3.0, 4.5)))))); } @@ -169,65 +218,152 @@ public void testCollectMultipleColumns() public void testCollectOnlyFirstColumn() { OperatorFactory operatorFactory = createOperatorFactory(channel(0, BOOLEAN)); - verifyPassThrough( - createOperator(operatorFactory), + verifyPassthrough(createOperator(operatorFactory), ImmutableList.of(BOOLEAN, DOUBLE), new Page(createBooleansBlock(true, 2), createDoublesBlock(1.5, 3.0)), new Page(createBooleansBlock(false, 1), createDoublesBlock(4.5))); operatorFactory.noMoreOperators(); - assertEquals( - partitions.build(), - ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of("0", Domain.multipleValues(BOOLEAN, ImmutableList.of(true, false)))))); + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.multipleValues(BOOLEAN, ImmutableList.of(true, false)))))); } @Test public void testCollectOnlyLastColumn() { OperatorFactory operatorFactory = createOperatorFactory(channel(1, DOUBLE)); - verifyPassThrough( - createOperator(operatorFactory), + verifyPassthrough(createOperator(operatorFactory), ImmutableList.of(BOOLEAN, DOUBLE), new Page(createBooleansBlock(true, 2), createDoublesBlock(1.5, 3.0)), new Page(createBooleansBlock(false, 1), createDoublesBlock(4.5))); operatorFactory.noMoreOperators(); - assertEquals( - partitions.build(), - 
ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( "1", Domain.multipleValues(DOUBLE, ImmutableList.of(1.5, 3.0, 4.5)))))); } @Test public void testCollectWithNulls() { - Block blockWithNulls = INTEGER.createFixedSizeBlockBuilder(0) + Block blockWithNulls = INTEGER + .createFixedSizeBlockBuilder(0) .writeInt(3) .appendNull() .writeInt(4) .build(); OperatorFactory operatorFactory = createOperatorFactory(channel(0, INTEGER)); - verifyPassThrough( - createOperator(operatorFactory), + verifyPassthrough(createOperator(operatorFactory), ImmutableList.of(INTEGER), new Page(createLongsBlock(1, 2, 3)), new Page(blockWithNulls), new Page(createLongsBlock(4, 5))); operatorFactory.noMoreOperators(); - assertEquals( - partitions.build(), - ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( "0", Domain.create(ValueSet.of(INTEGER, 1L, 2L, 3L, 4L, 5L), false))))); } + @Test + public void testCollectWithDoubleNaN() + { + BlockBuilder input = DOUBLE.createBlockBuilder(null, 10); + DOUBLE.writeDouble(input, 42.0); + DOUBLE.writeDouble(input, Double.NaN); + + OperatorFactory operatorFactory = createOperatorFactory(channel(0, DOUBLE)); + verifyPassthrough(createOperator(operatorFactory), + ImmutableList.of(DOUBLE), + new Page(input.build())); + operatorFactory.noMoreOperators(); + + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.multipleValues(DOUBLE, ImmutableList.of(42.0)))))); + } + + @Test + public void testCollectWithRealNaN() + { + BlockBuilder input = REAL.createBlockBuilder(null, 10); + REAL.writeLong(input, floatToRawIntBits(42.0f)); + REAL.writeLong(input, floatToRawIntBits(Float.NaN)); + + OperatorFactory operatorFactory = createOperatorFactory(channel(0, REAL)); + 
verifyPassthrough(createOperator(operatorFactory), + ImmutableList.of(REAL), + new Page(input.build())); + operatorFactory.noMoreOperators(); + + assertEquals(partitions.build(), ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.multipleValues(REAL, ImmutableList.of((long) floatToRawIntBits(42.0f))))))); + } + + @Test + public void testCollectTooMuchRowsDouble() + { + int maxPositionsCount = 100; + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(DOUBLE), + ImmutableList.of( + new Page(createDoubleSequenceBlock(0, maxPositionsCount + 1)), + new Page(createDoubleRepeatBlock(Double.NaN, maxPositionsCount + 1))), + ImmutableList.of(TupleDomain.all())); + } + + @Test + public void testCollectTooMuchRowsReal() + { + int maxPositionsCount = 100; + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(REAL), + ImmutableList.of( + new Page(createSequenceBlockOfReal(0, maxPositionsCount + 1)), + new Page(createBlockOfReals(Collections.nCopies(maxPositionsCount + 1, Float.NaN)))), + ImmutableList.of(TupleDomain.all())); + } + + @Test + public void testCollectTooMuchRowsNonOrderable() + { + int maxPositionsCount = 100; + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(COLOR), + ImmutableList.of(new Page(createColorSequenceBlock(0, maxPositionsCount + 1))), + ImmutableList.of(TupleDomain.all())); + } + + @Test + public void testCollectRowsNonOrderable() + { + int maxPositionsCount = 100; + Block block = createColorSequenceBlock(0, maxPositionsCount / 2); + ImmutableList.Builder values = ImmutableList.builder(); + for (int position = 0; position < block.getPositionCount(); ++position) { + values.add(readNativeValue(COLOR, block, position)); + } + + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(COLOR), + ImmutableList.of(new Page(block)), + ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + "0", + Domain.create(ValueSet.copyOf(COLOR, values.build()), false))))); + } + @Test public 
void testCollectNoFilters() { OperatorFactory operatorFactory = createOperatorFactory(); - verifyPassThrough( - createOperator(operatorFactory), + verifyPassthrough(createOperator(operatorFactory), ImmutableList.of(BIGINT), new Page(createLongsBlock(1, 2, 3))); operatorFactory.noMoreOperators(); @@ -238,69 +374,167 @@ public void testCollectNoFilters() public void testCollectEmptyBuildSide() { OperatorFactory operatorFactory = createOperatorFactory(channel(0, BIGINT)); - verifyPassThrough(createOperator(operatorFactory), ImmutableList.of(BIGINT)); + verifyPassthrough(createOperator(operatorFactory), + ImmutableList.of(BIGINT)); operatorFactory.noMoreOperators(); assertEquals(partitions.build(), ImmutableList.of(TupleDomain.none())); } @Test - public void testCollectTooManyRows() + public void testSingleColumnCollectMinMaxRangeWhenTooManyPositions() { - int maxRowCount = getDynamicFilteringMaxPerDriverRowCount(pipelineContext.getSession()); - Page largePage = createSequencePage(ImmutableList.of(BIGINT), maxRowCount + 1); + int maxPositionsCount = 100; + Page largePage = new Page(createLongSequenceBlock(0, maxPositionsCount + 1)); - OperatorFactory operatorFactory = createOperatorFactory(channel(0, BIGINT)); - verifyPassThrough(createOperator(operatorFactory), ImmutableList.of(BIGINT), largePage); - operatorFactory.noMoreOperators(); - assertEquals(partitions.build(), ImmutableList.of(TupleDomain.all())); + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(BIGINT), + ImmutableList.of(largePage), + ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + "0", + Domain.create( + ValueSet.ofRanges(range(BIGINT, 0L, true, (long) maxPositionsCount, true)), + false))))); } @Test - public void testCollectTooManyBytesSingleColumn() + public void testMultipleColumnsCollectMinMaxRangeWhenTooManyPositions() { - long maxByteSize = getDynamicFilteringMaxPerDriverSize(pipelineContext.getSession()).toBytes(); - Page largePage = new 
Page(createStringsBlock(repeat("A", (int) maxByteSize + 1))); - - OperatorFactory operatorFactory = createOperatorFactory(channel(0, VARCHAR)); - verifyPassThrough(createOperator(operatorFactory), ImmutableList.of(VARCHAR), largePage); - operatorFactory.noMoreOperators(); - assertEquals(partitions.build(), ImmutableList.of(TupleDomain.all())); + int maxPositionsCount = 300; + Page largePage = new Page( + createLongSequenceBlock(0, 101), + createColorSequenceBlock(100, 201), + createLongSequenceBlock(200, 301)); + + List> expectedTupleDomains = ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.create(ValueSet.ofRanges( + range(BIGINT, 0L, true, 100L, true)), false), + "2", Domain.create(ValueSet.ofRanges( + range(BIGINT, 200L, true, 300L, true)), false)))); + assertDynamicFilters(maxPositionsCount, ImmutableList.of(BIGINT, COLOR, BIGINT), ImmutableList.of(largePage), expectedTupleDomains); } @Test - public void testCollectTooManyBytesMultipleColumns() + public void testMultipleColumnsCollectMinMaxWithNulls() { - long maxByteSize = getDynamicFilteringMaxPerDriverSize(pipelineContext.getSession()).toBytes(); + int maxPositionsCount = 100; Page largePage = new Page( - createStringsBlock(repeat("A", (int) (maxByteSize / 2) + 1)), - createStringsBlock(repeat("B", (int) (maxByteSize / 2) + 1))); + createLongsBlock(Collections.nCopies(100, null)), + createLongSequenceBlock(200, 301)); + + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(BIGINT, BIGINT), + ImmutableList.of(largePage), + ImmutableList.of(TupleDomain.none())); + } + + @Test + public void testSingleColumnCollectMinMaxRangeWhenTooManyBytes() + { + DataSize maxSize = new DataSize(10, KILOBYTE); + long maxByteSize = maxSize.toBytes(); + String largeText = repeat("A", (int) maxByteSize + 1); + Page largePage = new Page(createStringsBlock(largeText)); + + assertDynamicFilters( + 100, + maxSize, + 100, + ImmutableList.of(VARCHAR), + ImmutableList.of(largePage), + 
ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + "0", + Domain.create( + ValueSet.ofRanges(range(VARCHAR, utf8Slice(largeText), true, utf8Slice(largeText), true)), + false))))); + } - OperatorFactory operatorFactory = createOperatorFactory(channel(0, VARCHAR), channel(1, VARCHAR)); - verifyPassThrough( - createOperator(operatorFactory), + @Test + public void testMultipleColumnsCollectMinMaxRangeWhenTooManyBytes() + { + DataSize maxSize = new DataSize(10, KILOBYTE); + long maxByteSize = maxSize.toBytes(); + String largeTextA = repeat("A", (int) (maxByteSize / 2) + 1); + String largeTextB = repeat("B", (int) (maxByteSize / 2) + 1); + Page largePage = new Page(createStringsBlock(largeTextA), createStringsBlock(largeTextB)); + + List> expectedTupleDomains = ImmutableList.of( + TupleDomain.withColumnDomains(ImmutableMap.of( + "0", Domain.create(ValueSet.ofRanges( + range(VARCHAR, utf8Slice(largeTextA), true, utf8Slice(largeTextA), true)), false), + "1", Domain.create(ValueSet.ofRanges( + range(VARCHAR, utf8Slice(largeTextB), true, utf8Slice(largeTextB), true)), false)))); + assertDynamicFilters( + 100, + maxSize, + 100, ImmutableList.of(VARCHAR, VARCHAR), - largePage); - operatorFactory.noMoreOperators(); - assertEquals(partitions.build(), ImmutableList.of(TupleDomain.all())); + ImmutableList.of(largePage), + expectedTupleDomains); + } + + @Test + public void testCollectMultipleLargePages() + { + int maxPositionsCount = 100; + Page page1 = new Page(createLongSequenceBlock(50, 151)); + Page page2 = new Page(createLongSequenceBlock(0, 101)); + Page page3 = new Page(createLongSequenceBlock(100, 201)); + + assertDynamicFilters( + maxPositionsCount, + ImmutableList.of(BIGINT), + ImmutableList.of(page1, page2, page3), + ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( + "0", + Domain.create( + ValueSet.ofRanges(range(BIGINT, 0L, true, 200L, true)), + false))))); } @Test public void testCollectDeduplication() { - int maxRowCount = 
getDynamicFilteringMaxPerDriverRowCount(pipelineContext.getSession()); - Page largePage = new Page(createLongRepeatBlock(7, maxRowCount * 10)); // lots of zeros - Page nullsPage = new Page(createLongsBlock(Arrays.asList(new Long[maxRowCount * 10]))); // lots of nulls + int maxPositionsCount = 100; + Page largePage = new Page(createLongRepeatBlock(7, maxPositionsCount * 10)); // lots of zeros + Page nullsPage = new Page(createLongsBlock(Arrays.asList(new Long[maxPositionsCount * 10]))); // lots of nulls - OperatorFactory operatorFactory = createOperatorFactory(channel(0, BIGINT)); - verifyPassThrough( - createOperator(operatorFactory), + assertDynamicFilters( + maxPositionsCount, ImmutableList.of(BIGINT), - largePage, - nullsPage); - operatorFactory.noMoreOperators(); - assertEquals( - partitions.build(), + ImmutableList.of(largePage, nullsPage), ImmutableList.of(TupleDomain.withColumnDomains(ImmutableMap.of( - "0", Domain.create(ValueSet.of(BIGINT, 7L), false))))); + "0", + Domain.create(ValueSet.of(BIGINT, 7L), false))))); + } + + @Test + public void testCollectMinMaxLimitSinglePage() + { + int maxPositionsCount = 100; + assertDynamicFilters( + maxPositionsCount, + new DataSize(10, KILOBYTE), + 2 * maxPositionsCount, + ImmutableList.of(BIGINT), + ImmutableList.of(new Page(createLongSequenceBlock(0, (2 * maxPositionsCount) + 1))), + ImmutableList.of(TupleDomain.all())); + } + + @Test + public void testCollectMinMaxLimitMultiplePages() + { + int maxPositionsCount = 100; + assertDynamicFilters( + maxPositionsCount, + new DataSize(10, KILOBYTE), + (2 * maxPositionsCount) + 1, + ImmutableList.of(BIGINT), + ImmutableList.of( + new Page(createLongSequenceBlock(0, maxPositionsCount + 1)), + new Page(createLongSequenceBlock(0, maxPositionsCount + 1))), + ImmutableList.of(TupleDomain.all())); } } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java 
index 4810f206669e..878e9e4c9809 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -111,6 +111,7 @@ public void testDefaults() .setEnableDynamicFiltering(false) .setDynamicFilteringMaxPerDriverRowCount(100) .setDynamicFilteringMaxPerDriverSize(new DataSize(10, KILOBYTE)) + .setDynamicFilteringRangeRowLimitPerDriver(0) .setFragmentResultCachingEnabled(false) .setEnableStatsCalculator(true) .setEnableStatsCollectionForTemporaryTable(false) @@ -187,6 +188,7 @@ public void testExplicitPropertyMappings() .put("experimental.enable-dynamic-filtering", "true") .put("experimental.dynamic-filtering-max-per-driver-row-count", "256") .put("experimental.dynamic-filtering-max-per-driver-size", "64kB") + .put("experimental.dynamic-filtering-range-row-limit-per-driver", "1000") .put("experimental.fragment-result-caching-enabled", "true") .put("experimental.enable-stats-calculator", "false") .put("experimental.enable-stats-collection-for-temporary-table", "true") @@ -307,6 +309,7 @@ public void testExplicitPropertyMappings() .setEnableDynamicFiltering(true) .setDynamicFilteringMaxPerDriverRowCount(256) .setDynamicFilteringMaxPerDriverSize(new DataSize(64, KILOBYTE)) + .setDynamicFilteringRangeRowLimitPerDriver(1000) .setFragmentResultCachingEnabled(true) .setEnableStatsCalculator(false) .setEnableStatsCollectionForTemporaryTable(true)