Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Design] Recoverable Grouped Execution #12124

Closed
wenleix opened this issue Dec 21, 2018 · 3 comments
Closed

[Design] Recoverable Grouped Execution #12124

wenleix opened this issue Dec 21, 2018 · 3 comments
Labels
Roadmap A top level roadmap item stale

Comments

@wenleix
Copy link
Contributor

wenleix commented Dec 21, 2018

(A comment-friendly version of this design doc can be found at https://docs.google.com/document/d/1YhibgfzxtkjeJoYtty7R_AdBjTQdqf2nTtwQHk3kGLA/edit?usp=sharing)

Introduction

Grouped execution was introduced to Presto in #8951 to support huge join and aggregation raised in ETL pipelines.

When the input tables are already partitioned on the join key or aggregation key (e.g. bucketed table in Hive), Presto could process a subset (group) of the partitions of the data at a time. This reduces the amount of memory needed to hold the hash table. Implementation wise, for a stage with grouped execution enabled, it is further split into many “lifespan”s, where each lifespan corresponding to a table partition (e.g bucket in Hive table). Only a subset of lifespans are processed simultaneously during stage execution, configured by concurrent-lifespans-per-task.

Besides breaking the memory barrier, grouped execution also enables partial query failure recovery -- each lifespan can be retried independently when the output of the stage writes to a persistent storage. Note output from failed tasks needs to be cleaned up, as discussed later.

Preliminary

Consider the following query, where A and B are already bucketed on custkey:

SELECT ...
FROM A JOIN B 
    USING custkey

Without grouped execution, the workers will load all the data on build side (Table B):

f1

However, since A and B are already bucketed, Presto can schedule the query execution in a more intelligent way to reduce peak memory consumption: for each bucket i, joining the bucket i on table A and B can be done independently! In Presto engine, we call this computation unit a “lifespan”:

f2

Besides scaling Presto to more memory-intensive queries, grouped execution also opens opportunity for partial query failure recovery, as now each lifespan in the query are independent and can be retried independently. As illustrated in the following figure:

f3

Design

A prototype can be found in https://github.com/wenleix/presto/commits/tankbbb

1. Dynamic lifespan schedule

This is done in #11693. Before this change, lifespan are pre-allocated to tasks in a fixed way, which doesn’t work for restarted lifespan since it requires to be allocated to a different task.

Note dynamic life schedule only works when there is no remote source in the stage. In the future, we also want to add support for remote source with replicated distribution (i.e. for broadcast join)

2. Track Persistent Lifespan

In current code, SQLTaskExecution will report a lifespan as completed if all the drivers are done and there are no more inputs. However, it doesn’t check whether lifespan’s output has been delivered. In failure recovery scenario, we want to track such “persistent” lifespans which don’t need to be restarted after task failure.

There are two options to track whether a lifespan’s output is delivered:

  • Track on the sender side: make OutputBuffer aware of lifespan.
  • Track on the receiver side: make SqlStageExecution/SqlQueryExecution informed when TableFinishOperator get data from TableWriteOperator .

Another problem is how to cleanup output from failed lifespans, which will be discussed in next section.

POC based on first option can be found at wenleix@a6e89a1.

3. Support Lifespan Granularity Commit

Failed lifespans may generate temporary output, which cannot be included in the final output. Thus the ConnectorPageSink has to support partial commit.

We can support partial commit in HiveConnector in the following way:

  • The writer initially writes to files prefixed with .tmp.presto, which will be ignored by compute engine (e.g. Hive/Spark/Presto)
  • When ConnectorPageSink.finish() get called, the worker commit the partial output by remove the .tmp.presto prefix.
    • The final file name is decided by stage id and lifespan -- so different attempt will commit to the same file name.

This protocol should work with STAGE_AND_MOVE_TO_TARGET_DIRECTORY write mode.

This commit protocol only requires the underlying filesystem to implement atomic rename. This is also the approach used by MapReduce/Spark.

Note in case there are more than one tasks try to do the rename (i.e. a task considered as failed by coordinator, but it can still talk to filesystem), we cannot decide which task finally win the rename race. Thus stats cannot be updated with recoverable grouped execution for now.

4. Allow Removing Remote Source from ExchangeClient

For a failed task, its receiver stage (i.e. TableFinishOperator) needs to cancel waiting for output from it.

Note that adding this support to n-to-n exchange is inherently hard, since sending the cancellation requests to every receiver task introduced too many coordinator to worker HTTP requests in a bursty manner. (see the discussion in #11065). However, it's OK for the purpose of supporting recovery, since only n-to-1 exchange needs to be supported.

POC: wenleix@79363c1

5. Reschedule Splits for Failed Task

For restarted tasks, splits needs to be re-scheduled. A rewind() API will be added to SplitSource interface. This is trivial for FixedSplitSource since all splits are pre-loaded. It’s more sophisticated for HiveSplitSource since we start the query execution while we are still discovering splits.

For now we decided to keep all splits in memory for HiveSplitSource when running in recovery mode. Note even only with grouped execution, we are likely already buffering all the HiveInternalSplit since split discovery in bucketed mode do not block “offer”:

@Override
public ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit connectorSplit)
{
AsyncQueue<InternalHiveSplit> queue = queueFor(bucketNumber);
queue.offer(connectorSplit);
// Do not block "offer" when running split discovery in bucketed mode.
// A limit is enforced on estimatedSplitSizeInBytes.
return immediateFuture(null);
}

We have two implementation options:

  • Load all the HiveInternalSplit prior to execute the query.
  • Execute query while discovering splits, but don’t drop scheduled splits.

POC based on first option: wenleix@37ef4a4

6. Restart Failed Tasks

SqlStageExecution should coordinate task restart, such as asking StageScheduler to restart the task, remove source from TableFinishOperator stage, etc.

POC: wenleix@0af6291

Loose Ends

  • Planner will mark a fragment as dynamic bucket schedule / recoverable.
    • So in query plan, we know whether the stage supports recoverable.
    • Currently dynamic bucket schedule is decided at execution.
  • Report “wasted CPU” due to partial retry
    • Report CPU time per lifespan
  • Add batch mode (session property)
@wenleix
Copy link
Contributor Author

wenleix commented Dec 21, 2018

cc @dain , @kokosing , @findepi

@sopel39
Copy link
Contributor

sopel39 commented Dec 27, 2018

Facebook and S3 currently use DIRECT_TO_TARGET_NEW_DIRECTORY write mode, which doesn’t fit well into this model. We might want to introduce other write mode for this.

I was thinking that we could use some kind of "sub-transactions" for the failure cleanup. This would delegate the cleanup to the connector (e.g: we could use Hive 3 MVCC for S3). Then the coordinator call either:

TransactionManager#asyncCommit
or
TransactionManager#asyncAbort

If any fails, then the whole query fails. This would also remove the need for cleanup in ConnectorMetadata.finishInsert/finishCreateTable

@wenleix wenleix changed the title [Proposal] Recoverable Grouped Execution [Design] Recoverable Grouped Execution Feb 7, 2019
shixuan-fan added a commit to shixuan-fan/presto that referenced this issue Feb 21, 2019
This is part of the effort of recoverable grouped execution (prestodb#12124) to
prepare for rescheduling splits for failed tasks.
shixuan-fan added a commit that referenced this issue Feb 22, 2019
This is part of the effort of recoverable grouped execution (#12124) to
prepare for rescheduling splits for failed tasks.
@wenleix wenleix added the Roadmap A top level roadmap item label Jul 22, 2019
@stale
Copy link

stale bot commented Mar 3, 2022

This issue has been automatically marked as stale because it has not had any activity in the last 2 years. If you feel that this issue is important, just comment and the stale tag will be removed; otherwise it will be closed in 7 days. This is an attempt to ensure that our open issues remain valuable and relevant so that we can keep track of what needs to be done and prioritize the right things.

@stale stale bot added the stale label Mar 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Roadmap A top level roadmap item stale
Projects
None yet
Development

No branches or pull requests

2 participants