From d43b6f81151849fac4f9d65e420accba945cd171 Mon Sep 17 00:00:00 2001 From: wangd Date: Sat, 9 Mar 2024 01:48:50 +0800 Subject: [PATCH] Fix TimestampType/TimeType values when get partitions for Iceberg table --- .../iceberg/IcebergUpdateablePageSource.java | 14 +---------- .../facebook/presto/iceberg/IcebergUtil.java | 24 ++++++++++++------- .../iceberg/IcebergDistributedTestBase.java | 15 ++++++++++-- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java index cda88ee8ad02..98b7a4690145 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java @@ -14,11 +14,8 @@ package com.facebook.presto.iceberg; import com.facebook.presto.common.Page; -import com.facebook.presto.common.Utils; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.block.RunLengthEncodedBlock; -import com.facebook.presto.common.type.TimeType; -import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.Type; import com.facebook.presto.hive.HivePartitionKey; import com.facebook.presto.iceberg.delete.IcebergDeletePageSink; @@ -38,13 +35,12 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import static com.facebook.presto.common.Utils.nativeValueToBlock; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue; import static com.google.common.base.Throwables.throwIfInstanceOf; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; public class IcebergUpdateablePageSource implements UpdatablePageSource @@ -226,12 +222,4 @@ protected void closeWithSuppression(Throwable throwable) } } } - - private Block nativeValueToBlock(Type type, Object prefilledValue) - { - if (prefilledValue != null && (type instanceof TimestampType && ((TimestampType) type).getPrecision() == MILLISECONDS || type instanceof TimeType)) { - return Utils.nativeValueToBlock(type, MICROSECONDS.toMillis((long) prefilledValue)); - } - return Utils.nativeValueToBlock(type, prefilledValue); - } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 7edab43e5361..851ede529492 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -109,6 +109,7 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimeType.TIME; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.Varchars.isVarcharType; @@ -122,6 +123,7 @@ import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES; import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; @@ -158,6 +160,7 @@ import static java.util.Collections.emptyIterator; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED; @@ -427,7 +430,7 @@ public static Optional tryGetLocation(Table table) } } - private static boolean isValidPartitionType(Type type) + private static boolean isValidPartitionType(FileFormat fileFormat, Type type) { return type instanceof DecimalType || BOOLEAN.equals(type) || @@ -438,25 +441,27 @@ private static boolean isValidPartitionType(Type type) REAL.equals(type) || DOUBLE.equals(type) || DATE.equals(type) || - TIMESTAMP.equals(type) || + type instanceof TimestampType || + (TIME.equals(type) && fileFormat == PARQUET) || VARBINARY.equals(type) || isVarcharType(type) || isCharType(type); } - private static void verifyPartitionTypeSupported(String partitionName, com.facebook.presto.common.type.Type type) + private static void verifyPartitionTypeSupported(FileFormat fileFormat, String partitionName, Type type) { - if (!isValidPartitionType(type)) { + if (!isValidPartitionType(fileFormat, type)) { throw new PrestoException(NOT_SUPPORTED, format("Unsupported type [%s] for partition: %s", type, partitionName)); } } private static NullableValue parsePartitionValue( + FileFormat fileFormat, String partitionStringValue, Type prestoType, String partitionName) { - verifyPartitionTypeSupported(partitionName, prestoType); + verifyPartitionTypeSupported(fileFormat, partitionName, prestoType); Object partitionValue = deserializePartitionValue(prestoType, partitionStringValue, partitionName); return partitionValue == null ? NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue); @@ -470,7 +475,7 @@ public static List getPartitions( List partitionColumns) { IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName(); - + FileFormat fileFormat = getFileFormat(icebergTable); // Empty iceberg table would cause `snapshotId` not present Optional snapshotId = resolveSnapshotIdByName(icebergTable, name); if (!snapshotId.isPresent()) { @@ -513,7 +518,7 @@ public static List getPartitions( } } - NullableValue partitionValue = parsePartitionValue(partitionStringValue, toPrestoType(type, typeManager), partition.toString()); + NullableValue partitionValue = parsePartitionValue(fileFormat, partitionStringValue, toPrestoType(type, typeManager), partition.toString()); Optional column = partitionColumns.stream() .filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getId(), field.sourceId())) .findAny(); @@ -603,7 +608,10 @@ public static Object deserializePartitionValue(Type type, String valueString, St if (type.equals(DOUBLE)) { return parseDouble(valueString); } - if (type.equals(DATE) || type.equals(TIME) || type.equals(TIMESTAMP)) { + if (type.equals(TIMESTAMP) || type.equals(TIME)) { + return MICROSECONDS.toMillis(parseLong(valueString)); + } + if (type.equals(DATE) || type.equals(TIMESTAMP_MICROSECONDS)) { return parseLong(valueString); } if (type instanceof VarcharType) { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 47a9af024386..824a519ef686 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -88,6 +88,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -492,11 +493,15 @@ public Object[][] timezones() @Test(dataProvider = "timezones") public void testPartitionedByTimestampType(String zoneId, boolean legacyTimestamp) { - Session session = sessionForTimezone(zoneId, legacyTimestamp); + Session sessionForTimeZone = sessionForTimezone(zoneId, legacyTimestamp); + testWithAllFileFormats(sessionForTimeZone, (session, fileFormat) -> testPartitionedByTimestampTypeForFormat(session, fileFormat)); + } + private void testPartitionedByTimestampTypeForFormat(Session session, FileFormat fileFormat) + { try { // create iceberg table partitioned by column of TimestampType, and insert some data - assertQuerySucceeds(session, "create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'])"); + assertQuerySucceeds(session, format("create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'], format = '%s')", fileFormat.name())); assertQuerySucceeds(session, "insert into test_partition_columns values(1, timestamp '1984-12-08 00:10:00'), (2, timestamp '2001-01-08 12:01:01')"); // validate return data of TimestampType @@ -1439,4 +1444,10 @@ private Session sessionForTimezone(String zoneId, boolean legacyTimestamp) } return sessionBuilder.build(); } + + private void testWithAllFileFormats(Session session, BiConsumer test) + { + test.accept(session, FileFormat.PARQUET); + test.accept(session, FileFormat.ORC); + } }