diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BucketAdaptation.java b/presto-hive/src/main/java/com/facebook/presto/hive/BucketAdaptation.java index b36da21395fb..b87016c76037 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BucketAdaptation.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BucketAdaptation.java @@ -22,14 +22,22 @@ public class BucketAdaptation private final int tableBucketCount; private final int partitionBucketCount; private final int bucketToKeep; + private final boolean useLegacyTimestampBucketing; - public BucketAdaptation(int[] bucketColumnIndices, List bucketColumnHiveTypes, int tableBucketCount, int partitionBucketCount, int bucketToKeep) + public BucketAdaptation( + int[] bucketColumnIndices, + List bucketColumnHiveTypes, + int tableBucketCount, + int partitionBucketCount, + int bucketToKeep, + boolean useLegacyTimestampBucketing) { this.bucketColumnIndices = bucketColumnIndices; this.bucketColumnHiveTypes = bucketColumnHiveTypes; this.tableBucketCount = tableBucketCount; this.partitionBucketCount = partitionBucketCount; this.bucketToKeep = bucketToKeep; + this.useLegacyTimestampBucketing = useLegacyTimestampBucketing; } public int[] getBucketColumnIndices() @@ -56,4 +64,9 @@ public int getBucketToKeep() { return bucketToKeep; } + + public boolean useLegacyTimestampBucketing() + { + return useLegacyTimestampBucketing; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketAdapterRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketAdapterRecordCursor.java index 0d54c7343c72..0f1c5d782ae1 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketAdapterRecordCursor.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketAdapterRecordCursor.java @@ -41,6 +41,7 @@ public class HiveBucketAdapterRecordCursor private final int bucketToKeep; private final Object[] scratch; + private final boolean useLegacyTimestampBucketing; public HiveBucketAdapterRecordCursor( int[] bucketColumnIndices, @@ -49,7 +50,8 @@ public HiveBucketAdapterRecordCursor( int partitionBucketCount, int bucketToKeep, TypeManager typeManager, - RecordCursor delegate) + RecordCursor delegate, + boolean useLegacyTimestampBucketing) { this.bucketColumnIndices = requireNonNull(bucketColumnIndices, "bucketColumnIndices is null"); this.delegate = requireNonNull(delegate, "delegate is null"); @@ -67,6 +69,7 @@ public HiveBucketAdapterRecordCursor( this.bucketToKeep = bucketToKeep; this.scratch = new Object[bucketColumnHiveTypes.size()]; + this.useLegacyTimestampBucketing = useLegacyTimestampBucketing; } @Override @@ -121,7 +124,7 @@ else if (javaType == Block.class) { throw new UnsupportedOperationException("unknown java type"); } } - int bucket = HiveBucketing.getHiveBucket(tableBucketCount, typeInfoList, scratch); + int bucket = HiveBucketing.getHiveBucket(tableBucketCount, typeInfoList, scratch, useLegacyTimestampBucketing); if ((bucket - bucketToKeep) % partitionBucketCount != 0) { throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format( "A row that is supposed to be in bucket %s is encountered. Only rows in bucket %s (modulo %s) are expected", diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketFunction.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketFunction.java index bfdd482a5c27..e3655513c246 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketFunction.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketFunction.java @@ -36,26 +36,30 @@ public class HiveBucketFunction private final BucketFunctionType bucketFunctionType; private final Optional> typeInfos; private final Optional> types; + private final boolean useLegacyTimestampBucketing; public static HiveBucketFunction createHiveCompatibleBucketFunction( int bucketCount, - List hiveTypes) + List hiveTypes, + boolean useLegacyTimestampBucketing) { - return new HiveBucketFunction(bucketCount, HIVE_COMPATIBLE, Optional.of(hiveTypes), Optional.empty()); + return new HiveBucketFunction(bucketCount, HIVE_COMPATIBLE, Optional.of(hiveTypes), Optional.empty(), useLegacyTimestampBucketing); } public static HiveBucketFunction createPrestoNativeBucketFunction( int bucketCount, - List types) + List types, + boolean useLegacyTimestampBucketing) { - return new HiveBucketFunction(bucketCount, PRESTO_NATIVE, Optional.empty(), Optional.of(types)); + return new HiveBucketFunction(bucketCount, PRESTO_NATIVE, Optional.empty(), Optional.of(types), useLegacyTimestampBucketing); } private HiveBucketFunction( int bucketCount, BucketFunctionType bucketFunctionType, Optional> hiveTypes, - Optional> types) + Optional> types, + boolean useLegacyTimestampBucketing) { this.bucketCount = bucketCount; this.bucketFunctionType = requireNonNull(bucketFunctionType, "bucketFunctionType is null"); @@ -66,6 +70,7 @@ private HiveBucketFunction( .map(HiveType::getTypeInfo) .collect(toImmutableList())); this.types = requireNonNull(types, "types is null"); + this.useLegacyTimestampBucketing = useLegacyTimestampBucketing; } @Override @@ -73,7 +78,7 @@ public int getBucket(Page page, int position) { switch (bucketFunctionType) { case HIVE_COMPATIBLE: - return getHiveBucket(bucketCount, typeInfos.get(), page, position); + return getHiveBucket(bucketCount, typeInfos.get(), page, position, useLegacyTimestampBucketing); case PRESTO_NATIVE: return HiveBucketing.getBucket(bucketCount, types.get(), page, position); default: diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java index b07a73c23859..b4ec1d8dcb57 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveBucketing.java @@ -87,14 +87,14 @@ public static int getBucket(int bucketCount, List types, Page page, int po return (getHashCode(types, page, position) & Integer.MAX_VALUE) % bucketCount; } - public static int getHiveBucket(int bucketCount, List types, Page page, int position) + public static int getHiveBucket(int bucketCount, List types, Page page, int position, boolean useLegacyTimestampBucketing) { - return (getBucketHashCode(types, page, position) & Integer.MAX_VALUE) % bucketCount; + return (getBucketHashCode(types, page, position, useLegacyTimestampBucketing) & Integer.MAX_VALUE) % bucketCount; } - public static int getHiveBucket(int bucketCount, List types, Object[] values) + public static int getHiveBucket(int bucketCount, List types, Object[] values, boolean useLegacyTimestampBucketing) { - return (getBucketHashCode(types, values) & Integer.MAX_VALUE) % bucketCount; + return (getBucketHashCode(types, values, useLegacyTimestampBucketing) & Integer.MAX_VALUE) % bucketCount; } private static int getHashCode(List types, Page page, int position) @@ -108,29 +108,29 @@ private static int getHashCode(List types, Page page, int position) return result; } - private static int getBucketHashCode(List types, Page page, int position) + private static int getBucketHashCode(List types, Page page, int position, boolean useLegacyTimestampBucketing) { checkArgument(types.size() == page.getChannelCount()); int result = 0; for (int i = 0; i < page.getChannelCount(); i++) { - int fieldHash = hash(types.get(i), page.getBlock(i), position); + int fieldHash = hash(types.get(i), page.getBlock(i), position, useLegacyTimestampBucketing); result = result * 31 + fieldHash; } return result; } - private static int getBucketHashCode(List types, Object[] values) + private static int getBucketHashCode(List types, Object[] values, boolean useLegacyTimestampBucketing) { checkArgument(types.size() == values.length); int result = 0; for (int i = 0; i < values.length; i++) { - int fieldHash = hash(types.get(i), values[i]); + int fieldHash = hash(types.get(i), values[i], useLegacyTimestampBucketing); result = result * 31 + fieldHash; } return result; } - private static int hash(TypeInfo type, Block block, int position) + private static int hash(TypeInfo type, Block block, int position, boolean useLegacyTimestampBucketing) { // This function mirrors the behavior of function hashCode in // HIVE-12025 ba83fd7bff serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java @@ -174,20 +174,18 @@ private static int hash(TypeInfo type, Block block, int position) return toIntExact(days); case TIMESTAMP: long millisSinceEpoch = prestoType.getLong(block, position); - // seconds << 30 + milliseconds - long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L); - return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis); + return getHashForTimestamp(millisSinceEpoch, useLegacyTimestampBucketing); default: throw new UnsupportedOperationException("Computation of Hive bucket hashCode is not supported for Hive primitive category: " + primitiveCategory.toString() + "."); } } case LIST: { Block elementsBlock = block.getBlock(position); - return hashOfList((ListTypeInfo) type, elementsBlock); + return hashOfList((ListTypeInfo) type, elementsBlock, useLegacyTimestampBucketing); } case MAP: { Block elementsBlock = block.getBlock(position); - return hashOfMap((MapTypeInfo) type, elementsBlock); + return hashOfMap((MapTypeInfo) type, elementsBlock, useLegacyTimestampBucketing); } default: // TODO: support more types, e.g. ROW @@ -195,7 +193,7 @@ private static int hash(TypeInfo type, Block block, int position) } } - private static int hash(TypeInfo type, Object value) + private static int hash(TypeInfo type, Object value, boolean useLegacyTimestampBucketing) { if (value == null) { return 0; @@ -233,18 +231,16 @@ private static int hash(TypeInfo type, Object value) return toIntExact(days); case TIMESTAMP: long millisSinceEpoch = (long) value; - // seconds << 30 + milliseconds - long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L); - return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis); + return getHashForTimestamp(millisSinceEpoch, useLegacyTimestampBucketing); default: throw new UnsupportedOperationException("Computation of Hive bucket hashCode is not supported for Hive primitive category: " + primitiveCategory.toString() + "."); } } case LIST: { - return hashOfList((ListTypeInfo) type, (Block) value); + return hashOfList((ListTypeInfo) type, (Block) value, useLegacyTimestampBucketing); } case MAP: { - return hashOfMap((MapTypeInfo) type, (Block) value); + return hashOfMap((MapTypeInfo) type, (Block) value, useLegacyTimestampBucketing); } default: // TODO: support more types, e.g. ROW @@ -252,23 +248,35 @@ private static int hash(TypeInfo type, Object value) } } - private static int hashOfMap(MapTypeInfo type, Block singleMapBlock) + private static int getHashForTimestamp(long millisSinceEpoch, boolean useLegacyTimestampBucketing) + { + if (useLegacyTimestampBucketing) { + // seconds << 30 + milliseconds + long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L); + return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis); + } + // seconds << 30 + nanoseconds + long secondsAndNanos = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L) * 1000L * 1000L; + return (int) ((secondsAndNanos >>> 32) ^ secondsAndNanos); + } + + private static int hashOfMap(MapTypeInfo type, Block singleMapBlock, boolean useLegacyTimestampBucketing) { TypeInfo keyTypeInfo = type.getMapKeyTypeInfo(); TypeInfo valueTypeInfo = type.getMapValueTypeInfo(); int result = 0; for (int i = 0; i < singleMapBlock.getPositionCount(); i += 2) { - result += hash(keyTypeInfo, singleMapBlock, i) ^ hash(valueTypeInfo, singleMapBlock, i + 1); + result += hash(keyTypeInfo, singleMapBlock, i, useLegacyTimestampBucketing) ^ hash(valueTypeInfo, singleMapBlock, i + 1, useLegacyTimestampBucketing); } return result; } - private static int hashOfList(ListTypeInfo type, Block singleListBlock) + private static int hashOfList(ListTypeInfo type, Block singleListBlock, boolean useLegacyTimestampBucketing) { TypeInfo elementTypeInfo = type.getListElementTypeInfo(); int result = 0; for (int i = 0; i < singleListBlock.getPositionCount(); i++) { - result = result * 31 + hash(elementTypeInfo, singleListBlock, i); + result = result * 31 + hash(elementTypeInfo, singleListBlock, i, useLegacyTimestampBucketing); } return result; } @@ -310,12 +318,16 @@ public static Optional getHiveBucketHandle(ConnectorSession se return Optional.of(new HiveBucketHandle(bucketColumns.build(), bucketCount, bucketCount)); } - public static Optional getHiveBucketFilter(Table table, TupleDomain effectivePredicate) + public static Optional getHiveBucketFilter(Table table, TupleDomain effectivePredicate, boolean useLegacyTimestampBucketing) { - return getHiveBucketFilter(table.getStorage().getBucketProperty(), table.getDataColumns(), effectivePredicate); + return getHiveBucketFilter(table.getStorage().getBucketProperty(), table.getDataColumns(), effectivePredicate, useLegacyTimestampBucketing); } - public static Optional getHiveBucketFilter(Optional hiveBucketProperty, List dataColumns, TupleDomain effectivePredicate) + public static Optional getHiveBucketFilter( + Optional hiveBucketProperty, + List dataColumns, + TupleDomain effectivePredicate, + boolean useLegacyTimestampBucketing) { if (!hiveBucketProperty.isPresent()) { return Optional.empty(); @@ -331,7 +343,7 @@ public static Optional getHiveBucketFilter(Optional> buckets = getHiveBuckets(hiveBucketProperty, dataColumns, bindings.get()); + Optional> buckets = getHiveBuckets(hiveBucketProperty, dataColumns, bindings.get(), useLegacyTimestampBucketing); if (buckets.isPresent()) { return Optional.of(new HiveBucketFilter(buckets.get())); } @@ -357,7 +369,11 @@ public static Optional getHiveBucketFilter(Optional> getHiveBuckets(Optional hiveBucketPropertyOptional, List dataColumns, Map> bindings) + private static Optional> getHiveBuckets( + Optional hiveBucketPropertyOptional, + List dataColumns, + Map> bindings, + boolean useLegacyTimestampBucketing) { if (bindings.isEmpty() || !hiveBucketPropertyOptional.isPresent()) { return Optional.empty(); @@ -400,7 +416,7 @@ private static Optional> getHiveBuckets(Optional buckets = ImmutableSet.builder(); - getHiveBuckets(new Object[types.size()], 0, orderedBindings, bucketCount, types, buckets); + getHiveBuckets(new Object[types.size()], 0, orderedBindings, bucketCount, types, buckets, useLegacyTimestampBucketing); return Optional.of(buckets.build()); } @@ -410,16 +426,17 @@ private static void getHiveBuckets( List> bindings, int bucketCount, List typeInfos, - ImmutableSet.Builder buckets) + ImmutableSet.Builder buckets, + boolean useLegacyTimestampBucketing) { if (valuesCount == typeInfos.size()) { - buckets.add(getHiveBucket(bucketCount, typeInfos, values)); + buckets.add(getHiveBucket(bucketCount, typeInfos, values, useLegacyTimestampBucketing)); return; } for (NullableValue value : bindings.get(valuesCount)) { values[valuesCount] = value.getValue(); - getHiveBuckets(values, valuesCount + 1, bindings, bucketCount, typeInfos, buckets); + getHiveBuckets(values, valuesCount + 1, bindings, bucketCount, typeInfos, buckets, useLegacyTimestampBucketing); } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index f6f840c1f8df..b47c6eb4ac8d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -219,6 +219,7 @@ public class HiveClientConfig private int parquetQuickStatsMaxConcurrentCalls = 500; private int quickStatsMaxConcurrentCalls = 100; private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE); + private boolean legacyTimestampBucketing; @Min(0) public int getMaxInitialSplits() @@ -1832,4 +1833,17 @@ public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySc this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize; return this; } + + public boolean isLegacyTimestampBucketing() + { + return legacyTimestampBucketing; + } + + @Config("hive.legacy-timestamp-bucketing") + @ConfigDescription("Use legacy timestamp bucketing algorithm (which is not Hive compatible) for table bucketed by timestamp type.") + public HiveClientConfig setLegacyTimestampBucketing(boolean legacyTimestampBucketing) + { + this.legacyTimestampBucketing = legacyTimestampBucketing; + return this; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveNodePartitioningProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveNodePartitioningProvider.java index 33511711dbcc..d98ca4ac3732 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveNodePartitioningProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveNodePartitioningProvider.java @@ -34,6 +34,7 @@ import static com.facebook.presto.hive.HiveBucketFunction.createHiveCompatibleBucketFunction; import static com.facebook.presto.hive.HiveBucketFunction.createPrestoNativeBucketFunction; import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; +import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing; import static com.facebook.presto.spi.StandardErrorCode.NODE_SELECTION_NOT_SUPPORTED; import static com.facebook.presto.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -54,9 +55,9 @@ public BucketFunction getBucketFunction( BucketFunctionType bucketFunctionType = handle.getBucketFunctionType(); switch (bucketFunctionType) { case HIVE_COMPATIBLE: - return createHiveCompatibleBucketFunction(bucketCount, handle.getHiveTypes().get()); + return createHiveCompatibleBucketFunction(bucketCount, handle.getHiveTypes().get(), isLegacyTimestampBucketing(session)); case PRESTO_NATIVE: - return createPrestoNativeBucketFunction(bucketCount, handle.getTypes().get()); + return createPrestoNativeBucketFunction(bucketCount, handle.getTypes().get(), isLegacyTimestampBucketing(session)); default: throw new IllegalArgumentException("Unsupported bucket function type " + bucketFunctionType); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java index b7d33b777cec..77e8a163f51e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java @@ -61,6 +61,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS; import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; import static com.facebook.presto.hive.HiveSessionProperties.isFileRenamingEnabled; +import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing; import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled; import static com.facebook.presto.hive.HiveUtil.serializeZstdCompressed; import static com.facebook.presto.hive.PartitionUpdate.FileWriteInfo; @@ -183,10 +184,10 @@ public HivePageSink( List bucketColumnHiveTypes = bucketProperty.get().getBucketedBy().stream() .map(dataColumnNameToHiveTypeMap::get) .collect(toImmutableList()); - bucketFunction = createHiveCompatibleBucketFunction(bucketCount, bucketColumnHiveTypes); + bucketFunction = createHiveCompatibleBucketFunction(bucketCount, bucketColumnHiveTypes, isLegacyTimestampBucketing(session)); break; case PRESTO_NATIVE: - bucketFunction = createPrestoNativeBucketFunction(bucketCount, bucketProperty.get().getTypes().get()); + bucketFunction = createPrestoNativeBucketFunction(bucketCount, bucketProperty.get().getTypes().get(), isLegacyTimestampBucketing(session)); break; default: throw new IllegalArgumentException("Unsupported bucket function type " + bucketFunctionType); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java index a031e9903cdb..4a1a178b2608 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSource.java @@ -299,6 +299,7 @@ private static class BucketAdapter public final int tableBucketCount; public final int partitionBucketCount; // for sanity check only private final List typeInfoList; + private final boolean useLegacyTimestampBucketing; public BucketAdapter(BucketAdaptation bucketAdaptation) { @@ -309,6 +310,7 @@ public BucketAdapter(BucketAdaptation bucketAdaptation) .collect(toImmutableList()); this.tableBucketCount = bucketAdaptation.getTableBucketCount(); this.partitionBucketCount = bucketAdaptation.getPartitionBucketCount(); + this.useLegacyTimestampBucketing = bucketAdaptation.useLegacyTimestampBucketing(); } @Nullable @@ -317,7 +319,7 @@ public Page filterPageToEligibleRowsOrDiscard(Page page) IntArrayList ids = new IntArrayList(page.getPositionCount()); Page bucketColumnsPage = page.extractChannels(bucketColumns); for (int position = 0; position < page.getPositionCount(); position++) { - int bucket = getHiveBucket(tableBucketCount, typeInfoList, bucketColumnsPage, position); + int bucket = getHiveBucket(tableBucketCount, typeInfoList, bucketColumnsPage, position, useLegacyTimestampBucketing); if ((bucket - bucketToKeep) % partitionBucketCount != 0) { throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format( "A row that is supposed to be in bucket %s is encountered. Only rows in bucket %s (modulo %s) are expected", diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java index 9be6f7109b94..7065dc672081 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java @@ -73,6 +73,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles; +import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing; import static com.facebook.presto.hive.HiveSessionProperties.isUseRecordPageSourceForCustomSplit; import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue; import static com.facebook.presto.hive.HiveUtil.parsePartitionValue; @@ -208,7 +209,7 @@ public ConnectorPageSource createPageSource( .transform(Subfield::getRootName) .transform(hiveLayout.getPredicateColumns()::get); - if (shouldSkipBucket(hiveLayout, hiveSplit, splitContext)) { + if (shouldSkipBucket(hiveLayout, hiveSplit, splitContext, isLegacyTimestampBucketing(session))) { return new HiveEmptySplitPageSource(); } @@ -343,7 +344,7 @@ private static Optional createSelectivePageSource( split.getFileSplit(), split.getTableBucketNumber()); - Optional bucketAdaptation = split.getBucketConversion().map(conversion -> toBucketAdaptation(conversion, columnMappings, split.getTableBucketNumber(), mapping -> mapping.getHiveColumnHandle().getHiveColumnIndex())); + Optional bucketAdaptation = split.getBucketConversion().map(conversion -> toBucketAdaptation(conversion, columnMappings, split.getTableBucketNumber(), mapping -> mapping.getHiveColumnHandle().getHiveColumnIndex(), isLegacyTimestampBucketing(session))); Map prefilledValues = columnMappings.stream() .filter(mapping -> mapping.getKind() == ColumnMappingKind.PREFILLED) @@ -361,7 +362,7 @@ private static Optional createSelectivePageSource( RowExpression optimizedRemainingPredicate = rowExpressionCache.getUnchecked(new RowExpressionCacheKey(layout.getRemainingPredicate(), session)); - if (shouldSkipBucket(layout, split, splitContext)) { + if (shouldSkipBucket(layout, split, splitContext, isLegacyTimestampBucketing(session))) { return Optional.of(new HiveEmptySplitPageSource()); } @@ -462,7 +463,12 @@ public static Optional createHivePageSource( // Finds the non-synthetic columns. List regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings); - Optional bucketAdaptation = bucketConversion.map(conversion -> toBucketAdaptation(conversion, regularAndInterimColumnMappings, tableBucketNumber, ColumnMapping::getIndex)); + Optional bucketAdaptation = bucketConversion.map(conversion -> toBucketAdaptation( + conversion, + regularAndInterimColumnMappings, + tableBucketNumber, + ColumnMapping::getIndex, + isLegacyTimestampBucketing(session))); if (isUseRecordPageSourceForCustomSplit(session) && shouldUseRecordReaderFromInputFormat(configuration, storage, fileSplit.getCustomSplitInfo())) { return getPageSourceFromCursorProvider( @@ -623,7 +629,8 @@ private static Optional getPageSourceFromCursorProvider( bucketAdaptation.get().getPartitionBucketCount(), bucketAdaptation.get().getBucketToKeep(), typeManager, - delegate); + delegate, + isLegacyTimestampBucketing(session)); } // Need to wrap RcText and RcBinary into a wrapper, which will do the coercion for mismatch columns @@ -659,7 +666,7 @@ private static Optional getPageSourceFromCursorProvider( return Optional.empty(); } - private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSplit hiveSplit, SplitContext splitContext) + private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSplit hiveSplit, SplitContext splitContext, boolean useLegacyTimestampBucketing) { if (!splitContext.getDynamicFilterPredicate().isPresent() || !hiveSplit.getReadBucketNumber().isPresent() @@ -668,7 +675,7 @@ private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSp } TupleDomain dynamicFilter = splitContext.getDynamicFilterPredicate().get(); - Optional hiveBucketFilter = getHiveBucketFilter(hiveSplit.getStorage().getBucketProperty(), hiveLayout.getDataColumns(), dynamicFilter); + Optional hiveBucketFilter = getHiveBucketFilter(hiveSplit.getStorage().getBucketProperty(), hiveLayout.getDataColumns(), dynamicFilter, useLegacyTimestampBucketing); return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getReadBucketNumber().getAsInt())).orElse(false); } @@ -705,7 +712,12 @@ private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLay return false; } - private static BucketAdaptation toBucketAdaptation(BucketConversion conversion, List columnMappings, OptionalInt tableBucketNumber, Function bucketColumnIndexProducer) + private static BucketAdaptation toBucketAdaptation( + BucketConversion conversion, + List columnMappings, + OptionalInt tableBucketNumber, + Function bucketColumnIndexProducer, + boolean useLegacyTimestamp) { Map hiveIndexToBlockIndex = uniqueIndex(columnMappings, columnMapping -> columnMapping.getHiveColumnHandle().getHiveColumnIndex()); int[] bucketColumnIndices = conversion.getBucketColumnHandles().stream() @@ -720,7 +732,13 @@ private static BucketAdaptation toBucketAdaptation(BucketConversion conversion, .map(ColumnMapping::getHiveColumnHandle) .map(HiveColumnHandle::getHiveType) .collect(toImmutableList()); - return new BucketAdaptation(bucketColumnIndices, bucketColumnHiveTypes, conversion.getTableBucketCount(), conversion.getPartitionBucketCount(), tableBucketNumber.getAsInt()); + return new BucketAdaptation( + bucketColumnIndices, + bucketColumnHiveTypes, + conversion.getTableBucketCount(), + conversion.getPartitionBucketCount(), + tableBucketNumber.getAsInt(), + useLegacyTimestamp); } public static class ColumnMapping diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java index cf185488a288..f6170becee95 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePartitionManager.java @@ -69,6 +69,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT; import static com.facebook.presto.hive.HiveSessionProperties.getMaxBucketsForGroupedExecution; import static com.facebook.presto.hive.HiveSessionProperties.getMinBucketCountToNotIgnoreTableBucketing; +import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing; import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isParallelParsingOfPartitionValuesEnabled; import static com.facebook.presto.hive.HiveSessionProperties.shouldIgnoreTableBucketing; @@ -270,7 +271,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor } Optional hiveBucketHandle = getBucketHandle(table, session, effectivePredicate); - Optional bucketFilter = hiveBucketHandle.flatMap(value -> getHiveBucketFilter(table, effectivePredicate)); + Optional bucketFilter = hiveBucketHandle.flatMap(value -> getHiveBucketFilter(table, effectivePredicate, isLegacyTimestampBucketing(session))); if (!queryUsesHiveBucketColumn(effectivePredicate) && hiveBucketHandle.isPresent() diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 7bdefeef3b3a..816e4d04e21b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -132,6 +132,7 @@ public final class HiveSessionProperties public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout"; public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled"; public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size"; + public static final String LEGACY_TIMESTAMP_BUCKETING = "legacy_timestamp_bucketing"; private final List> sessionProperties; @@ -641,6 +642,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon AFFINITY_SCHEDULING_FILE_SECTION_SIZE, "Size of file section for affinity scheduling", hiveClientConfig.getAffinitySchedulingFileSectionSize(), + false), + booleanProperty( + LEGACY_TIMESTAMP_BUCKETING, + "Use legacy timestamp bucketing algorithm (which is not Hive compatible) for table bucketed by timestamp type.", + hiveClientConfig.isLegacyTimestampBucketing(), false)); } @@ -1118,4 +1124,9 @@ public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession ses { return session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class); } + + public static boolean isLegacyTimestampBucketing(ConnectorSession session) + { + return session.getProperty(LEGACY_TIMESTAMP_BUCKETING, Boolean.class); + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java index 2d3c6e89dc76..9b2392a36efe 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java @@ -107,6 +107,7 @@ import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcZstdJniDecompressionEnabled; import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; import static com.facebook.presto.hive.HiveSessionProperties.isAdaptiveFilterReorderingEnabled; +import static com.facebook.presto.hive.HiveSessionProperties.isLegacyTimestampBucketing; import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles; import static com.facebook.presto.hive.HiveUtil.typedPartitionKey; import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcDataSource; @@ -359,7 +360,8 @@ public static ConnectorPageSource createOrcPageSource( adaptation.getBucketColumnHiveTypes(), adaptation.getTableBucketCount(), adaptation.getPartitionBucketCount(), - adaptation.getBucketToKeep())); + adaptation.getBucketToKeep(), + isLegacyTimestampBucketing(session))); List filterFunctions = toFilterFunctions(replaceExpression(remainingPredicate, variableToInput), bucketAdapter, session, rowExpressionService.getDeterminismEvaluator(), rowExpressionService.getPredicateCompiler()); @@ -654,8 +656,9 @@ private static class BucketAdapter public final int tableBucketCount; public final int partitionBucketCount; // for sanity check only private final List typeInfoList; + private final boolean useLegacyTimestampBucketing; - public BucketAdapter(int[] bucketColumnIndices, List bucketColumnHiveTypes, int tableBucketCount, int partitionBucketCount, int bucketToKeep) + public BucketAdapter(int[] bucketColumnIndices, List bucketColumnHiveTypes, int tableBucketCount, int partitionBucketCount, int bucketToKeep, boolean useLegacyTimestampBucketing) { this.bucketColumns = requireNonNull(bucketColumnIndices, "bucketColumnIndices is null"); this.bucketToKeep = bucketToKeep; @@ -664,6 +667,7 @@ public BucketAdapter(int[] bucketColumnIndices, List bucketColumnHiveT .collect(toImmutableList()); this.tableBucketCount = tableBucketCount; this.partitionBucketCount = partitionBucketCount; + this.useLegacyTimestampBucketing = useLegacyTimestampBucketing; } @Override @@ -675,7 +679,7 @@ public int[] getInputChannels() @Override public boolean evaluate(SqlFunctionProperties properties, Page page, int position) { - int bucket = getHiveBucket(tableBucketCount, typeInfoList, page, position); + int bucket = getHiveBucket(tableBucketCount, typeInfoList, page, position, useLegacyTimestampBucketing); if ((bucket - bucketToKeep) % partitionBucketCount != 0) { throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format( "A row that is supposed to be in bucket %s is encountered. Only rows in bucket %s (modulo %s) are expected", diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketing.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketing.java index 44a4b726cdfa..5e4dcb8c119b 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketing.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketing.java @@ -104,11 +104,6 @@ public void testHashingCompare() assertBucketEquals("date", new DateWritable(toIntExact(LocalDate.of(1970, 1, 1).toEpochDay())).get()); assertBucketEquals("date", new DateWritable(toIntExact(LocalDate.of(2015, 11, 19).toEpochDay())).get()); assertBucketEquals("date", new DateWritable(toIntExact(LocalDate.of(1950, 11, 19).toEpochDay())).get()); - assertBucketEquals("timestamp", null); - assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0).toEpochSecond(ZoneOffset.UTC))); - assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(1969, 12, 31, 23, 59, 59, 999_000_000).toEpochSecond(ZoneOffset.UTC))); - assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(1950, 11, 19, 12, 34, 56, 789_000_000).toEpochSecond(ZoneOffset.UTC))); - assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(2015, 11, 19, 7, 6, 5, 432_000_000).toEpochSecond(ZoneOffset.UTC))); assertBucketEquals("array", null); assertBucketEquals("array", ImmutableList.of()); assertBucketEquals("array", ImmutableList.of((short) 5, (short) 8, (short) 13)); @@ -128,6 +123,23 @@ public void testHashingCompare() asList(null, ImmutableList.of((short) 5, (short) 8, (short) 13), null, ImmutableMap.of("key", 123L), null)); } + @Test + public void testTimestamp() + throws Exception + { + // Test seconds + assertBucketEquals("timestamp", null); + assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0).toEpochSecond(ZoneOffset.UTC))); + assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(1969, 12, 31, 23, 59, 59, 999_000_000).toEpochSecond(ZoneOffset.UTC))); + assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(1950, 11, 19, 12, 34, 56, 789_000_000).toEpochSecond(ZoneOffset.UTC))); + assertBucketEquals("timestamp", new Timestamp(1000 * LocalDateTime.of(2015, 11, 19, 7, 6, 5, 432_000_000).toEpochSecond(ZoneOffset.UTC))); + // Test milliseconds + assertBucketEquals("timestamp", new Timestamp(10 + 1000 * LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0).toEpochSecond(ZoneOffset.UTC))); + assertBucketEquals("timestamp", new Timestamp(22 + 1000 * LocalDateTime.of(1969, 12, 31, 23, 59, 59, 999_000_000).toEpochSecond(ZoneOffset.UTC))); + assertBucketEquals("timestamp", new Timestamp(100 + 1000 * LocalDateTime.of(1950, 11, 19, 12, 34, 56, 789_000_000).toEpochSecond(ZoneOffset.UTC))); + assertBucketEquals("timestamp", new Timestamp(250 + 1000 * LocalDateTime.of(2015, 11, 19, 7, 6, 5, 432_000_000).toEpochSecond(ZoneOffset.UTC))); + } + private static void assertBucketEquals(String hiveTypeStrings, Object hiveValues) throws HiveException { @@ -185,8 +197,8 @@ private static int computeActual(List hiveTypeStrings, List hive nativeContainerValues[i] = toNativeContainerValue(type, hiveValue); } ImmutableList blockList = blockListBuilder.build(); - int result1 = HiveBucketing.getHiveBucket(bucketCount, hiveTypeInfos, new Page(blockList.toArray(new Block[blockList.size()])), 2); - int result2 = HiveBucketing.getHiveBucket(bucketCount, hiveTypeInfos, nativeContainerValues); + int result1 = HiveBucketing.getHiveBucket(bucketCount, hiveTypeInfos, new Page(blockList.toArray(new Block[blockList.size()])), 2, false); + int result2 = HiveBucketing.getHiveBucket(bucketCount, hiveTypeInfos, nativeContainerValues, false); assertEquals(result1, result2, "Overloads of getHiveBucket produced different result"); return result1; } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index e6ea3d0d9308..ef206efe0be0 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -165,7 +165,8 @@ public void testDefaults() .setMaxConcurrentQuickStatsCalls(100) .setMaxConcurrentParquetQuickStatsCalls(500) .setCteVirtualBucketCount(128) - .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE))); + .setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)) + .setLegacyTimestampBucketing(false)); } @Test @@ -292,6 +293,7 @@ public void testExplicitPropertyMappings() .put("hive.quick-stats.max-concurrent-calls", "101") .put("hive.cte-virtual-bucket-count", "256") .put("hive.affinity-scheduling-file-section-size", "512MB") + .put("hive.legacy-timestamp-bucketing", "true") .build(); HiveClientConfig expected = new HiveClientConfig() @@ -413,7 +415,8 @@ public void testExplicitPropertyMappings() .setMaxConcurrentParquetQuickStatsCalls(399) .setMaxConcurrentQuickStatsCalls(101) .setCteVirtualBucketCount(256) - .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE)); + .setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE)) + .setLegacyTimestampBucketing(true); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchange.java b/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchange.java index 74cdcad53b1d..a0e7c9d30acd 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchange.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchange.java @@ -180,7 +180,7 @@ static PartitionFunction createPartitionFunction( BucketFunction bucketFunction = partitioningProvider.getBucketFunction( partitioning.getTransactionHandle().orElse(null), - session.toConnectorSession(), + session.toConnectorSession(partitioning.getConnectorId().get()), partitioning.getConnectorHandle(), partitioningChannelTypes, bucketCount); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java index dcf29c73c0a3..665fc5cf67fc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java @@ -93,7 +93,7 @@ public PartitionFunction getPartitionFunction( bucketFunction = partitioningProvider.getBucketFunction( partitioningHandle.getTransactionHandle().orElse(null), - session.toConnectorSession(), + session.toConnectorSession(partitioningHandle.getConnectorId().get()), partitioningHandle.getConnectorHandle(), partitionChannelTypes, bucketToPartition.get().length);