Skip to content

Commit

Permalink
Remove partitioning flow for cte materialized tables
Browse files Browse the repository at this point in the history
Previously we were partitioning/bucketing the temp tables by the first written column. However, for cte_materialized tables
it makes more sense to do random partitioning. It's more effective and also helps readers avoid potential skew when reading.
  • Loading branch information
jaystarshot committed Jun 4, 2024
1 parent 3e0fad0 commit 37c9f33
Show file tree
Hide file tree
Showing 15 changed files with 21 additions and 155 deletions.
16 changes: 1 addition & 15 deletions presto-docs/src/main/sphinx/admin/cte-materialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ To address this, Presto supports CTE Materialization allowing intermediate CTEs
Materializing CTEs can improve performance when the same CTE is used multiple times in a query by reducing recomputation of the CTE. However, there is also a cost to writing to and reading from disk, so the optimization may not be beneficial for very simple CTEs
or CTEs that are not used many times in a query.

Materialized CTEs are stored in temporary tables that are bucketed based on the first projection column.
Materialized CTEs are stored in temporary tables that are bucketed based on random hashing.
To use this feature, the connector used by the query must support the creation of temporary tables. Currently, only the :doc:`/connector/hive` offers this capability.
The QueryStats (com.facebook.presto.spi.eventlistener.QueryStatistics#writtenIntermediateBytes) expose a metric to the event listener to monitor the bytes written to intermediate storage by temporary tables.

Expand Down Expand Up @@ -48,20 +48,6 @@ When ``cte-materialization-strategy`` is set to ``HEURISTIC`` or ``HEURISTIC_COM

Use the ``cte_heuristic_replication_threshold`` session property to set on a per-query basis.

``query.cte-hash-partition-count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``100``

The number of buckets to be used for materializing CTEs in queries.
This setting determines how many buckets or writers should be used when materializing the CTEs, potentially affecting the performance of queries involving CTE materialization.
A higher number of partitions might improve parallelism but also increases overhead in terms of memory and network communication.

Recommended value: 4–10 times the size of the cluster.

Use the ``cte_hash_partition_count`` session property to set on a per-query basis.

``query.cte-partitioning-provider-catalog``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@
import static com.facebook.presto.hive.HivePartitioningHandle.createPrestoNativePartitioningHandle;
import static com.facebook.presto.hive.HiveSessionProperties.HIVE_STORAGE_FORMAT;
import static com.facebook.presto.hive.HiveSessionProperties.RESPECT_TABLE_FORMAT;
import static com.facebook.presto.hive.HiveSessionProperties.getBucketFunctionTypeForCteMaterialization;
import static com.facebook.presto.hive.HiveSessionProperties.getBucketFunctionTypeForExchange;
import static com.facebook.presto.hive.HiveSessionProperties.getCompressionCodec;
import static com.facebook.presto.hive.HiveSessionProperties.getHiveStorageFormat;
Expand Down Expand Up @@ -2947,13 +2946,6 @@ public ConnectorTableLayoutHandle getAlternativeLayoutHandle(ConnectorSession se
.setBucketHandle(Optional.of(updatedBucketHandle))
.build();
}

// Connector-level hook for CTE materialization: builds the Hive partitioning
// handle used to bucket the temporary tables that back materialized CTEs.
// The bucket function type is read from session properties
// (getBucketFunctionTypeForCteMaterialization).
@Override
public ConnectorPartitioningHandle getPartitioningHandleForCteMaterialization(ConnectorSession session, int partitionCount, List<Type> partitionTypes)
{
return getHivePartitionHandle(session, partitionCount, partitionTypes, getBucketFunctionTypeForCteMaterialization(session));
}

@Override
public ConnectorPartitioningHandle getPartitioningHandleForExchange(ConnectorSession session, int partitionCount, List<Type> partitionTypes)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public final class SystemSessionProperties
public static final String DISTRIBUTED_JOIN = "distributed_join";
public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join";
public static final String HASH_PARTITION_COUNT = "hash_partition_count";
public static final String CTE_HASH_PARTITION_COUNT = "cte_hash_partition_count";
public static final String CTE_HEURISTIC_REPLICATION_THRESHOLD = "cte_heuristic_replication_threshold";

public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog";
Expand Down Expand Up @@ -432,11 +431,6 @@ public SystemSessionProperties(
"Number of partitions for distributed joins and aggregations",
queryManagerConfig.getHashPartitionCount(),
false),
integerProperty(
CTE_HASH_PARTITION_COUNT,
"Number of partitions for materializing CTEs",
queryManagerConfig.getCteHashPartitionCount(),
false),
stringProperty(
PARTITIONING_PROVIDER_CATALOG,
"Name of the catalog providing custom partitioning",
Expand Down Expand Up @@ -2020,11 +2014,6 @@ public static int getHashPartitionCount(Session session)
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
}

// Returns the per-query value of the cte_hash_partition_count session property:
// the number of buckets used when materializing CTEs into temporary tables.
public static int getCteHashPartitionCount(Session session)
{
return session.getSystemProperty(CTE_HASH_PARTITION_COUNT, Integer.class);
}

public static int getCteHeuristicReplicationThreshold(Session session)
{
return session.getSystemProperty(CTE_HEURISTIC_REPLICATION_THRESHOLD, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class QueryManagerConfig
private int maxQueuedQueries = 5000;

private int hashPartitionCount = 100;

private int cteHashPartitionCount = 100;
private String partitioningProviderCatalog = GlobalSystemConnector.NAME;
private String ctePartitioningProviderCatalog = GlobalSystemConnector.NAME;
private ExchangeMaterializationStrategy exchangeMaterializationStrategy = ExchangeMaterializationStrategy.NONE;
Expand Down Expand Up @@ -157,20 +155,6 @@ public QueryManagerConfig setMaxQueuedQueries(int maxQueuedQueries)
return this;
}

// Number of buckets (and hence writers) allocated per materialized CTE.
// @Min(1): the airlift config validator rejects non-positive values.
@Min(1)
public int getCteHashPartitionCount()
{
return cteHashPartitionCount;
}

// Bound to the query.cte-hash-partition-count config property; returns this
// for the fluent setter chaining airlift config classes use.
@Config("query.cte-hash-partition-count")
@ConfigDescription("Number of writers or buckets allocated per materialized CTE. (Recommended value: 4 - 10x times the size of the cluster)")
public QueryManagerConfig setCteHashPartitionCount(int cteHashPartitionCount)
{
this.cteHashPartitionCount = cteHashPartitionCount;
return this;
}

@Min(1)
public int getHashPartitionCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,6 @@ public PartitioningHandle getPartitioningHandleForExchange(Session session, Stri
return delegate.getPartitioningHandleForExchange(session, catalogName, partitionCount, partitionTypes);
}

// Pure pass-through: forwards to the wrapped Metadata implementation
// (this class appears to be a delegating/forwarding wrapper).
@Override
public PartitioningHandle getPartitioningHandleForCteMaterialization(Session session, String catalogName, int partitionCount, List<Type> partitionTypes)
{
return delegate.getPartitioningHandleForCteMaterialization(session, catalogName, partitionCount, partitionTypes);
}

@Override
public Optional<Object> getInfo(Session session, TableHandle handle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,6 @@ public interface Metadata
*/
PartitioningHandle getPartitioningHandleForExchange(Session session, String catalogName, int partitionCount, List<Type> partitionTypes);

/**
 * Provides the partitioning handle used for CTE materialization, resolved
 * from the connector backing {@code catalogName}.
 */
PartitioningHandle getPartitioningHandleForCteMaterialization(Session session, String catalogName, int partitionCount, List<Type> partitionTypes);

Optional<Object> getInfo(Session session, TableHandle handle);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,18 +450,6 @@ public PartitioningHandle getPartitioningHandleForExchange(Session session, Stri
return new PartitioningHandle(Optional.of(connectorId), Optional.of(transaction), connectorPartitioningHandle);
}

// Resolves the catalog's connector and asks it for a CTE-materialization
// partitioning handle, wrapping the result with the connector id and the
// current transaction handle so the engine can route it back later.
@Override
public PartitioningHandle getPartitioningHandleForCteMaterialization(Session session, String catalogName, int partitionCount, List<Type> partitionTypes)
{
// Fail fast with NOT_FOUND if the configured partitioning provider catalog does not exist.
CatalogMetadata catalogMetadata = getOptionalCatalogMetadata(session, transactionManager, catalogName)
.orElseThrow(() -> new PrestoException(NOT_FOUND, format("Catalog '%s' does not exist", catalogName)));
ConnectorId connectorId = catalogMetadata.getConnectorId();
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);
ConnectorPartitioningHandle connectorPartitioningHandle = metadata.getPartitioningHandleForCteMaterialization(session.toConnectorSession(connectorId), partitionCount, partitionTypes);
ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId);
return new PartitioningHandle(Optional.of(connectorId), Optional.of(transaction), connectorPartitioningHandle);
}

@Override
public Optional<Object> getInfo(Session session, TableHandle handle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.google.common.collect.ImmutableMap;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -93,7 +94,7 @@ public static TableScanNode createTemporaryTableScan(
TableHandle tableHandle,
List<VariableReferenceExpression> outputVariables,
Map<VariableReferenceExpression, ColumnMetadata> variableToColumnMap,
PartitioningMetadata expectedPartitioningMetadata)
Optional<PartitioningMetadata> expectedPartitioningMetadata)
{
Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle);
Map<VariableReferenceExpression, ColumnMetadata> outputColumns = outputVariables.stream()
Expand All @@ -106,13 +107,14 @@ public static TableScanNode createTemporaryTableScan(
TableLayoutResult selectedLayout = metadata.getLayout(session, tableHandle, Constraint.alwaysTrue(), Optional.of(outputColumnHandles));
verify(selectedLayout.getUnenforcedConstraint().equals(TupleDomain.all()), "temporary table layout shouldn't enforce any constraints");
verify(!selectedLayout.getLayout().getColumns().isPresent(), "temporary table layout must provide all the columns");
TableLayout.TablePartitioning expectedPartitioning = new TableLayout.TablePartitioning(
expectedPartitioningMetadata.getPartitioningHandle(),
expectedPartitioningMetadata.getPartitionColumns().stream()
.map(columnHandles::get)
.collect(toImmutableList()));
verify(selectedLayout.getLayout().getTablePartitioning().equals(Optional.of(expectedPartitioning)), "invalid temporary table partitioning");

if (expectedPartitioningMetadata.isPresent()) {
TableLayout.TablePartitioning expectedPartitioning = new TableLayout.TablePartitioning(
expectedPartitioningMetadata.get().getPartitioningHandle(),
expectedPartitioningMetadata.get().getPartitionColumns().stream()
.map(columnHandles::get)
.collect(toImmutableList()));
verify(selectedLayout.getLayout().getTablePartitioning().equals(Optional.of(expectedPartitioning)), "invalid temporary table partitioning");
}
Map<VariableReferenceExpression, ColumnHandle> assignments = outputVariables.stream()
.collect(toImmutableMap(identity(), variable -> columnHandles.get(outputColumns.get(variable).getName())));

Expand All @@ -139,6 +141,11 @@ public static Map<VariableReferenceExpression, ColumnMetadata> assignTemporaryTa
return result.build();
}

// Convenience overload for callers with no constant partitioning variables:
// delegates to the two-argument variant with an empty constants list.
public static Map<VariableReferenceExpression, ColumnMetadata> assignTemporaryTableColumnNames(Collection<VariableReferenceExpression> outputVariables)
{
return assignTemporaryTableColumnNames(outputVariables, Collections.emptyList());
}

public static BasePlanFragmenter.PartitioningVariableAssignments assignPartitioningVariables(VariableAllocator variableAllocator,
Partitioning partitioning)
{
Expand Down Expand Up @@ -169,7 +176,6 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
TableHandle tableHandle,
List<VariableReferenceExpression> outputs,
Map<VariableReferenceExpression, ColumnMetadata> variableToColumnMap,
PartitioningMetadata partitioningMetadata,
VariableReferenceExpression outputVar)
{
SchemaTableName schemaTableName = metadata.getTableMetadata(session, tableHandle).getTable();
Expand All @@ -181,19 +187,8 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
Set<VariableReferenceExpression> outputNotNullColumnVariables = outputs.stream()
.filter(variable -> variableToColumnMap.get(variable) != null && !(variableToColumnMap.get(variable).isNullable()))
.collect(Collectors.toSet());
PartitioningHandle partitioningHandle = partitioningMetadata.getPartitioningHandle();
List<String> partitionColumns = partitioningMetadata.getPartitionColumns();
Map<String, VariableReferenceExpression> columnNameToVariable = variableToColumnMap.entrySet().stream()
.collect(toImmutableMap(entry -> entry.getValue().getName(), Map.Entry::getKey));
List<VariableReferenceExpression> partitioningVariables = partitionColumns.stream()
.map(columnNameToVariable::get)
.collect(toImmutableList());
PartitioningScheme partitioningScheme = new PartitioningScheme(
Partitioning.create(partitioningHandle, partitioningVariables),
outputs,
Optional.empty(),
false,
Optional.empty());
return new TableFinishNode(
source.getSourceLocation(),
idAllocator.getNextId(),
Expand All @@ -208,7 +203,7 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite
temporaryTableHandle,
exchange.getOutputVariables(),
variableToColumnMap,
partitioningMetadata);
Optional.of(partitioningMetadata));

checkArgument(
!exchange.getPartitioningScheme().isReplicateNullsAndAny(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.PartitioningMetadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -31,28 +29,22 @@
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.BasePlanFragmenter;
import com.facebook.presto.sql.planner.Partitioning;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.getCteHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getCtePartitioningProviderCatalog;
import static com.facebook.presto.SystemSessionProperties.isCteMaterializationApplicable;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
import static com.facebook.presto.sql.TemporaryTableUtil.createTemporaryTableScan;
import static com.facebook.presto.sql.TemporaryTableUtil.createTemporaryTableWriteWithoutExchanges;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -126,34 +118,16 @@ public PlanNode visitCteProducer(CteProducerNode node, RewriteContext<PhysicalCt
isPlanRewritten = true;
// Create Table Metadata
PlanNode actualSource = node.getSource();
// The cte will be bucketed on the first column currently
VariableReferenceExpression partitionVariable = actualSource.getOutputVariables()
.get(0);
List<Type> partitioningTypes = Arrays.asList(partitionVariable.getType());
String partitioningProviderCatalog = getCtePartitioningProviderCatalog(session);
// First column is taken as the partitioning column
Partitioning partitioning = Partitioning.create(
metadata.getPartitioningHandleForCteMaterialization(session, partitioningProviderCatalog,
getCteHashPartitionCount(session), partitioningTypes),
Arrays.asList(partitionVariable));
BasePlanFragmenter.PartitioningVariableAssignments partitioningVariableAssignments
= assignPartitioningVariables(variableAllocator, partitioning);
Map<VariableReferenceExpression, ColumnMetadata> variableToColumnMap =
assignTemporaryTableColumnNames(actualSource.getOutputVariables(),
partitioningVariableAssignments.getConstants().keySet());
List<VariableReferenceExpression> partitioningVariables = partitioningVariableAssignments.getVariables();
List<String> partitionColumns = partitioningVariables.stream()
.map(variable -> variableToColumnMap.get(variable).getName())
.collect(toImmutableList());
PartitioningMetadata partitioningMetadata = new PartitioningMetadata(partitioning.getHandle(), partitionColumns);

assignTemporaryTableColumnNames(actualSource.getOutputVariables());
TableHandle temporaryTableHandle;
try {
temporaryTableHandle = metadata.createTemporaryTable(
session,
partitioningProviderCatalog,
ImmutableList.copyOf(variableToColumnMap.values()),
Optional.of(partitioningMetadata));
Optional.empty());
context.get().put(node.getCteId(),
new PhysicalCteTransformerContext.TemporaryTableInfo(
createTemporaryTableScan(
Expand All @@ -164,7 +138,7 @@ public PlanNode visitCteProducer(CteProducerNode node, RewriteContext<PhysicalCt
temporaryTableHandle,
actualSource.getOutputVariables(),
variableToColumnMap,
partitioningMetadata), node.getOutputVariables()));
Optional.empty()), node.getOutputVariables()));
}
catch (PrestoException e) {
if (e.getErrorCode().equals(NOT_SUPPORTED.toErrorCode())) {
Expand All @@ -185,7 +159,6 @@ public PlanNode visitCteProducer(CteProducerNode node, RewriteContext<PhysicalCt
temporaryTableHandle,
actualSource.getOutputVariables(),
variableToColumnMap,
partitioningMetadata,
node.getRowCountVariable());
}

Expand Down
Loading

0 comments on commit 37c9f33

Please sign in to comment.