Skip to content

Commit

Permalink
Remove predicate in IcebergTableHandle to clarify the responsibilities
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd authored and tdcmeehan committed May 14, 2024
1 parent 1863d80 commit 5f2afc9
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
new IcebergTableLayoutHandle.Builder()
.setPartitionColumns(ImmutableList.copyOf(partitionColumns))
.setDataColumns(toHiveColumns(icebergTable.schema().columns()))
.setDomainPredicate(constraint.getSummary().transform(IcebergAbstractMetadata::toSubfield))
.setDomainPredicate(constraint.getSummary().simplify().transform(IcebergAbstractMetadata::toSubfield))
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(predicateColumns)
.setRequestedColumns(requestedColumns)
.setPushdownFilterEnabled(isPushdownFilterEnabled(session))
.setPartitionColumnPredicate(partitionColumnPredicate)
.setPartitionColumnPredicate(partitionColumnPredicate.simplify())
.setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions))
.setTable(handle)
.build());
Expand Down Expand Up @@ -689,7 +689,12 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
return TableStatisticsMaker.getTableStatistics(session, typeManager,
tableLayoutHandle
.map(IcebergTableLayoutHandle.class::cast)
.map(IcebergTableLayoutHandle::getValidPredicate),
constraint, handle, icebergTable,
columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -903,13 +908,10 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
}

TableScan scan = icebergTable.newScan();
TupleDomain<ColumnHandle> domainPredicate = layoutHandle.getDomainPredicate()
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
.transform(layoutHandle.getPredicateColumns()::get)
.transform(ColumnHandle.class::cast);
TupleDomain<IcebergColumnHandle> domainPredicate = layoutHandle.getValidPredicate();

if (!domainPredicate.isAll()) {
Expression filterExpression = toIcebergExpression(handle.getPredicate());
Expression filterExpression = toIcebergExpression(domainPredicate);
scan = scan.filter(filterExpression);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,18 +443,16 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager,
tableLayoutHandle
.map(IcebergTableLayoutHandle.class::cast)
.map(IcebergTableLayoutHandle::getValidPredicate),
constraint, handle, icebergTable,
columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
EnumSet<ColumnStatisticType> mergeFlags = getHiveStatisticsMergeStrategy(session);
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
TupleDomain<ColumnHandle> domainPredicate = layoutHandle.getDomainPredicate()
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
.transform(layoutHandle.getPredicateColumns()::get)
.transform(ColumnHandle.class::cast);

TupleDomain<VariableReferenceExpression> predicate = domainPredicate.transform(icebergLayout -> {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) icebergLayout;
return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType());
});
TupleDomain<VariableReferenceExpression> predicate = layoutHandle.getValidPredicate()
.transform(columnHandle -> new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType()));
RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
TableStatistics mergedStatistics = Optional.of(mergeFlags)
.filter(set -> !set.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileFormat(),
regularColumns,
table.getPredicate(),
icebergLayout.getValidPredicate(),
splitContext.isCacheable());
ConnectorPageSource dataPageSource = connectorPageSourceWithRowPositions.getConnectorPageSource();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@

import javax.inject.Inject;

import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
Expand Down Expand Up @@ -71,14 +69,7 @@ public ConnectorSplitSource getSplits(
return new FixedSplitSource(ImmutableList.of());
}

TupleDomain<IcebergColumnHandle> predicate = isPushdownFilterEnabled(session) ?
layoutHandle.getPartitionColumnPredicate()
.transform(IcebergColumnHandle.class::cast)
.intersect(layoutHandle.getDomainPredicate()
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
.transform(layoutHandle.getPredicateColumns()::get)) :
table.getPredicate();

TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());

if (table.getIcebergTableName().getTableType() == CHANGELOG) {
Expand All @@ -93,7 +84,7 @@ public ConnectorSplitSource getSplits(
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
table.getIcebergTableName().getSnapshotId().get(),
table.getPredicate(),
predicate,
table.getPartitionSpecId(),
table.getEqualityFieldIds());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class IcebergTableHandle
extends BaseHiveTableHandle
{
private final IcebergTableName icebergTableName;
// TODO: this field is no longer useful and will be removed in a subsequent PR
private final TupleDomain<IcebergColumnHandle> predicate;
private final boolean snapshotSpecified;
private final Optional<String> outputPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.iceberg.IcebergAbstractMetadata.isEntireColumn;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -67,7 +68,7 @@ public IcebergTableLayoutHandle(
table);
}

protected IcebergTableLayoutHandle(
public IcebergTableLayoutHandle(
List<BaseHiveColumnHandle> partitionColumns,
List<Column> dataColumns,
TupleDomain<Subfield> domainPredicate,
Expand Down Expand Up @@ -117,6 +118,17 @@ public IcebergTableHandle getTable()
return table;
}

/**
 * Builds the predicate, keyed by {@link IcebergColumnHandle}, that callers use in place of
 * reading the domain and partition predicates separately.
 *
 * <p>The domain predicate is first re-keyed from subfields to root column names, where a
 * subfield that is not an entire column (per {@code isEntireColumn}) yields a {@code null} key,
 * and the names are then looked up in {@code getPredicateColumns()}. When filter pushdown is
 * enabled, the result is additionally intersected with the partition column predicate.
 *
 * <p>NOTE(review): this relies on {@code TupleDomain.transform} tolerating {@code null} keys,
 * presumably by dropping those entries; confirm against the TupleDomain implementation.
 *
 * @return the column-level predicate for this layout
 */
public TupleDomain<IcebergColumnHandle> getValidPredicate()
{
    // Re-key subfield domains by root column name; non-entire-column subfields map to null.
    TupleDomain<String> entireColumnNames = getDomainPredicate()
            .transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null);
    TupleDomain<IcebergColumnHandle> columnPredicate = entireColumnNames.transform(getPredicateColumns()::get);

    if (!isPushdownFilterEnabled()) {
        return columnPredicate;
    }

    // With pushdown enabled the partition column predicate is kept separately, so fold it back in.
    return columnPredicate.intersect(getPartitionColumnPredicate().transform(IcebergColumnHandle.class::cast));
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM
.put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob)
.build();

public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Optional<TupleDomain<IcebergColumnHandle>> currentPredicate, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
{
return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, constraint, columns);
return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, currentPredicate, constraint, columns);
}

