
Commit

Revert "[Iceberg] Add predicate in layout without pushdown_filter_ena…
Browse files Browse the repository at this point in the history
…bled"

This reverts commit b2d4532.
feilong-liu committed May 3, 2024
1 parent 30cb315 commit d3d28c0
Showing 6 changed files with 41 additions and 118 deletions.
@@ -103,6 +103,7 @@
import java.util.stream.Collectors;

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.MetadataUtils.createPredicate;
import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate;
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
@@ -215,14 +216,13 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
IcebergTableHandle handle = (IcebergTableHandle) table;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

List<IcebergColumnHandle> partitionColumns = getPartitionKeyColumnHandles(icebergTable, typeManager);
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns)));
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(getPartitionKeyColumnHandles(icebergTable, typeManager))));
Optional<Set<IcebergColumnHandle>> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet()));

ConnectorTableLayout layout = getTableLayout(
session,
new IcebergTableLayoutHandle.Builder()
.setPartitionColumns(ImmutableList.copyOf(partitionColumns))
.setPartitionColumns(ImmutableList.copyOf(getPartitionKeyColumnHandles(icebergTable, typeManager)))
.setDataColumns(toHiveColumns(icebergTable.schema().columns()))
.setDomainPredicate(constraint.getSummary().transform(IcebergAbstractMetadata::toSubfield))
.setRemainingPredicate(TRUE_CONSTANT)
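For context on the idiom in the hunk above, here is a minimal, self-contained sketch (assuming only Guava on the classpath) of how Maps.filterKeys and Predicates.in narrow a constraint's column-to-domain map down to the partition columns before it is wrapped into a TupleDomain. Plain String keys and values stand in for the connector's ColumnHandle and Domain types, and the class and variable names are illustrative only, not part of this commit.

    import com.google.common.base.Predicates;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Maps;

    import java.util.List;
    import java.util.Map;

    public class PartitionPredicateSketch
    {
        public static void main(String[] args)
        {
            // Stand-ins: column names instead of ColumnHandle, text instead of Domain.
            Map<String, String> constraintDomains = ImmutableMap.of(
                    "ds", "ds = '2024-05-03'",      // partition column
                    "user_id", "user_id > 100");    // regular data column

            List<String> partitionColumns = ImmutableList.of("ds");

            // Same call shape as the diff above: keep only the domains keyed by a partition column.
            Map<String, String> partitionColumnDomains =
                    Maps.filterKeys(constraintDomains, Predicates.in(partitionColumns));

            // Prints {ds=ds = '2024-05-03'}; in the connector this filtered map is what
            // TupleDomain.withColumnDomains(...) turns into the partition column predicate.
            System.out.println(partitionColumnDomains);
        }
    }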
@@ -258,52 +258,59 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa

Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName());
validateTableMode(session, icebergTable);
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());

if (!isPushdownFilterEnabled(session)) {
return new ConnectorTableLayout(handle);
}

if (!icebergTableLayoutHandle.getPartitions().isPresent()) {
return new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
icebergTableLayoutHandle.getPartitionColumnPredicate(),
TupleDomain.none(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
Optional.empty());
}
Optional<List<HivePartition>> partitions = icebergTableLayoutHandle.getPartitions();
Optional<DiscretePredicates> discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts));
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());
List<HivePartition> partitions = icebergTableLayoutHandle.getPartitions().get();

Optional<DiscretePredicates> discretePredicates = getDiscretePredicates(partitionColumns, partitions);

TupleDomain<ColumnHandle> predicate;
RowExpression subfieldPredicate;
if (isPushdownFilterEnabled(session)) {
Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Optional<TupleDomain<ColumnHandle>> predicate = partitions.map(parts -> getPredicate(icebergTableLayoutHandle, partitionColumns, parts, predicateColumns));
// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));
predicate = getPredicate(icebergTableLayoutHandle, partitionColumns, partitions, predicateColumns);

RowExpression subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);
// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));

subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);
}
else {
predicate = createPredicate(partitionColumns, partitions);
subfieldPredicate = TRUE_CONSTANT;
}

// combine subfieldPredicate with remainingPredicate
RowExpression combinedRemainingPredicate = getCombinedRemainingPredicate(icebergTableLayoutHandle, subfieldPredicate);

return predicate.map(pred -> new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
pred,
Optional.empty(),
Optional.empty(),
discretePredicates,
ImmutableList.of(),
Optional.of(combinedRemainingPredicate)))
.orElseGet(() -> new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
TupleDomain.none(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableList.of(),
Optional.empty()));
return new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
predicate,
Optional.empty(),
Optional.empty(),
discretePredicates,
ImmutableList.of(),
Optional.of(combinedRemainingPredicate));
}
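As a side note on the return-statement shape visible in the hunk above, the following standalone sketch shows the Optional.map(...).orElseGet(...) idiom in isolation: when a predicate was computed, a layout is built around it, otherwise a fallback layout without a predicate is produced. The strings are simplified stand-ins for ConnectorTableLayout and TupleDomain, and all names are illustrative only.

    import java.util.Optional;

    public class OptionalLayoutSketch
    {
        public static void main(String[] args)
        {
            System.out.println(buildLayout(Optional.of("ds = '2024-05-03'"))); // layout carrying the predicate
            System.out.println(buildLayout(Optional.empty()));                 // fallback layout
        }

        private static String buildLayout(Optional<String> predicate)
        {
            // Same control flow as the map(...).orElseGet(...) return above:
            // the Optional decides which of the two layouts gets constructed.
            return predicate
                    .map(pred -> "ConnectorTableLayout(predicate = " + pred + ")")
                    .orElseGet(() -> "ConnectorTableLayout(predicate = TupleDomain.none())");
        }
    }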

protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName, Table table)
@@ -158,10 +158,6 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
.setRowCount(Estimate.of(0))
.build();
}
// the total record count for the whole table
Optional<Long> totalRecordCount = Optional.of(intersection)
.filter(domain -> !domain.isAll())
.map(domain -> getDataTableSummary(tableHandle, ImmutableList.of(), TupleDomain.all(), idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields).getRecordCount());

double recordCount = summary.getRecordCount();
TableStatistics.Builder result = TableStatistics.builder();
@@ -40,7 +40,6 @@
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableList;
@@ -117,7 +116,7 @@
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public abstract class IcebergDistributedTestBase
public class IcebergDistributedTestBase
extends AbstractTestDistributedQueries
{
private final CatalogType catalogType;
@@ -1201,63 +1200,6 @@ public void testEqualityDeletesWithHiddenPartitionsEvolution(String fileFormat,
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001', NULL, NULL), (3, '1003', NULL, NULL), (6, '1004', 1, NULL), (6, '1006', 2, 'th002')");
}

@Test
public void testPartShowStatsWithFilters()
{
assertQuerySucceeds("CREATE TABLE showstatsfilters (i int) WITH (partitioning = ARRAY['i'])");
assertQuerySucceeds("INSERT INTO showstatsfilters VALUES 1, 2, 3, 4, 5, 6, 7, 7, 7, 7");
assertQuerySucceeds("ANALYZE showstatsfilters");

MaterializedResult statsTable = getQueryRunner().execute("SHOW STATS for showstatsfilters");
MaterializedRow columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(2), 7.0); // ndvs;
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals(columnStats.getField(5), "1"); // min
assertEquals(columnStats.getField(6), "7"); // max

// EQ
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i = 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "7"); // min
assertEquals(columnStats.getField(6), "7"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals((double) columnStats.getField(2), 7.0d * (4.0d / 10.0d), 1E-8); // ndvs;

// LT
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i < 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "1"); // min
assertEquals(columnStats.getField(6), "6"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals((double) columnStats.getField(2), 7.0d * (6.0d / 10.0d), 1E-8); // ndvs;

// LTE
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i <= 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "1"); // min
assertEquals(columnStats.getField(6), "7"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals(columnStats.getField(2), 7.0d); // ndvs;

// GT
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i > 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), null); // min
assertEquals(columnStats.getField(6), null); // max
assertEquals(columnStats.getField(3), null); // nulls
assertEquals(columnStats.getField(2), null); // ndvs;

// GTE
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i >= 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "7"); // min
assertEquals(columnStats.getField(6), "7"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals((double) columnStats.getField(2), 7.0d * (4.0d / 10.0d), 1E-8); // ndvs;

assertQuerySucceeds("DROP TABLE showstatsfilters");
}
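The NDV assertions in the removed test above follow a simple selectivity-scaling rule: the table has 10 rows and 7 distinct values of i (1 through 7, with 7 inserted four times), and the expected distinct-value estimate under a filter is the table-level NDV scaled by the fraction of rows the filter keeps. A minimal sketch of that arithmetic, with illustrative names only:

    public class NdvScalingSketch
    {
        public static void main(String[] args)
        {
            double tableNdv = 7.0;   // distinct values of i inserted by the removed test
            double tableRows = 10.0; // total rows inserted by the removed test

            // "i = 7" and "i >= 7" keep the 4 rows where i is 7.
            System.out.println(tableNdv * (4.0 / tableRows)); // ~2.8, the asserted 7.0d * (4.0d / 10.0d)

            // "i < 7" keeps the 6 rows where i is between 1 and 6.
            System.out.println(tableNdv * (6.0 / tableRows)); // ~4.2, the asserted 7.0d * (6.0d / 10.0d)

            // "i <= 7" keeps all 10 rows, so the estimate stays at the table-level NDV of 7.0.
            System.out.println(tableNdv * (10.0 / tableRows));
        }
    }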

private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
@@ -49,12 +49,6 @@ public void testStatsByDistance()
// ignore because HMS doesn't support statistics versioning
}

@Override
public void testPartShowStatsWithFilters()
{
// Hive doesn't support returning statistics on partitioned tables
}

@Override
protected Table loadTable(String tableName)
{
@@ -71,7 +71,6 @@
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

@@ -139,33 +139,18 @@ public Builder setNullsFraction(Estimate nullsFraction)
return this;
}

public Estimate getNullsFraction()
{
return nullsFraction;
}

public Builder setDistinctValuesCount(Estimate distinctValuesCount)
{
this.distinctValuesCount = requireNonNull(distinctValuesCount, "distinctValuesCount is null");
return this;
}

public Estimate getDistinctValuesCount()
{
return distinctValuesCount;
}

public Builder setDataSize(Estimate dataSize)
{
this.dataSize = requireNonNull(dataSize, "dataSize is null");
return this;
}

public Estimate getDataSize()
{
return dataSize;
}

public Builder setRange(DoubleRange range)
{
this.range = Optional.of(requireNonNull(range, "range is null"));
