
Commit

Fix hash calculation for Timestamp in HiveBucketing to be Hive Compatible
kewang1024 committed Jun 24, 2024
1 parent 4818d3c commit cfd99ca
Showing 16 changed files with 176 additions and 71 deletions.
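In short: Hive computes the bucketing hash for TIMESTAMP by folding seconds << 30 + nanoseconds, while Presto previously folded seconds << 30 + milliseconds, so rows could land in different buckets than Hive would assign. This commit threads a useLegacyTimestampBucketing flag through the bucketing code paths and preserves the old behavior behind the new hive.legacy-timestamp-bucketing config property. A minimal standalone sketch of the two algorithms, not part of the commit, with hypothetical sample values:

public class TimestampBucketHashDemo
{
    // Legacy (pre-fix) behavior: fold seconds << 30 + milliseconds.
    static int legacyHash(long millisSinceEpoch)
    {
        long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30)
                + Math.floorMod(millisSinceEpoch, 1000L);
        return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis);
    }

    // Hive-compatible behavior introduced by this commit: fold seconds << 30 + nanoseconds.
    static int hiveCompatibleHash(long millisSinceEpoch)
    {
        long secondsAndNanos = (Math.floorDiv(millisSinceEpoch, 1000L) << 30)
                + Math.floorMod(millisSinceEpoch, 1000L) * 1000L * 1000L;
        return (int) ((secondsAndNanos >>> 32) ^ secondsAndNanos);
    }

    public static void main(String[] args)
    {
        long ts = 1_719_187_200_123L; // hypothetical sample: 2024-06-24T00:00:00.123Z
        int bucketCount = 32;         // hypothetical bucket count
        System.out.println("legacy bucket:          " + ((legacyHash(ts) & Integer.MAX_VALUE) % bucketCount));
        System.out.println("hive-compatible bucket: " + ((hiveCompatibleHash(ts) & Integer.MAX_VALUE) % bucketCount));
    }
}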
@@ -22,14 +22,22 @@ public class BucketAdaptation
private final int tableBucketCount;
private final int partitionBucketCount;
private final int bucketToKeep;
private final boolean useLegacyTimestampBucketing;

public BucketAdaptation(int[] bucketColumnIndices, List<HiveType> bucketColumnHiveTypes, int tableBucketCount, int partitionBucketCount, int bucketToKeep)
public BucketAdaptation(
int[] bucketColumnIndices,
List<HiveType> bucketColumnHiveTypes,
int tableBucketCount,
int partitionBucketCount,
int bucketToKeep,
boolean useLegacyTimestampBucketing)
{
this.bucketColumnIndices = bucketColumnIndices;
this.bucketColumnHiveTypes = bucketColumnHiveTypes;
this.tableBucketCount = tableBucketCount;
this.partitionBucketCount = partitionBucketCount;
this.bucketToKeep = bucketToKeep;
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}

public int[] getBucketColumnIndices()
@@ -56,4 +64,9 @@ public int getBucketToKeep()
{
return bucketToKeep;
}

public boolean useLegacyTimestampBucketing()
{
return useLegacyTimestampBucketing;
}
}
@@ -41,6 +41,7 @@ public class HiveBucketAdapterRecordCursor
private final int bucketToKeep;

private final Object[] scratch;
private final boolean useLegacyTimestampBucketing;

public HiveBucketAdapterRecordCursor(
int[] bucketColumnIndices,
@@ -49,7 +50,8 @@ public HiveBucketAdapterRecordCursor(
int partitionBucketCount,
int bucketToKeep,
TypeManager typeManager,
RecordCursor delegate)
RecordCursor delegate,
boolean useLegacyTimestampBucketing)
{
this.bucketColumnIndices = requireNonNull(bucketColumnIndices, "bucketColumnIndices is null");
this.delegate = requireNonNull(delegate, "delegate is null");
@@ -67,6 +69,7 @@
this.bucketToKeep = bucketToKeep;

this.scratch = new Object[bucketColumnHiveTypes.size()];
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}

@Override
@@ -121,7 +124,7 @@ else if (javaType == Block.class) {
throw new UnsupportedOperationException("unknown java type");
}
}
int bucket = HiveBucketing.getHiveBucket(tableBucketCount, typeInfoList, scratch);
int bucket = HiveBucketing.getHiveBucket(tableBucketCount, typeInfoList, scratch, useLegacyTimestampBucketing);
if ((bucket - bucketToKeep) % partitionBucketCount != 0) {
throw new PrestoException(HIVE_INVALID_BUCKET_FILES, format(
"A row that is supposed to be in bucket %s is encountered. Only rows in bucket %s (modulo %s) are expected",
@@ -36,26 +36,30 @@ public class HiveBucketFunction
private final BucketFunctionType bucketFunctionType;
private final Optional<List<TypeInfo>> typeInfos;
private final Optional<List<Type>> types;
private final boolean useLegacyTimestampBucketing;

public static HiveBucketFunction createHiveCompatibleBucketFunction(
int bucketCount,
List<HiveType> hiveTypes)
List<HiveType> hiveTypes,
boolean useLegacyTimestampBucketing)
{
return new HiveBucketFunction(bucketCount, HIVE_COMPATIBLE, Optional.of(hiveTypes), Optional.empty());
return new HiveBucketFunction(bucketCount, HIVE_COMPATIBLE, Optional.of(hiveTypes), Optional.empty(), useLegacyTimestampBucketing);
}

public static HiveBucketFunction createPrestoNativeBucketFunction(
int bucketCount,
List<Type> types)
List<Type> types,
boolean useLegacyTimestampBucketing)
{
return new HiveBucketFunction(bucketCount, PRESTO_NATIVE, Optional.empty(), Optional.of(types));
return new HiveBucketFunction(bucketCount, PRESTO_NATIVE, Optional.empty(), Optional.of(types), useLegacyTimestampBucketing);
}

private HiveBucketFunction(
int bucketCount,
BucketFunctionType bucketFunctionType,
Optional<List<HiveType>> hiveTypes,
Optional<List<Type>> types)
Optional<List<Type>> types,
boolean useLegacyTimestampBucketing)
{
this.bucketCount = bucketCount;
this.bucketFunctionType = requireNonNull(bucketFunctionType, "bucketFunctionType is null");
@@ -66,14 +70,15 @@ private HiveBucketFunction(
.map(HiveType::getTypeInfo)
.collect(toImmutableList()));
this.types = requireNonNull(types, "types is null");
this.useLegacyTimestampBucketing = useLegacyTimestampBucketing;
}

@Override
public int getBucket(Page page, int position)
{
switch (bucketFunctionType) {
case HIVE_COMPATIBLE:
return getHiveBucket(bucketCount, typeInfos.get(), page, position);
return getHiveBucket(bucketCount, typeInfos.get(), page, position, useLegacyTimestampBucketing);
case PRESTO_NATIVE:
return HiveBucketing.getBucket(bucketCount, types.get(), page, position);
default:
@@ -87,14 +87,14 @@ public static int getBucket(int bucketCount, List<Type> types, Page page, int po
return (getHashCode(types, page, position) & Integer.MAX_VALUE) % bucketCount;
}

public static int getHiveBucket(int bucketCount, List<TypeInfo> types, Page page, int position)
public static int getHiveBucket(int bucketCount, List<TypeInfo> types, Page page, int position, boolean useLegacyTimestampBucketing)
{
return (getBucketHashCode(types, page, position) & Integer.MAX_VALUE) % bucketCount;
return (getBucketHashCode(types, page, position, useLegacyTimestampBucketing) & Integer.MAX_VALUE) % bucketCount;
}

public static int getHiveBucket(int bucketCount, List<TypeInfo> types, Object[] values)
public static int getHiveBucket(int bucketCount, List<TypeInfo> types, Object[] values, boolean useLegacyTimestampBucketing)
{
return (getBucketHashCode(types, values) & Integer.MAX_VALUE) % bucketCount;
return (getBucketHashCode(types, values, useLegacyTimestampBucketing) & Integer.MAX_VALUE) % bucketCount;
}
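Both overloads map the signed hash to a bucket the same way: masking with Integer.MAX_VALUE clears the sign bit, so Java's % operator cannot return a negative index. A tiny illustration, with hypothetical values:

int hash = -1_234_567;  // hypothetical negative field hash
int bucketCount = 16;   // hypothetical bucket count
int bucket = (hash & Integer.MAX_VALUE) % bucketCount;
// hash % bucketCount would be -7 here; the mask keeps bucket in [0, bucketCount)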

private static int getHashCode(List<Type> types, Page page, int position)
@@ -108,29 +108,29 @@ private static int getHashCode(List<Type> types, Page page, int position)
return result;
}

private static int getBucketHashCode(List<TypeInfo> types, Page page, int position)
private static int getBucketHashCode(List<TypeInfo> types, Page page, int position, boolean useLegacyTimestampBucketing)
{
checkArgument(types.size() == page.getChannelCount());
int result = 0;
for (int i = 0; i < page.getChannelCount(); i++) {
int fieldHash = hash(types.get(i), page.getBlock(i), position);
int fieldHash = hash(types.get(i), page.getBlock(i), position, useLegacyTimestampBucketing);
result = result * 31 + fieldHash;
}
return result;
}

private static int getBucketHashCode(List<TypeInfo> types, Object[] values)
private static int getBucketHashCode(List<TypeInfo> types, Object[] values, boolean useLegacyTimestampBucketing)
{
checkArgument(types.size() == values.length);
int result = 0;
for (int i = 0; i < values.length; i++) {
int fieldHash = hash(types.get(i), values[i]);
int fieldHash = hash(types.get(i), values[i], useLegacyTimestampBucketing);
result = result * 31 + fieldHash;
}
return result;
}

private static int hash(TypeInfo type, Block block, int position)
private static int hash(TypeInfo type, Block block, int position, boolean useLegacyTimestampBucketing)
{
// This function mirrors the behavior of function hashCode in
// HIVE-12025 ba83fd7bff serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -174,28 +174,26 @@ private static int hash(TypeInfo type, Block block, int position)
return toIntExact(days);
case TIMESTAMP:
long millisSinceEpoch = prestoType.getLong(block, position);
// seconds << 30 + milliseconds
long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L);
return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis);
return getHashForTimestamp(millisSinceEpoch, useLegacyTimestampBucketing);
default:
throw new UnsupportedOperationException("Computation of Hive bucket hashCode is not supported for Hive primitive category: " + primitiveCategory.toString() + ".");
}
}
case LIST: {
Block elementsBlock = block.getBlock(position);
return hashOfList((ListTypeInfo) type, elementsBlock);
return hashOfList((ListTypeInfo) type, elementsBlock, useLegacyTimestampBucketing);
}
case MAP: {
Block elementsBlock = block.getBlock(position);
return hashOfMap((MapTypeInfo) type, elementsBlock);
return hashOfMap((MapTypeInfo) type, elementsBlock, useLegacyTimestampBucketing);
}
default:
// TODO: support more types, e.g. ROW
throw new UnsupportedOperationException("Computation of Hive bucket hashCode is not supported for Hive category: " + type.getCategory().toString() + ".");
}
}

private static int hash(TypeInfo type, Object value)
private static int hash(TypeInfo type, Object value, boolean useLegacyTimestampBucketing)
{
if (value == null) {
return 0;
@@ -233,42 +231,52 @@ private static int hash(TypeInfo type, Object value)
return toIntExact(days);
case TIMESTAMP:
long millisSinceEpoch = (long) value;
// seconds << 30 + milliseconds
long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L);
return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis);
return getHashForTimestamp(millisSinceEpoch, useLegacyTimestampBucketing);
default:
throw new UnsupportedOperationException("Computation of Hive bucket hashCode is not supported for Hive primitive category: " + primitiveCategory.toString() + ".");
}
}
case LIST: {
return hashOfList((ListTypeInfo) type, (Block) value);
return hashOfList((ListTypeInfo) type, (Block) value, useLegacyTimestampBucketing);
}
case MAP: {
return hashOfMap((MapTypeInfo) type, (Block) value);
return hashOfMap((MapTypeInfo) type, (Block) value, useLegacyTimestampBucketing);
}
default:
// TODO: support more types, e.g. ROW
throw new UnsupportedOperationException("Computation of Hive bucket hashCode is not supported for Hive category: " + type.getCategory().toString() + ".");
}
}

private static int hashOfMap(MapTypeInfo type, Block singleMapBlock)
private static int getHashForTimestamp(long millisSinceEpoch, boolean useLegacyTimestampBucketing)
{
if (useLegacyTimestampBucketing) {
// seconds << 30 + milliseconds
long secondsAndMillis = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L);
return (int) ((secondsAndMillis >>> 32) ^ secondsAndMillis);
}
// seconds << 30 + nanoseconds
long secondsAndNanos = (Math.floorDiv(millisSinceEpoch, 1000L) << 30) + Math.floorMod(millisSinceEpoch, 1000L) * 1000L * 1000L;
return (int) ((secondsAndNanos >>> 32) ^ secondsAndNanos);
}
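A consequence of the two branches above (an observation, not something stated in the commit): when the timestamp falls on a whole second, floorMod(millisSinceEpoch, 1000) is zero and both algorithms reduce to folding seconds << 30, so the hashes agree; they diverge only for timestamps with a sub-second component. A sketch, assuming getHashForTimestamp were visible to test code:

long wholeSecond = 1_719_187_200_000L; // hypothetical: 2024-06-24T00:00:00.000Z
long withMillis = 1_719_187_200_123L;  // same instant plus 123 ms
// whole second: legacy and Hive-compatible hashes coincide
assert getHashForTimestamp(wholeSecond, true) == getHashForTimestamp(wholeSecond, false);
// sub-second component: legacy adds 123, the fix adds 123_000_000 before folding
assert getHashForTimestamp(withMillis, true) != getHashForTimestamp(withMillis, false);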

private static int hashOfMap(MapTypeInfo type, Block singleMapBlock, boolean useLegacyTimestampBucketing)
{
TypeInfo keyTypeInfo = type.getMapKeyTypeInfo();
TypeInfo valueTypeInfo = type.getMapValueTypeInfo();
int result = 0;
for (int i = 0; i < singleMapBlock.getPositionCount(); i += 2) {
result += hash(keyTypeInfo, singleMapBlock, i) ^ hash(valueTypeInfo, singleMapBlock, i + 1);
result += hash(keyTypeInfo, singleMapBlock, i, useLegacyTimestampBucketing) ^ hash(valueTypeInfo, singleMapBlock, i + 1, useLegacyTimestampBucketing);
}
return result;
}

private static int hashOfList(ListTypeInfo type, Block singleListBlock)
private static int hashOfList(ListTypeInfo type, Block singleListBlock, boolean useLegacyTimestampBucketing)
{
TypeInfo elementTypeInfo = type.getListElementTypeInfo();
int result = 0;
for (int i = 0; i < singleListBlock.getPositionCount(); i++) {
result = result * 31 + hash(elementTypeInfo, singleListBlock, i);
result = result * 31 + hash(elementTypeInfo, singleListBlock, i, useLegacyTimestampBucketing);
}
return result;
}
@@ -310,12 +318,16 @@ public static Optional<HiveBucketHandle> getHiveBucketHandle(ConnectorSession se
return Optional.of(new HiveBucketHandle(bucketColumns.build(), bucketCount, bucketCount));
}

public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleDomain<ColumnHandle> effectivePredicate)
public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleDomain<ColumnHandle> effectivePredicate, boolean useLegacyTimestampBucketing)
{
return getHiveBucketFilter(table.getStorage().getBucketProperty(), table.getDataColumns(), effectivePredicate);
return getHiveBucketFilter(table.getStorage().getBucketProperty(), table.getDataColumns(), effectivePredicate, useLegacyTimestampBucketing);
}

public static Optional<HiveBucketFilter> getHiveBucketFilter(Optional<HiveBucketProperty> hiveBucketProperty, List<Column> dataColumns, TupleDomain<ColumnHandle> effectivePredicate)
public static Optional<HiveBucketFilter> getHiveBucketFilter(
Optional<HiveBucketProperty> hiveBucketProperty,
List<Column> dataColumns,
TupleDomain<ColumnHandle> effectivePredicate,
boolean useLegacyTimestampBucketing)
{
if (!hiveBucketProperty.isPresent()) {
return Optional.empty();
@@ -331,7 +343,7 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Optional<HiveBucket
return Optional.empty();
}

Optional<Set<Integer>> buckets = getHiveBuckets(hiveBucketProperty, dataColumns, bindings.get());
Optional<Set<Integer>> buckets = getHiveBuckets(hiveBucketProperty, dataColumns, bindings.get(), useLegacyTimestampBucketing);
if (buckets.isPresent()) {
return Optional.of(new HiveBucketFilter(buckets.get()));
}
@@ -357,7 +369,11 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Optional<HiveBucket
return Optional.of(new HiveBucketFilter(builder.build()));
}

private static Optional<Set<Integer>> getHiveBuckets(Optional<HiveBucketProperty> hiveBucketPropertyOptional, List<Column> dataColumns, Map<ColumnHandle, Set<NullableValue>> bindings)
private static Optional<Set<Integer>> getHiveBuckets(
Optional<HiveBucketProperty> hiveBucketPropertyOptional,
List<Column> dataColumns,
Map<ColumnHandle, Set<NullableValue>> bindings,
boolean useLegacyTimestampBucketing)
{
if (bindings.isEmpty() || !hiveBucketPropertyOptional.isPresent()) {
return Optional.empty();
@@ -400,7 +416,7 @@ private static Optional<Set<Integer>> getHiveBuckets(Optional<HiveBucketProperty
.map(HiveType::getTypeInfo)
.collect(toImmutableList());
ImmutableSet.Builder<Integer> buckets = ImmutableSet.builder();
getHiveBuckets(new Object[types.size()], 0, orderedBindings, bucketCount, types, buckets);
getHiveBuckets(new Object[types.size()], 0, orderedBindings, bucketCount, types, buckets, useLegacyTimestampBucketing);
return Optional.of(buckets.build());
}

@@ -410,16 +426,17 @@ private static void getHiveBuckets(
List<Set<NullableValue>> bindings,
int bucketCount,
List<TypeInfo> typeInfos,
ImmutableSet.Builder<Integer> buckets)
ImmutableSet.Builder<Integer> buckets,
boolean useLegacyTimestampBucketing)
{
if (valuesCount == typeInfos.size()) {
buckets.add(getHiveBucket(bucketCount, typeInfos, values));
buckets.add(getHiveBucket(bucketCount, typeInfos, values, useLegacyTimestampBucketing));
return;
}

for (NullableValue value : bindings.get(valuesCount)) {
values[valuesCount] = value.getValue();
getHiveBuckets(values, valuesCount + 1, bindings, bucketCount, typeInfos, buckets);
getHiveBuckets(values, valuesCount + 1, bindings, bucketCount, typeInfos, buckets, useLegacyTimestampBucketing);
}
}

@@ -219,6 +219,7 @@ public class HiveClientConfig
private int parquetQuickStatsMaxConcurrentCalls = 500;
private int quickStatsMaxConcurrentCalls = 100;
private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);
private boolean legacyTimestampBucketing;

@Min(0)
public int getMaxInitialSplits()
@@ -1832,4 +1833,17 @@ public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySc
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
return this;
}

public boolean isLegacyTimestampBucketing()
{
return legacyTimestampBucketing;
}

@Config("hive.legacy-timestamp-bucketing")
@ConfigDescription("Use the legacy timestamp bucketing algorithm (which is not Hive compatible) for tables bucketed by timestamp type.")
public HiveClientConfig setLegacyTimestampBucketing(boolean legacyTimestampBucketing)
{
this.legacyTimestampBucketing = legacyTimestampBucketing;
return this;
}
}
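For clusters that need to keep the old bucket placement (for example, to continue reading tables already written with the legacy hash), the pre-fix behavior stays available behind this property. A sketch of the programmatic equivalent of setting hive.legacy-timestamp-bucketing=true in the catalog properties, assuming the config class's default constructor:

// Restores the pre-fix (non-Hive-compatible) timestamp hashing.
HiveClientConfig config = new HiveClientConfig()
        .setLegacyTimestampBucketing(true);
assert config.isLegacyTimestampBucketing();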