Skip to content

Commit

Permalink
Fix TimestampType/TimeType values when get partitions for Iceberg table
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed May 9, 2024
1 parent c4e1d8a commit d43b6f8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.Utils;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.iceberg.delete.IcebergDeletePageSink;
Expand All @@ -38,13 +35,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static com.facebook.presto.common.Utils.nativeValueToBlock;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class IcebergUpdateablePageSource
implements UpdatablePageSource
Expand Down Expand Up @@ -226,12 +222,4 @@ protected void closeWithSuppression(Throwable throwable)
}
}
}

private Block nativeValueToBlock(Type type, Object prefilledValue)
{
if (prefilledValue != null && (type instanceof TimestampType && ((TimestampType) type).getPrecision() == MILLISECONDS || type instanceof TimeType)) {
return Utils.nativeValueToBlock(type, MICROSECONDS.toMillis((long) prefilledValue));
}
return Utils.nativeValueToBlock(type, prefilledValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
Expand All @@ -122,6 +123,7 @@
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
import static com.facebook.presto.iceberg.FileFormat.PARQUET;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
Expand Down Expand Up @@ -158,6 +160,7 @@
import static java.util.Collections.emptyIterator;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED;
Expand Down Expand Up @@ -427,7 +430,7 @@ public static Optional<String> tryGetLocation(Table table)
}
}

private static boolean isValidPartitionType(Type type)
private static boolean isValidPartitionType(FileFormat fileFormat, Type type)
{
return type instanceof DecimalType ||
BOOLEAN.equals(type) ||
Expand All @@ -438,25 +441,27 @@ private static boolean isValidPartitionType(Type type)
REAL.equals(type) ||
DOUBLE.equals(type) ||
DATE.equals(type) ||
TIMESTAMP.equals(type) ||
type instanceof TimestampType ||
(TIME.equals(type) && fileFormat == PARQUET) ||
VARBINARY.equals(type) ||
isVarcharType(type) ||
isCharType(type);
}

private static void verifyPartitionTypeSupported(String partitionName, com.facebook.presto.common.type.Type type)
private static void verifyPartitionTypeSupported(FileFormat fileFormat, String partitionName, Type type)
{
if (!isValidPartitionType(type)) {
if (!isValidPartitionType(fileFormat, type)) {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported type [%s] for partition: %s", type, partitionName));
}
}

private static NullableValue parsePartitionValue(
FileFormat fileFormat,
String partitionStringValue,
Type prestoType,
String partitionName)
{
verifyPartitionTypeSupported(partitionName, prestoType);
verifyPartitionTypeSupported(fileFormat, partitionName, prestoType);

Object partitionValue = deserializePartitionValue(prestoType, partitionStringValue, partitionName);
return partitionValue == null ? NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue);
Expand All @@ -470,7 +475,7 @@ public static List<HivePartition> getPartitions(
List<IcebergColumnHandle> partitionColumns)
{
IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName();

FileFormat fileFormat = getFileFormat(icebergTable);
// Empty iceberg table would cause `snapshotId` not present
Optional<Long> snapshotId = resolveSnapshotIdByName(icebergTable, name);
if (!snapshotId.isPresent()) {
Expand Down Expand Up @@ -513,7 +518,7 @@ public static List<HivePartition> getPartitions(
}
}

NullableValue partitionValue = parsePartitionValue(partitionStringValue, toPrestoType(type, typeManager), partition.toString());
NullableValue partitionValue = parsePartitionValue(fileFormat, partitionStringValue, toPrestoType(type, typeManager), partition.toString());
Optional<IcebergColumnHandle> column = partitionColumns.stream()
.filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getId(), field.sourceId()))
.findAny();
Expand Down Expand Up @@ -603,7 +608,10 @@ public static Object deserializePartitionValue(Type type, String valueString, St
if (type.equals(DOUBLE)) {
return parseDouble(valueString);
}
if (type.equals(DATE) || type.equals(TIME) || type.equals(TIMESTAMP)) {
if (type.equals(TIMESTAMP) || type.equals(TIME)) {
return MICROSECONDS.toMillis(parseLong(valueString));
}
if (type.equals(DATE) || type.equals(TIMESTAMP_MICROSECONDS)) {
return parseLong(valueString);
}
if (type instanceof VarcharType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -492,11 +493,15 @@ public Object[][] timezones()
@Test(dataProvider = "timezones")
public void testPartitionedByTimestampType(String zoneId, boolean legacyTimestamp)
{
Session session = sessionForTimezone(zoneId, legacyTimestamp);
Session sessionForTimeZone = sessionForTimezone(zoneId, legacyTimestamp);
testWithAllFileFormats(sessionForTimeZone, (session, fileFormat) -> testPartitionedByTimestampTypeForFormat(session, fileFormat));
}

private void testPartitionedByTimestampTypeForFormat(Session session, FileFormat fileFormat)
{
try {
// create iceberg table partitioned by column of TimestampType, and insert some data
assertQuerySucceeds(session, "create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'])");
assertQuerySucceeds(session, format("create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'], format = '%s')", fileFormat.name()));
assertQuerySucceeds(session, "insert into test_partition_columns values(1, timestamp '1984-12-08 00:10:00'), (2, timestamp '2001-01-08 12:01:01')");

// validate return data of TimestampType
Expand Down Expand Up @@ -1439,4 +1444,10 @@ private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
}
return sessionBuilder.build();
}

private void testWithAllFileFormats(Session session, BiConsumer<Session, FileFormat> test)
{
test.accept(session, FileFormat.PARQUET);
test.accept(session, FileFormat.ORC);
}
}

0 comments on commit d43b6f8

Please sign in to comment.