From fa50c51c2ed8bae7e77a9a2346d4ef5a3f5df958 Mon Sep 17 00:00:00 2001 From: wangd Date: Sat, 25 May 2024 02:09:07 +0800 Subject: [PATCH] [Iceberg] Refine the partition specs that really need to be checked --- .../iceberg/IcebergAbstractMetadata.java | 8 +- .../facebook/presto/iceberg/IcebergUtil.java | 9 ++ .../optimizer/IcebergPlanOptimizer.java | 10 +- .../IcebergDistributedSmokeTestBase.java | 52 +++++++++++ .../iceberg/IcebergDistributedTestBase.java | 76 +++++++++++++++ .../iceberg/TestIcebergLogicalPlanner.java | 93 +++++++++++++++++++ .../iceberg/rest/TestIcebergSmokeRest.java | 28 ++++++ 7 files changed, 262 insertions(+), 14 deletions(-) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 7297cf7cd8c5..7c22eb41c80c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -69,7 +69,6 @@ import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileMetadata; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -125,6 +124,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles; +import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData; import static com.facebook.presto.iceberg.IcebergUtil.getPartitions; import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime; import static com.facebook.presto.iceberg.IcebergUtil.getTableComment; @@ -874,11 +874,7 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa // Get partition specs that really need to be checked Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - Set partitionSpecIds = handle.getIcebergTableName().getSnapshotId().map( - snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream() - .map(ManifestFile::partitionSpecId) - .collect(toImmutableSet())) - .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter. + Set partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, handle.getIcebergTableName().getSnapshotId()); Set enforcedColumnIds = getEnforcedColumns(icebergTable, partitionSpecIds, domainPredicate, session) .stream() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 77d5624e5b9b..d538776aa6ce 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -300,6 +300,15 @@ public static Map getIdentityPartitions(PartitionSpec p return columns.build(); } + public static Set getPartitionSpecsIncludingValidData(Table icebergTable, Optional snapshotId) + { + return snapshotId.map(snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream() + .filter(manifestFile -> manifestFile.hasAddedFiles() || manifestFile.hasExistingFiles()) + .map(ManifestFile::partitionSpecId) + .collect(toImmutableSet())) + .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter. + } + public static List toHiveColumns(List columns) { return columns.stream() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java index 0c78350fc1c3..35f8c6b356bb 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java @@ -51,7 +51,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -71,11 +70,11 @@ import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; +import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData; import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -177,12 +176,7 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext context) RowExpression subfieldPredicate = rowExpressionService.getDomainTranslator().toPredicate(subfieldTupleDomain); // Get partition specs that really need to be checked - Set partitionSpecIds = tableHandle.getIcebergTableName().getSnapshotId().map( - snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream() - .map(ManifestFile::partitionSpecId) - .collect(toImmutableSet())) - .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter. - + Set partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, tableHandle.getIcebergTableName().getSnapshotId()); Set enforcedColumns = getEnforcedColumns(icebergTable, partitionSpecIds, entireColumnDomain, diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 8a8972fd8b2d..b08defa50f01 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -1588,6 +1588,58 @@ public void testDeleteOnPartitionedV1Table() dropTable(session, tableName); } + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode) + { + String tableName = "test_empty_partition_spec_table"; + try { + // Create a table with no partition + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')"); + + // Do not insert data, and evaluate the partition spec by adding a partition column `c` + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + + // Insert data under the new partition spec + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4); + + // We can do metadata delete on partition column `c`, because the initial partition spec contains no data + assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)"); + } + finally { + dropTable(getSession(), tableName); + } + } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode) + { + String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*"; + String tableName = "test_data_deleted_partition_spec_table"; + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + // Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + // Do not support metadata delete with filter on column `c`, because we have data with old partition spec + assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage); + + // Do metadata delete on column `a`, because all partition specs contains partition column `a` + assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2); + + // Then we can do metadata delete on column `c`, because the old partition spec contains no data now + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)"); + } + finally { + dropTable(getSession(), tableName); + } + } + @DataProvider(name = "version_and_mode") public Object[][] versionAndMode() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 9636aca5a301..6ae0751e8c5b 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1406,6 +1406,82 @@ public void testMetadataDeleteOnPartitionedTableWithDeleteFiles() } } + @Test + public void testMetadataDeleteOnV2MorTableWithEmptyUnsupportedSpecs() + { + String tableName = "test_empty_partition_spec_table"; + try { + // Create a table with no partition + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')"); + + // Do not insert data, and evaluate the partition spec by adding a partition column `c` + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + + // Insert data under the new partition spec + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4); + + Table icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 4); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + // Do metadata delete on partition column `c`, because the initial partition spec contains no data + assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)"); + + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 2); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDeleted() + { + String tableName = "test_data_deleted_partition_spec_table"; + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read', partitioning = ARRAY['a'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + Table icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 2); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + // Evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 5); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + + // Execute row level delete with filter on column `c`, because we have data with old partition spec + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 5); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 2); + + // Do metadata delete on column `a`, because all partition specs contains partition column `a` + assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 3); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 2); + + // Then do metadata delete on column `c`, because the old partition spec contains no data now + assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 0); + assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)"); + icebergTable = loadTable(tableName); + assertHasDataFiles(icebergTable.currentSnapshot(), 1); + assertHasDeleteFiles(icebergTable.currentSnapshot(), 0); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List expectedFileContent) { // check delete file list diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java index 5ac7542f7942..af5aef01101f 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java @@ -18,11 +18,13 @@ import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.Domain; import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.predicate.ValueSet; import com.facebook.presto.common.type.TimeZoneKey; import com.facebook.presto.cost.StatsProvider; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.plan.AggregationNode; import com.facebook.presto.spi.plan.FilterNode; @@ -88,6 +90,7 @@ import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static com.facebook.presto.iceberg.IcebergSessionProperties.PARQUET_DEREFERENCE_PUSHDOWN_ENABLED; import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED; +import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled; import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield; import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND; import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.OR; @@ -635,6 +638,96 @@ public void testFiltersWithPushdownDisable() assertUpdate("DROP TABLE test_filters_with_pushdown_disable"); } + @Test + public void testThoroughlyPushdownForTableWithUnsupportedSpecsIncludingNoData() + { + // The filter pushdown session property is disabled by default + Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession(); + assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false); + + String tableName = "test_empty_partition_spec_table"; + try { + // Create a table with no partition + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1')"); + + // Do not insert data, and evaluate the partition spec by adding a partition column `c` + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + + // Insert data under the new partition spec + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4); + + // Only identity partition column predicates, would be enforced totally by tableScan + assertPlan(sessionWithoutFilterPushdown, "SELECT a, b FROM " + tableName + " WHERE c > 2", + output(exchange( + strictTableScan(tableName, identityMap("a", "b")))), + plan -> assertTableLayout( + plan, + tableName, + withColumnDomains(ImmutableMap.of(new Subfield( + "c", + ImmutableList.of()), + Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false))), + TRUE_CONSTANT, + ImmutableSet.of("c"))); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)"); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + + @Test + public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDeleted() + { + // The filter pushdown session property is disabled by default + Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession(); + assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false); + + String tableName = "test_data_deleted_partition_spec_table"; + try { + // Create a table with partition column `a`, and insert some data under this partition spec + assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1', partitioning = ARRAY['a'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2); + + // Then evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3); + + // The predicate was enforced partially by tableScan, filter on `c` could not be thoroughly pushed down, so the filterNode drop it's filter condition `a > 2` + assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4", + output(exchange(project( + filter("c = 4", + strictTableScan(tableName, identityMap("b", "c"))))))); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)"); + + // Do metadata delete on column `a`, because all partition specs contains partition column `a` + assertUpdate("DELETE FROM " + tableName + " WHERE a IN (1, 2)", 2); + + // Only identity partition column predicates, would be enforced totally by tableScan + assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4", + output(exchange( + strictTableScan(tableName, identityMap("b")))), + plan -> assertTableLayout( + plan, + tableName, + withColumnDomains(ImmutableMap.of( + new Subfield( + "a", + ImmutableList.of()), + Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false), + new Subfield( + "c", + ImmutableList.of()), + singleValue(INTEGER, 4L))), + TRUE_CONSTANT, + ImmutableSet.of("a", "c"))); + assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)"); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + @DataProvider(name = "timezones") public Object[][] timezones() { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java index 3298c57ce075..65c34bc31bbe 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java @@ -164,4 +164,32 @@ public void testMetadataDeleteOnNonIdentityPartitionColumn(String version, Strin super.testMetadataDeleteOnNonIdentityPartitionColumn(version, mode); } } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode) + { + if (version.equals("1")) { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(() -> super.testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(version, mode)) + .isInstanceOf(RuntimeException.class); + } + else { + // v2 succeeds + super.testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(version, mode); + } + } + + @Test(dataProvider = "version_and_mode") + public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode) + { + if (version.equals("1")) { + // v1 table create fails due to Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756) + assertThatThrownBy(() -> super.testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(version, mode)) + .isInstanceOf(RuntimeException.class); + } + else { + // v2 succeeds + super.testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(version, mode); + } + } }