Skip to content

Commit

Permalink
Collect data size statistics for Iceberg tables
Browse files Browse the repository at this point in the history
Previously, the data size statistic was computed by using the
Iceberg data manifests data size field. This value is misleading
for Presto because it represents the compressed on-disk size.
This change allows ANALYZE to read and write data size statistic
values to puffin files.

This change also updates the hive-statistics-merge-strategy
config value in the Iceberg connector to accept a comma-separated
list of valid values to override from the HMS instead of using
an independent enum. This allows for a wider variety of combinations
using less code.
  • Loading branch information
ZacBlanco committed May 6, 2024
1 parent dde7abe commit da86cbe
Show file tree
Hide file tree
Showing 15 changed files with 677 additions and 267 deletions.
17 changes: 11 additions & 6 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,13 @@ Property Name Description

``iceberg.enable-parquet-dereference-pushdown`` Enable parquet dereference pushdown. ``true``

``iceberg.hive-statistics-merge-strategy`` Determines how to merge statistics that are stored in the ``NONE``
Hive Metastore. The available values are ``NONE``,
``USE_NULLS_FRACTION_AND_NDV``, ``USE_NULLS_FRACTIONS``,
and ``USE_NDV``
``iceberg.hive-statistics-merge-strategy`` Comma-separated list of statistics to use from the
Hive Metastore to override Iceberg table statistics.
The available values are ``NUMBER_OF_DISTINCT_VALUES``
and ``TOTAL_SIZE_IN_BYTES``.

**Note**: Only valid when the Iceberg connector is
configured with Hive.

``iceberg.statistic-snapshot-record-difference-weight`` The amount that the difference in total record count matters
when calculating the closest snapshot when picking
Expand Down Expand Up @@ -306,6 +309,8 @@ Property Name Description
============================================= ======================================================================
``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property
``iceberg.delete-as-join-rewrite-enabled`` in the current session.
``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property
``iceberg.hive-statistics-merge-strategy`` in the current session.
============================================= ======================================================================

Caching Support
Expand Down Expand Up @@ -1172,7 +1177,7 @@ each Iceberg data type to the corresponding Presto data type, and from each Pres
The following tables detail the specific type maps between PrestoDB and Iceberg.

Iceberg to PrestoDB type mapping
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Map of Iceberg types to the relevant PrestoDB types:

Expand Down Expand Up @@ -1215,7 +1220,7 @@ Map of Iceberg types to the relevant PrestoDB types:
No other types are supported.

PrestoDB to Iceberg type mapping
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Map of PrestoDB types to the relevant Iceberg types:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.hadoop.HadoopFileIO;
Expand All @@ -26,11 +26,13 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.EnumSet;
import java.util.List;

import static com.facebook.presto.hive.HiveCompressionCodec.GZIP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;
import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
Expand All @@ -51,12 +53,13 @@ public class IcebergConfig
private boolean pushdownFilterEnabled;
private boolean deleteAsJoinRewriteEnabled = true;

private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;
private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;

@NotNull
public FileFormat getFileFormat()
{
Expand Down Expand Up @@ -195,16 +198,16 @@ public boolean isMergeOnReadModeEnabled()
}

@Config("iceberg.hive-statistics-merge-strategy")
@ConfigDescription("determines how to merge statistics that are stored in the Hive Metastore")
public IcebergConfig setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy mergeStrategy)
@ConfigDescription("Comma separated list of statistics to use from the Hive metastore to override iceberg table statistics")
public IcebergConfig setHiveStatisticsMergeFlags(String mergeFlags)
{
this.hiveStatisticsMergeStrategy = mergeStrategy;
this.hiveStatisticsMergeFlags = decodeMergeFlags(mergeFlags);
return this;
}

public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy()
public EnumSet<ColumnStatisticType> getHiveStatisticsMergeFlags()
{
return hiveStatisticsMergeStrategy;
return hiveStatisticsMergeFlags;
}

