[SIP-143] Global Async Task Framework #29839

Open
villebro opened this issue Aug 2, 2024 · 17 comments
Labels
change:backend Requires changing the backend change:frontend Requires changing the frontend global:async-query Related to Async Queries feature sip Superset Improvement Proposal

Comments

@villebro
Member

villebro commented Aug 2, 2024

[SIP-143] Proposal for Global Async Task Framework

Motivation

Note: This replaces [SIP-141] Global Async Queries 2.0 which aimed at completing [SIP-39] Global Async Query Support.

Proposed Change

Superset currently has varied and often opaque solutions for executing async operations (all require Celery):

  • SQL Lab supports async query execution, and utilizes long polling for checking for results
  • Thumbnails and Alerts & Reports are executed as background Celery tasks that are scheduled by Celery Beat
  • Chart queries can be executed async by enabling the GAQ feature flag, which supports both WebSocket and long polling
  • Scheduled cron jobs (added via CeleryConfig in superset_config.py)

Currently none of the above support deduplication or cancellation, or even viewing which tasks are queued or executing. Executing long-running queries synchronously in the web workers is especially troublesome: a web worker can become unresponsive if many long-running queries run simultaneously. This has led many orgs to extend their webserver timeouts to avoid timing out long-running queries. Moving these queries to the async workers will free up the web workers and make it possible to decrease web worker timeouts significantly, while still supporting arbitrarily long query execution times.

In addition, beyond sharing Celery as the execution backend, there is limited sharing of code, like utils or ORM models, which has led to significant code duplication. This increases the risk of regressions, limits reuse of good functionality, and adds a significant maintenance burden.

For this reason this SIP recommends adding a new Global Async Task Framework (GATF), which will introduce the following:

  • A new ORM model with a menu which makes it possible to view and cancel queued or executing tasks. Admins will have access to all tasks, while regular users will only be able to view tasks they have spawned. This model will be ephemeral by nature, i.e. the task entries will be removed once they are completed.
  • Add locking for all tasks to ensure deduplication. This applies particularly to async chart queries and thumbnails, which currently can cause significant resource waste.
  • Deprecate long polling in both chart and SQL Lab queries - going forward only WebSockets would be supported.

New or Changed Public Interfaces

Model

A new ORM model will be introduced for async tasks with a string based identifier. When a new task is created, an entry is added to the table if it's not already there. For instance, for thumbnails, we would use the digest as the identifier. And for chart queries, we would use the cache key and so on. If the entry is already there, we consider the task already locked, and don't schedule a new one. The model will look as follows:

from sqlalchemy import Column, DateTime, Enum, Integer, String, Text

# Base is assumed to be the application's declarative base
class AsyncTask(Base):
    __tablename__ = "async_tasks"

    id = Column(Integer, primary_key=True)
    task_id = Column(String(256), unique=True, nullable=False, index=True)
    task_type = Column(Enum(..., name="task_type"), nullable=False)
    task_name = Column(String(256), nullable=False)
    status = Column(Enum("PENDING", "IN_PROGRESS", "SUCCESS", "REVOKED", "FAILURE", name="task_status"), nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    ended_at = Column(DateTime, nullable=True)  # unset until the task finishes
    error = Column(String, nullable=True)
    state = Column(Text, nullable=True)  # JSON serializable
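The locking behaviour described above could be sketched by letting the unique constraint on task_id do the work. This is only a minimal sketch using the standard-library sqlite3 module instead of the Superset ORM; the try_acquire helper, the trimmed-down table and the example identifier are assumptions made for illustration, not part of the proposal.

```python
import sqlite3


def create_table(conn: sqlite3.Connection) -> None:
    # Trimmed-down stand-in for the proposed async_tasks table
    conn.execute(
        "CREATE TABLE async_tasks ("
        "id INTEGER PRIMARY KEY, "
        "task_id TEXT NOT NULL UNIQUE, "
        "status TEXT NOT NULL)"
    )


def try_acquire(conn: sqlite3.Connection, task_id: str) -> bool:
    """Return True if this caller created the entry and should schedule the task."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO async_tasks (task_id, status) VALUES (?, 'PENDING')",
                (task_id,),
            )
        return True
    except sqlite3.IntegrityError:
        # The entry already exists, so the task is considered locked
        return False


conn = sqlite3.connect(":memory:")
create_table(conn)
first = try_acquire(conn, "thumbnail-digest-abc")   # hypothetical identifier
second = try_acquire(conn, "thumbnail-digest-abc")  # duplicate request
print(first, second)
```

Relying on the constraint rather than a "select, then insert" sequence avoids a race between two callers that check for the entry at the same time.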

As per SIP-43, we'll introduce at least a DAO for this (maybe also a set of commands). For abstracting the main GATF logic, a new decorator will be introduced, which wraps the task in a thread that can be killed as needed (the final implementation will look different, this is just to give an understanding of the main logic):

import threading
from functools import wraps
from typing import Any, Callable

# db, AsyncTask and TaskStatus are assumed to be importable from the application
TASK_SLEEP_INTERVAL_SEC = 5

def async_task(f: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        task_id = kwargs.get("task_id")
        if not task_id:
            raise ValueError("task_id is required for cancelable tasks")

        # Look up by the string identifier, not the integer primary key
        task = AsyncTask.query.filter_by(task_id=task_id).one_or_none()
        if task is None:
            raise Exception(f"Task not found: {task_id}")
        if task.status != TaskStatus.PENDING:
            raise Exception(f"Task {task_id} is already in progress, current status: {task.status}")
        task.status = TaskStatus.IN_PROGRESS
        db.session.commit()

        cancel_event = threading.Event()
        done_event = threading.Event()  # set once the task body has finished

        def monitor_status() -> None:
            # Poll until the task finishes, is revoked, or its entry disappears
            while not done_event.is_set():
                current = AsyncTask.query.filter_by(task_id=task_id).one_or_none()
                if current is None or current.status == TaskStatus.REVOKED:
                    cancel_event.set()
                    break
                done_event.wait(TASK_SLEEP_INTERVAL_SEC)

        monitor_thread = threading.Thread(target=monitor_status)
        monitor_thread.start()

        try:
            return f(*args, cancel_event=cancel_event, **kwargs)
        finally:
            # Stop the monitor, then remove the ephemeral entry whether the
            # task succeeded, failed or was cancelled
            done_event.set()
            monitor_thread.join()
            db.session.delete(task)
            db.session.commit()

    return wrapper

and when used, the task will just be decorated as follows:

@celery_app.task(name="my_task")
@async_task
def my_task(task_id: str, cancel_event: threading.Event) -> None:
    # add logic here that checks cancel_event periodically
    ...
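To illustrate what "checks cancel_event periodically" might look like inside a task body, here is a minimal sketch that does not depend on Celery or the decorator. The process_in_chunks function and its doubling "work" are placeholders invented for this example.

```python
import threading


def process_in_chunks(chunks: list[int], cancel_event: threading.Event) -> list[int]:
    """Process chunks one by one, stopping early once cancel_event is set."""
    results: list[int] = []
    for chunk in chunks:
        if cancel_event.is_set():
            # Cooperative cancellation: stop between units of work
            break
        results.append(chunk * 2)  # placeholder for the real work
    return results


cancel_event = threading.Event()
completed = process_in_chunks([1, 2, 3], cancel_event)

cancel_event.set()  # simulate the monitor thread observing a REVOKED status
cancelled = process_in_chunks([1, 2, 3], cancel_event)
```

Cancellation in this style is only as responsive as the interval between checks, so a single long blocking call (for example a database query) would need its own mechanism for being interrupted.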

Notification method

We propose making WebSockets the sole mechanism for broadcasting task completion. This means that we will remove long polling support from async chart queries, and replace long polling in SQL Lab with WebSockets.

Frontend changes

Charts will display the current task status, and have a simple mechanism for cancelling queries if needed:
[Image: chart with task status and cancel control]

New dependencies

None - however, going forward, the WebSocket server will be mandatory for both SQL Lab and async chart queries.

Migration Plan and Compatibility

Phase 1 - Thumbnails

As a first step, we migrate Thumbnails to GATF, as they tend to be fairly long-running tasks that currently lack deduplication. In addition, the main thumbnail rendering functionality is fairly straightforward, and will not require extensive changes. Migrating Thumbnails will require implementing all interfaces, like the GATF ORM model, UI, decorators/context managers. At the end of this phase, thumbnail rendering will be deduplicated, and Admins will be able to both see and cancel running Thumbnail tasks via the Async task list view.

Phase 2 - GAQ

In the second phase, we clean up GAQ by simplifying the architecture (see details about redundant form data cache keys etc from SIP-141) and remove long polling support. We will also migrate the GAQ metadata from Redis to the new ORM model.

At the end of this phase, we will have removed long polling from GAQ, and will have both chart query deduplication and cancellation support.

Phase 3 - The rest

In the final phase, we migrate the remaining async tasks to GATF. This mainly covers Alerts & Reports, but also any tasks that are triggered via Celery Beat that implement the provided context managers/decorators, like cache warmup, log rotation etc. At the end of this phase, it will be possible to cancel any queued or running async task via the UI.

Rejected Alternatives

SIP-39 and SIP-141 were rejected in favor of making a more general purpose Async Task Framework.

@villebro villebro added the sip Superset Improvement Proposal label Aug 2, 2024
@dosubot dosubot bot added change:backend Requires changing the backend global:async-query Related to Async Queries feature labels Aug 2, 2024
@villebro
Member Author

villebro commented Aug 5, 2024

Summary from meeting today (@mistercrunch, @michael-s-molina and myself in attendance):

  • @villebro proposed migrating Thumbnails to the new framework first, as it would be quite straightforward to migrate, and is known to cause significant resource waste due to missing locking.
  • @mistercrunch proposed investigating if Celery could be replaced with something more actively maintained, like Dask. Some reasons for considering this include:
    • Lack of active maintenance of Celery
    • Lack of advanced features that are available in Dask
    • Opaque design of Celery, esp. the Kombu transport layer
  • @villebro noted that it may be difficult to find a replacement for Celery that works well with Flask. However, we will investigate if this would indeed be possible, e.g. by extracting the needed Flask properties during task creation and placing them in the payload, or proxying/wrapping them so that Flask APIs are only used when available (e.g. g, current_app, request).
  • @michael-s-molina proposed implementing the APIs for the new framework on Celery, and later replacing it with something else, if feasible. @villebro noted that the choice of scheduler may well affect the API design, as the public APIs of the scheduler will likely affect the final API, hence preferring to do the assessment upfront.
  • @michael-s-molina wanted to emphasize, that if we do introduce Dask (or another replacement for Celery), we need to commit to a deprecation and removal plan that concludes in the full removal of Celery in a future major version of Superset.
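The idea of extracting the needed Flask properties at task creation and placing them in the payload could look roughly like the sketch below. Everything here is hypothetical: the TaskContext fields, the helper names and the example values are assumptions chosen for illustration, and the sketch deliberately avoids importing Flask.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskContext:
    """Hypothetical snapshot of the request state a task needs."""

    user_id: Optional[int]
    locale: str


def build_payload(task_id: str, context: TaskContext) -> str:
    # Serialize at task-creation time, while the request is still available
    return json.dumps({"task_id": task_id, "context": asdict(context)})


def load_context(payload: str) -> TaskContext:
    # Rebuild inside the worker, where request-bound globals are unavailable
    return TaskContext(**json.loads(payload)["context"])


payload = build_payload("chart-cache-key-123", TaskContext(user_id=42, locale="en"))
restored = load_context(payload)
```

With the context carried explicitly in the payload, the task body would not need g, current_app or request at execution time, which is what would make a scheduler other than Celery easier to adopt.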

@zhaoyongjie
Member

zhaoyongjie commented Aug 6, 2024

@villebro Thanks for documenting the SIP. I want to share a few thoughts here.

I don't think we should replace Celery as task queue. The reasons are:

Celery is a task queue, whereas Dask is a parallel computing framework. I'm not familiar with Dask, but from a quick look at its documentation it looks like Spark implemented in Python, so it targets a different use case than a task queue.

Lack of active maintenance of Celery

Celery is actively maintained; the latest version was released on Apr 17, 2024.

Lack of advanced features that are available in Dask

Which advanced features should we want to use?

@villebro
Member Author

villebro commented Aug 6, 2024

Thanks for the comments @zhaoyongjie . I think you raised a lot of good points, really appreciate the feedback 👍

@villebro Thanks for documenting the SIP. I want to share a few thoughts here.

I don't think we should replace Celery as task queue. The reasons are:

Celery is a task queue, whereas Dask is a parallel computing framework. I'm not familiar with Dask, but from a quick look at its documentation it looks like Spark implemented in Python, so it targets a different use case than a task queue.

After digesting this a bit more last night, I'm also starting to lean back in the direction of Celery for now. While there may be use cases for a full DAG-style computing framework (e.g. caching chart data before triggering the dashboard, or something of that nature), I think the focus of the Dask project is slightly misaligned with what we're looking for. Regarding the other alternatives (Redis Queue, Dramatiq, Huey, APScheduler et al), I don't think any of them offer a clear improvement over Celery. So I agree, I'm kind of feeling like staying on Celery for now, and postponing the architectural overhaul to a later date when there's a clear need for it.

Lack of active maintenance of Celery

Celery is actively maintained; the latest version was released on Apr 17, 2024.

I was going to say that Celery has had a long-standing bug for many years where the workers silently stop consuming tasks, and this has served as an indicator of stagnation to me, as it has caused significant issues for us and required ugly workarounds: celery/celery#7276. However, it seems this bug was finally fixed a few weeks ago in Celery 5.5.0b1 (!). See Redis broker stability notes here: https://github.com/celery/celery/releases/tag/v5.5.0b1. So maybe things aren't as gloomy as I had thought, and things are slowly moving forward.

Lack of advanced features that are available in Dask

Which advanced features should we want to use?

I think one of the main features Celery lacks is proper task cancellation, which is critical to us (Dask has good mechanisms for this). Also, Dask has some nice autoscaling features which makes it possible to handle bursty operations better than Celery. Finally, Dask is newer, so it incorporates many of the mechanisms that were not so relevant during Celery's inception. Granted, many of them may not be critical for us right now (e.g. graph/DAG support), but could have use cases when they become available.

@giftig
Contributor

giftig commented Aug 7, 2024

@villebro what do you mean by "proper" task cancellation here? Celery does support "revoking" tasks: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks

It's been a while since I've done a lot of work with celery but generally I've found it fairly reliable if properly configured.

I'm not familiar with Dask though, so can't comment on what advantages it might bring.

@mistercrunch
Member

"proper" task cancellation

I'm guessing it's about running a certain routine when that happens. Some databases don't cancel the query on client disconnections, and would require us to run a routine to kill the query. Something as simple as the STOP button in SQL Lab would have to hit an endpoint that would call celery.revoke(task_id, terminate=True), and rely on some except

Conversing with GPT it seems there are ways to configure and catch things:
Screenshot 2024-08-07 at 1 59 44 PM

In my experience (mostly around Airflow's CeleryExecutor) it's common to end up with things not firing as expected: zombie tasks (Superset thinks the job is dead, but it is still running somewhere on a worker machine) and undead tasks (Superset thinks the job is running, but nothing is running), forcing us to double-check the state. It seems like the framework is built for typical fire-and-forget web-scale workloads, as opposed to long-running tasks with good support for cancellation.
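The "double-check the state" step could be sketched as a reconciliation between what the metadata store believes and what the workers report. This is a minimal sketch under stated assumptions: the reconcile function and both input sets are hypothetical, and how the set of actually running tasks would be obtained from the workers is left open.

```python
def reconcile(recorded_running: set[str], actually_running: set[str]) -> dict[str, set[str]]:
    """Classify task ids whose recorded state disagrees with the workers."""
    return {
        # Still running on a worker, but not recorded as running
        "zombie": actually_running - recorded_running,
        # Recorded as running, but no worker is executing it
        "undead": recorded_running - actually_running,
    }


recorded = {"task-a", "task-b"}   # what the metadata store believes
reported = {"task-b", "task-c"}   # what the workers report
mismatches = reconcile(recorded, reported)
```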

@mistercrunch
Member

mistercrunch commented Aug 7, 2024

Trying to formalize the Model (?):

class AsyncTask(Base):
    __tablename__ = 'async_tasks'

    id = Column(Integer, primary_key=True)
    task_id = Column(String(256), unique=True, nullable=False, index=True)
    task_type = Column(Enum(..., name='task_type'), nullable=False)
    task_name = Column(String(256), nullable=False)
    status = Column(Enum('PENDING', 'IN_PROGRESS', 'SUCCESS', 'REVOKED', 'FAILURE', name='task_status'), nullable=False)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, nullable=False)
    ended_at = Column(DateTime, nullable=False)
    error = Column(String, nullable=True)
    extra_json = Column(Text, nullable=True)

@mistercrunch
Member

About this SIP, I wanted to highlight that there is a significant opportunity to formalize and eventually isolate the "Data Access Layer" (DAL) within Superset, exposing it as a set of APIs and related backend services. The idea would be to evolve it toward a robust, independent service that facilitates not only Superset but other apps to access data, while offering mechanisms to authenticate, authorize & limit access (RLS), cache, and audit access to data within your organization.

First step in that direction is to organize the code in a way that clarifies what is and what's not part of the DAL. Something to keep in mind while working on the async framework.

@villebro
Member Author

villebro commented Aug 9, 2024

Trying to formalize the Model (?):

I think this looks generally ok. Maybe we could just rename extra_json to payload or state (JSON serializable ofc), as I assume all tasks will have some state. Also, I find Enum works pretty badly on ORM tables, so we may just want to use IntEnum in the backend and have INT on the ORM table (we would call it status_id on the table, and then have a property status on the SQLA model that maps the INT value to a proper Python Enum). Finally, I think we should add a type field (I assume name will be some human readable representation, like "Email report of 'My Dashboard'"), with the following Enum-type values (this list is probably non-exhaustive, but you get the idea):

  • CHART_QUERY
  • SQLLAB_QUERY
  • THUMBNAIL
  • ALERTS_REPORTS
  • LOG_ROTATE
  • CACHE_WARMUP
  • CUSTOM (this would be for user-defined tasks that could be cronned in the CeleryConfig or whichever scheduler we use).
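The IntEnum suggestion above (an INT column named status_id, plus a status property that maps it to a Python Enum) might look like the sketch below. A plain dataclass stands in for the SQLAlchemy model so the example is self-contained, and the numeric values assigned to each status are assumptions.

```python
from dataclasses import dataclass
from enum import IntEnum


class TaskStatus(IntEnum):
    # Numeric values are illustrative, not part of the proposal
    PENDING = 1
    IN_PROGRESS = 2
    SUCCESS = 3
    REVOKED = 4
    FAILURE = 5


@dataclass
class AsyncTaskRow:
    """Stand-in for the ORM model; status_id is what would be stored as INT."""

    task_id: str
    status_id: int

    @property
    def status(self) -> TaskStatus:
        # Map the stored integer to the Python enum
        return TaskStatus(self.status_id)

    @status.setter
    def status(self, value: TaskStatus) -> None:
        self.status_id = int(value)


row = AsyncTaskRow(task_id="thumbnail-digest-abc", status_id=TaskStatus.PENDING)
row.status = TaskStatus.IN_PROGRESS
```

The trade-off is that the raw table shows integers rather than readable names, so anyone querying the database directly has to refer back to the enum definition.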

@mistercrunch
Member

mistercrunch commented Aug 9, 2024

Oh right, added task_type, and your feedback is all reasonable, feel free to edit my comment or to promote it to the main body of the issue. About enums, personally I prefer a human-readable value in the database for things like status that are pretty static/finite, to avoid having to refer to the code or an FK when querying the database directly. I know it's less efficient and all, but that's for databases to figure out how to optimize/compress/dict-encode.

@villebro
Member Author

Sounds good - also, given that this table will likely be ephemeral, INT vs VARCHAR for the type likely doesn't make any difference. I'll integrate the final proposal into the body 👍

@villebro
Member Author

@mistercrunch I've moved the ORM model to the body and made some general updates. Feel free to make updates if needed. I think this is approaching votability - WDYT @rusackas @michael-s-molina ?

@rusackas rusackas changed the title [SIP] Global Async Task Framework [SIP-143] Global Async Task Framework Aug 15, 2024
@rusackas
Member

rusackas commented Aug 15, 2024

I think this SIP is lookin' good! I think the SIP (now numbered 143) is most definitely ready for a [DISCUSS] thread, and likely ready for voting if there aren't any dealbreakers that turn up from that.

@villebro
Member Author

Notes from meeting this Monday ( @rusackas @michael-s-molina and I in attendance):

  • We discussed if the Async Task list view would need to be real time. @villebro argued that it's out of scope, and that we can consider that as a different SIP (something like "Make React based FAB list views real time")
  • @rusackas and @michael-s-molina asked to add something on how the async task status should be handled in the UI. @villebro will be adding details about this.
  • Since the ORM table is going to be ephemeral (=entries are deleted once a task is no longer queued or executing), the question came up of how to have a persistent log of these events. @villebro proposed not adding a separate history table, but rather logging to the EVENT_LOGGER. Additionally, we could also introduce new StatsD metrics to make it possible to do high level metrics monitoring.

@villebro
Member Author

I think this SIP is lookin' good! I think the SIP (now numbered 143) is most definitely ready for a [DISCUSS] thread, and likely ready for voting if there aren't any dealbreakers that turn up from that.

Thanks @rusackas - I'll do the final changes to the body of the SIP this week and trigger the discuss once those are in place!

@villebro
Member Author

FYI: DISCUSS sent to Dev list: https://lists.apache.org/thread/ytv9vx7wrk07xxjg4m4kx5rx0zl5whx1

@villebro
Member Author

FYI: VOTE sent to Dev list: https://lists.apache.org/thread/b39obsb2qmqbo8k9twj9b17pfb42c6qr

@villebro
Member Author

villebro commented Oct 1, 2024

Results on the Dev list: https://lists.apache.org/thread/kj0odbyfcomohp2p0kkc2d8j7679z8db

The summary from the result message:

The vote PASSED with 7 binding +1, 1 non-binding +1, and 0 -1 votes:

+1 binding votes:
Max
Evan
Kamil
Michael
Sam
Tai
Diego

+1 non-binding votes
Emmanuel

Projects
Status: [RESULT][VOTE] Approved