diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index b95ce83575ab..a12dea70c762 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -239,12 +239,12 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint( new IcebergTableLayoutHandle.Builder() .setPartitionColumns(ImmutableList.copyOf(partitionColumns)) .setDataColumns(toHiveColumns(icebergTable.schema().columns())) - .setDomainPredicate(constraint.getSummary().transform(IcebergAbstractMetadata::toSubfield)) + .setDomainPredicate(constraint.getSummary().simplify().transform(IcebergAbstractMetadata::toSubfield)) .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(predicateColumns) .setRequestedColumns(requestedColumns) .setPushdownFilterEnabled(isPushdownFilterEnabled(session)) - .setPartitionColumnPredicate(partitionColumnPredicate) + .setPartitionColumnPredicate(partitionColumnPredicate.simplify()) .setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions)) .setTable(handle) .build()); @@ -689,7 +689,12 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - return TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList())); + return TableStatisticsMaker.getTableStatistics(session, typeManager, + tableLayoutHandle + .map(IcebergTableLayoutHandle.class::cast) + .map(IcebergTableLayoutHandle::getValidPredicate), + constraint, handle, icebergTable, + columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList())); } @Override @@ -903,13 +908,10 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl } TableScan scan = icebergTable.newScan(); - TupleDomain domainPredicate = layoutHandle.getDomainPredicate() - .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null) - .transform(layoutHandle.getPredicateColumns()::get) - .transform(ColumnHandle.class::cast); + TupleDomain domainPredicate = layoutHandle.getValidPredicate(); if (!domainPredicate.isAll()) { - Expression filterExpression = toIcebergExpression(handle.getPredicate()); + Expression filterExpression = toIcebergExpression(domainPredicate); scan = scan.filter(filterExpression); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 35f0107a7aa6..5fb8c99011b6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -443,18 +443,16 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList())); + TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, + tableLayoutHandle + .map(IcebergTableLayoutHandle.class::cast) + .map(IcebergTableLayoutHandle::getValidPredicate), + constraint, handle, icebergTable, + columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList())); EnumSet mergeFlags = getHiveStatisticsMergeStrategy(session); return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> { - TupleDomain domainPredicate = layoutHandle.getDomainPredicate() - .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null) - .transform(layoutHandle.getPredicateColumns()::get) - .transform(ColumnHandle.class::cast); - - TupleDomain predicate = domainPredicate.transform(icebergLayout -> { - IcebergColumnHandle columnHandle = (IcebergColumnHandle) icebergLayout; - return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType()); - }); + TupleDomain predicate = layoutHandle.getValidPredicate() + .transform(columnHandle -> new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType())); RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate); TableStatistics mergedStatistics = Optional.of(mergeFlags) .filter(set -> !set.isEmpty()) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java index e179eb21d653..b99ba6d3f785 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java @@ -764,7 +764,7 @@ public ConnectorPageSource createPageSource( split.getLength(), split.getFileFormat(), regularColumns, - table.getPredicate(), + icebergLayout.getValidPredicate(), splitContext.isCacheable()); ConnectorPageSource dataPageSource = connectorPageSourceWithRowPositions.getConnectorPageSource(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index e9ed7f0454de..360ef046d69a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -34,10 +34,8 @@ import javax.inject.Inject; -import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; -import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled; import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; @@ -71,14 +69,7 @@ public ConnectorSplitSource getSplits( return new FixedSplitSource(ImmutableList.of()); } - TupleDomain predicate = isPushdownFilterEnabled(session) ? - layoutHandle.getPartitionColumnPredicate() - .transform(IcebergColumnHandle.class::cast) - .intersect(layoutHandle.getDomainPredicate() - .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null) - .transform(layoutHandle.getPredicateColumns()::get)) : - table.getPredicate(); - + TupleDomain predicate = layoutHandle.getValidPredicate(); Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName()); if (table.getIcebergTableName().getTableType() == CHANGELOG) { @@ -93,7 +84,7 @@ public ConnectorSplitSource getSplits( else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { CloseableIterable deleteFiles = IcebergUtil.getDeleteFiles(icebergTable, table.getIcebergTableName().getSnapshotId().get(), - table.getPredicate(), + predicate, table.getPartitionSpecId(), table.getEqualityFieldIds()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java index aab3b28ac8d3..9d40f7a6afb4 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java @@ -29,6 +29,7 @@ public class IcebergTableHandle extends BaseHiveTableHandle { private final IcebergTableName icebergTableName; + // TODO: this field is no longer useful, would be removed in a subsequent PR private final TupleDomain predicate; private final boolean snapshotSpecified; private final Optional outputPath; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java index a9a8c3b0b6b7..3b1a7c669181 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.Set; +import static com.facebook.presto.iceberg.IcebergAbstractMetadata.isEntireColumn; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -67,7 +68,7 @@ public IcebergTableLayoutHandle( table); } - protected IcebergTableLayoutHandle( + public IcebergTableLayoutHandle( List partitionColumns, List dataColumns, TupleDomain domainPredicate, @@ -117,6 +118,17 @@ public IcebergTableHandle getTable() return table; } + public TupleDomain getValidPredicate() + { + TupleDomain predicate = getDomainPredicate() + .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null) + .transform(getPredicateColumns()::get); + if (isPushdownFilterEnabled()) { + predicate = predicate.intersect(getPartitionColumnPredicate().transform(IcebergColumnHandle.class::cast)); + } + return predicate; + } + @Override public boolean equals(Object o) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java index 557addb1b451..6ce34fb5b6fe 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java @@ -127,12 +127,12 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM .put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob) .build(); - public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List columns) + public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Optional> currentPredicate, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List columns) { - return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, constraint, columns); + return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, currentPredicate, constraint, columns); } - private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint, List selectedColumns) + private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Optional> currentPredicate, Constraint constraint, List selectedColumns) { if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent() || constraint.getSummary().isNone()) { return TableStatistics.builder() @@ -141,8 +141,10 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons } TupleDomain intersection = constraint.getSummary() - .transform(IcebergColumnHandle.class::cast) - .intersect(tableHandle.getPredicate()); + .transform(IcebergColumnHandle.class::cast); + if (currentPredicate.isPresent()) { + intersection.intersect(currentPredicate.get()); + } if (intersection.isNone()) { return TableStatistics.builder() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java index 569f8fe8a504..86a2805cddb6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java @@ -23,6 +23,7 @@ import com.facebook.presto.iceberg.IcebergColumnHandle; import com.facebook.presto.iceberg.IcebergMetadataColumn; import com.facebook.presto.iceberg.IcebergTableHandle; +import com.facebook.presto.iceberg.IcebergTableLayoutHandle; import com.facebook.presto.iceberg.IcebergTableName; import com.facebook.presto.iceberg.IcebergTableType; import com.facebook.presto.iceberg.IcebergTransactionManager; @@ -167,6 +168,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) { TableHandle table = node.getTable(); IcebergTableHandle icebergTableHandle = (IcebergTableHandle) table.getConnectorHandle(); + Optional icebergTableLayoutHandle = table.getLayout().map(IcebergTableLayoutHandle.class::cast); IcebergTableName tableName = icebergTableHandle.getIcebergTableName(); if (!tableName.getSnapshotId().isPresent() || tableName.getTableType() != IcebergTableType.DATA) { // Node is already optimized or not ready for planning @@ -176,8 +178,12 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) transactionManager.get(table.getTransaction()); Table icebergTable = getIcebergTable(metadata, session, icebergTableHandle.getSchemaTableName()); + TupleDomain predicate = icebergTableLayoutHandle + .map(IcebergTableLayoutHandle::getValidPredicate) + .orElse(TupleDomain.all()); + // Collect info about each unique delete schema to join by - ImmutableMap, DeleteSetInfo> deleteSchemas = collectDeleteInformation(icebergTable, icebergTableHandle, tableName.getSnapshotId().get()); + ImmutableMap, DeleteSetInfo> deleteSchemas = collectDeleteInformation(icebergTable, predicate, tableName.getSnapshotId().get()); if (deleteSchemas.isEmpty()) { // no equality deletes @@ -262,13 +268,13 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) } private static ImmutableMap, DeleteSetInfo> collectDeleteInformation(Table icebergTable, - IcebergTableHandle icebergTableHandle, - long snapshotId) + TupleDomain predicate, + long snapshotId) { // Delete schemas can repeat, so using a normal hashmap to dedup, will be converted to immutable at the end of the function. HashMap, DeleteSetInfo> deleteInformations = new HashMap<>(); try (CloseableIterator files = - getDeleteFiles(icebergTable, snapshotId, icebergTableHandle.getPredicate(), Optional.empty(), Optional.empty()).iterator()) { + getDeleteFiles(icebergTable, snapshotId, predicate, Optional.empty(), Optional.empty()).iterator()) { files.forEachRemaining(delete -> { if (fromIcebergFileContent(delete.content()) == EQUALITY_DELETES) { ImmutableMap.Builder partitionFieldsBuilder = new ImmutableMap.Builder<>(); @@ -316,7 +322,7 @@ private TableScanNode createDeletesTableScan(ImmutableMap context) TupleDomain simplifiedColumnDomain = entireColumnDomain.simplify(); boolean predicateNotChangedBySimplification = simplifiedColumnDomain.equals(entireColumnDomain); - IcebergTableHandle oldTableHandle = (IcebergTableHandle) handle.getConnectorHandle(); - IcebergTableHandle newTableHandle = new IcebergTableHandle( - oldTableHandle.getSchemaName(), - oldTableHandle.getIcebergTableName(), - oldTableHandle.isSnapshotSpecified(), - simplifiedColumnDomain.intersect(oldTableHandle.getPredicate()), - oldTableHandle.getOutputPath(), - oldTableHandle.getStorageProperties(), - oldTableHandle.getTableSchemaJson(), - oldTableHandle.getPartitionSpecId(), - oldTableHandle.getEqualityFieldIds()); + Optional newConnectorTableLayoutHandle = handle.getLayout().map(IcebergTableLayoutHandle.class::cast) + .map(icebergTableLayoutHandle -> new IcebergTableLayoutHandle( + icebergTableLayoutHandle.getPartitionColumns().stream() + .map(IcebergColumnHandle.class::cast).collect(toList()), + icebergTableLayoutHandle.getDataColumns(), + simplifiedColumnDomain.transform(columnHandle -> toSubfield(columnHandle)) + .intersect(icebergTableLayoutHandle.getDomainPredicate()), + icebergTableLayoutHandle.getRemainingPredicate(), + icebergTableLayoutHandle.getPredicateColumns(), + icebergTableLayoutHandle.getRequestedColumns(), + icebergTableLayoutHandle.isPushdownFilterEnabled(), + identityPartitionColumnPredicate.simplify() + .intersect(icebergTableLayoutHandle.getPartitionColumnPredicate()), + icebergTableLayoutHandle.getPartitions(), + icebergTableLayoutHandle.getTable())); TableScanNode newTableScan = new TableScanNode( tableScan.getSourceLocation(), tableScan.getId(), - new TableHandle(handle.getConnectorId(), newTableHandle, handle.getTransaction(), handle.getLayout()), + new TableHandle(handle.getConnectorId(), handle.getConnectorHandle(), handle.getTransaction(), newConnectorTableLayoutHandle), tableScan.getOutputVariables(), tableScan.getAssignments(), simplifiedColumnDomain.transform(ColumnHandle.class::cast) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 79e17a9fd8ad..1506e6460d39 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -50,6 +50,8 @@ import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -187,6 +189,33 @@ private void testDecimalWithPrecisionAndScale(Session session, FileFormat format dropTable(session, tableName); } + @Test + public void testSimplifyPredicateOnPartitionedColumn() + { + try { + assertUpdate("create table simplify_predicate_on_partition_column(a int) with (partitioning = ARRAY['a'])"); + String insertValues = range(0, 100) + .mapToObj(Integer::toString) + .collect(joining(", ")); + assertUpdate("insert into simplify_predicate_on_partition_column values " + insertValues, 100); + + String inValues = range(0, 50) + .map(i -> i * 2 + 1) + .mapToObj(Integer::toString) + .collect(joining(", ")); + String notInValues = range(0, 50) + .map(i -> i * 2) + .mapToObj(Integer::toString) + .collect(joining(", ")); + + assertQuery("select * from simplify_predicate_on_partition_column where a in (" + inValues + ")", "values " + inValues); + assertQuery("select * from simplify_predicate_on_partition_column where a not in (" + inValues + ")", "values " + notInValues); + } + finally { + assertUpdate("drop table if exists simplify_predicate_on_partition_column"); + } + } + @Test public void testParquetPartitionByTimestamp() {