@Config("iceberg.statistic-snapshot-record-difference-weight")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand All @@ -57,9 +56,9 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatisticType;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
Expand All @@ -82,12 +81,14 @@
import java.io.IOException;
import java.time.ZoneId;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics;
Expand Down Expand Up @@ -122,6 +123,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize;
import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -442,7 +444,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session);
EnumSet<ColumnStatisticType> mergeFlags = getHiveStatisticsMergeStrategy(session);
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
TupleDomain<ColumnHandle> domainPredicate = layoutHandle.getDomainPredicate()
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
Expand All @@ -454,24 +456,24 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType());
});
RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec());
TableStatistics mergedStatistics = Optional.of(mergeFlags)
.filter(set -> !set.isEmpty())
.map(flags -> {
PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
return mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeFlags, icebergTable.spec());
})
.orElse(icebergStatistics);
TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder()
.setRowCount(mergedStatistics.getRowCount());
double totalSize = 0;
for (ColumnHandle colHandle : columnHandles) {
IcebergColumnHandle icebergHandle = (IcebergColumnHandle) colHandle;
if (mergedStatistics.getColumnStatistics().containsKey(icebergHandle)) {
ColumnStatistics stats = mergedStatistics.getColumnStatistics().get(icebergHandle);
filteredStatsBuilder.setColumnStatistics(icebergHandle, stats);
if (!stats.getDataSize().isUnknown()) {
totalSize += stats.getDataSize().getValue();
}
}
}
filteredStatsBuilder.setTotalSize(Estimate.of(totalSize));
return filterStatsCalculatorService.filterStats(
filteredStatsBuilder.build(),
calculateAndSetTableSize(filteredStatsBuilder).build(),
translatedPredicate,
session,
columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
Expand All @@ -481,9 +483,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
IcebergColumnHandle::getName,
IcebergColumnHandle::getType)));
}).orElseGet(() -> {
if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) {
if (!mergeFlags.isEmpty()) {
PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec());
return mergeHiveStatistics(icebergStatistics, hiveStats, mergeFlags, icebergTable.spec());
}
return icebergStatistics;
});
Expand All @@ -500,9 +502,17 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession
{
Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.flatMap(meta -> metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
.stream()
.map(statType -> statType.getColumnStatisticMetadata(meta.getName())))
.flatMap(meta -> {
try {
return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
.stream()
.map(statType -> statType.getColumnStatisticMetadata(meta.getName()));
}
// thrown in the case the type isn't supported by HMS statistics
catch (IllegalArgumentException e) {
return Stream.empty();
}
})
.collect(toImmutableSet());

Set<TableStatisticType> tableStatistics = ImmutableSet.of(ROW_COUNT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import com.facebook.presto.hive.OrcFileWriterConfig;
import com.facebook.presto.hive.ParquetFileWriterConfig;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.iceberg.util.StatisticsUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties;

import javax.inject.Inject;

import java.util.EnumSet;
import java.util.List;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.iceberg.util.StatisticsUtil.SUPPORTED_MERGE_FLAGS;
import static com.facebook.presto.iceberg.util.StatisticsUtil.decodeMergeFlags;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
Expand Down Expand Up @@ -156,14 +160,14 @@ public IcebergSessionProperties(
false),
new PropertyMetadata<>(
HIVE_METASTORE_STATISTICS_MERGE_STRATEGY,
"choose how to include statistics from the Hive Metastore when calculating table stats. Valid values are: "
+ Joiner.on(", ").join(HiveStatisticsMergeStrategy.values()),
"Flags to choose which statistics from the Hive Metastore are used when calculating table stats. Valid values are: "
+ Joiner.on(", ").join(SUPPORTED_MERGE_FLAGS),
VARCHAR,
HiveStatisticsMergeStrategy.class,
icebergConfig.getHiveStatisticsMergeStrategy(),
EnumSet.class,
icebergConfig.getHiveStatisticsMergeFlags(),
false,
val -> HiveStatisticsMergeStrategy.valueOf((String) val),
HiveStatisticsMergeStrategy::name),
val -> decodeMergeFlags((String) val),
StatisticsUtil::encodeMergeFlags),
booleanProperty(
PUSHDOWN_FILTER_ENABLED,
"Experimental: Enable Filter Pushdown for Iceberg. This is only supported with Native Worker.",
Expand Down Expand Up @@ -272,9 +276,9 @@ public static boolean isMergeOnReadModeEnabled(ConnectorSession session)
return session.getProperty(MERGE_ON_READ_MODE_ENABLED, Boolean.class);
}

public static HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy(ConnectorSession session)
public static EnumSet<ColumnStatisticType> getHiveStatisticsMergeStrategy(ConnectorSession session)
{
return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, HiveStatisticsMergeStrategy.class);
return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, EnumSet.class);
}

public static boolean isPushdownFilterEnabled(ConnectorSession session)
Expand Down
Loading

0 comments on commit da86cbe

Please sign in to comment.