private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint, List<IcebergColumnHandle> selectedColumns)
private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Optional<TupleDomain<IcebergColumnHandle>> currentPredicate, Constraint constraint, List<IcebergColumnHandle> selectedColumns)
{
if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent() || constraint.getSummary().isNone()) {
return TableStatistics.builder()
Expand All @@ -141,8 +141,10 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
}

TupleDomain<IcebergColumnHandle> intersection = constraint.getSummary()
.transform(IcebergColumnHandle.class::cast)
.intersect(tableHandle.getPredicate());
.transform(IcebergColumnHandle.class::cast);
// intersect() hands back the intersected TupleDomain (its result is what gets assigned or
// chained everywhere else it is called), so the result must be stored back into
// `intersection`; otherwise the layout's current predicate is silently ignored and the
// statistics are computed from the constraint summary alone.
if (currentPredicate.isPresent()) {
    intersection = intersection.intersect(currentPredicate.get());
}

if (intersection.isNone()) {
return TableStatistics.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergMetadataColumn;
import com.facebook.presto.iceberg.IcebergTableHandle;
import com.facebook.presto.iceberg.IcebergTableLayoutHandle;
import com.facebook.presto.iceberg.IcebergTableName;
import com.facebook.presto.iceberg.IcebergTableType;
import com.facebook.presto.iceberg.IcebergTransactionManager;
Expand Down Expand Up @@ -167,6 +168,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
{
TableHandle table = node.getTable();
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) table.getConnectorHandle();
Optional<IcebergTableLayoutHandle> icebergTableLayoutHandle = table.getLayout().map(IcebergTableLayoutHandle.class::cast);
IcebergTableName tableName = icebergTableHandle.getIcebergTableName();
if (!tableName.getSnapshotId().isPresent() || tableName.getTableType() != IcebergTableType.DATA) {
// Node is already optimized or not ready for planning
Expand All @@ -176,8 +178,12 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) transactionManager.get(table.getTransaction());
Table icebergTable = getIcebergTable(metadata, session, icebergTableHandle.getSchemaTableName());

TupleDomain<IcebergColumnHandle> predicate = icebergTableLayoutHandle
.map(IcebergTableLayoutHandle::getValidPredicate)
.orElse(TupleDomain.all());

// Collect info about each unique delete schema to join by
ImmutableMap<Set<Integer>, DeleteSetInfo> deleteSchemas = collectDeleteInformation(icebergTable, icebergTableHandle, tableName.getSnapshotId().get());
ImmutableMap<Set<Integer>, DeleteSetInfo> deleteSchemas = collectDeleteInformation(icebergTable, predicate, tableName.getSnapshotId().get());

if (deleteSchemas.isEmpty()) {
// no equality deletes
Expand Down Expand Up @@ -262,13 +268,13 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
}

private static ImmutableMap<Set<Integer>, DeleteSetInfo> collectDeleteInformation(Table icebergTable,
IcebergTableHandle icebergTableHandle,
long snapshotId)
TupleDomain<IcebergColumnHandle> predicate,
long snapshotId)
{
// Delete schemas can repeat, so a regular HashMap is used to deduplicate them; it is converted to an immutable map at the end of the function.
HashMap<Set<Integer>, DeleteSetInfo> deleteInformations = new HashMap<>();
try (CloseableIterator<DeleteFile> files =
getDeleteFiles(icebergTable, snapshotId, icebergTableHandle.getPredicate(), Optional.empty(), Optional.empty()).iterator()) {
getDeleteFiles(icebergTable, snapshotId, predicate, Optional.empty(), Optional.empty()).iterator()) {
files.forEachRemaining(delete -> {
if (fromIcebergFileContent(delete.content()) == EQUALITY_DELETES) {
ImmutableMap.Builder<Integer, PartitionFieldInfo> partitionFieldsBuilder = new ImmutableMap.Builder<>();
Expand Down Expand Up @@ -316,7 +322,7 @@ private TableScanNode createDeletesTableScan(ImmutableMap<VariableReferenceExpre
tableName.getSnapshotId(),
Optional.empty()),
icebergTableHandle.isSnapshotSpecified(),
icebergTableHandle.getPredicate(),
TupleDomain.all(),
icebergTableHandle.getOutputPath(),
icebergTableHandle.getStorageProperties(),
Optional.of(SchemaParser.toJson(new Schema(deleteFields))),
Expand Down Expand Up @@ -344,7 +350,7 @@ private TableScanNode createNewRoot(TableScanNode node, IcebergTableHandle icebe
tableName.getSnapshotId(),
tableName.getChangelogEndSnapshot()),
icebergTableHandle.isSnapshotSpecified(),
icebergTableHandle.getPredicate(),
TupleDomain.all(),
icebergTableHandle.getOutputPath(),
icebergTableHandle.getStorageProperties(),
icebergTableHandle.getTableSchemaJson(),
Expand Down
Loading

0 comments on commit 5f2afc9

Please sign in to comment.