
Long-running task blocks other tasks with LocalExecutor #11331

Closed
grayver opened this issue Oct 7, 2020 · 6 comments
Labels
area:Scheduler (including HA (high availability) scheduler), kind:bug (This is clearly a bug)

Comments


grayver commented Oct 7, 2020

Apache Airflow version: 1.10.12

Environment:

  • Cloud provider or hardware configuration: Amazon EC2 instance, 4 CPU cores, 8GB RAM
  • OS (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS (Bionic Beaver)
  • Kernel (e.g. uname -a): Linux ip-XX-XX-XX-XX.ec2.internal 5.4.0-1025-aws #25~18.04.1-Ubuntu SMP Fri Sep 11 12:03:04 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
  • Install tools: Ansible Airflow role (https://github.com/idealista/airflow-role)
  • Some Airflow configuration parameters:
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:pwd@aws-rds-server:5432/airflow_dev2
sql_alchemy_pool_size = 20
parallelism = 32
dag_concurrency = 8
task_concurrency = 4
non_pooled_task_slot_count = 128
task_runner = StandardTaskRunner
max_threads = 4

What happened:

We have 13 DAGs in our Airflow. Some of them, under some circumstances, process a large amount of data. Usually it's parsing a large file, transforming the parsed data, and loading it into a database. There are also database processing tasks that involve long-running queries. So, some tasks can sometimes run for several hours. The problem is that those long-running tasks block all other tasks from being started. Tasks that are scheduled to run hourly are not started until the long-running task is completed. We also see a yellow bar in our Airflow Web UI:

The scheduler does not appear to be running. Last heartbeat was received XX minutes ago.
The DAGs list may not update, and new tasks will not be scheduled.

We examined the Airflow scheduler logs and figured out that the scheduler simply doesn't try to grab new tasks while a long-running task is running. When no long-running task is running, we see the scheduler check whether any task could run and verify the parallelism/concurrency limits for each. But while a long-running task is active, there are no log messages like this.

Manual triggering also doesn't help - triggered tasks are not started until the long-running task is finished.

What you expected to happen:

We expect all other DAGs to start according to their schedule while a long-running task is running. This is how the LocalExecutor should work according to the documentation.

We also checked server resources in those cases - there is plenty of free RAM and CPU at that time, so resource exhaustion shouldn't be the cause.

Anything else we need to know:

This problem occurs every time.

grayver added the kind:bug label Oct 7, 2020

boring-cyborg bot commented Oct 7, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

turbaszek (Member) commented Oct 7, 2020

Have you tried increasing parallelism or setting it to 0?

On another note, have you considered using CeleryExecutor? Then you can have separate queues for long- and short-running tasks.
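
For reference, the queue is a per-operator argument in Airflow, so the split suggested above can be expressed directly in the DAG file. A minimal sketch, assuming CeleryExecutor is enabled and a dedicated worker is started with airflow worker --queues long_running; the dag_id, queue name, and sleep durations here are illustrative, not taken from this issue:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='queue_split_example',  # illustrative name
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False
)

# Heavy work goes to a dedicated Celery queue, so it occupies only the
# workers subscribed to that queue and cannot starve short tasks.
heavy_task = BashOperator(
    task_id='heavy_task', dag=dag,
    bash_command='sleep 20m',
    queue='long_running'  # assumed queue name; workers must subscribe to it
)

# Short tasks stay on the default queue and are served by the other workers.
light_task = BashOperator(
    task_id='light_task', dag=dag,
    bash_command='sleep 30s'
)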

turbaszek added the area:Scheduler label Oct 7, 2020

grayver commented Oct 7, 2020

I've just tried setting parallelism to 0 - it doesn't help.
No, I haven't considered CeleryExecutor. We don't have many DAGs, and I thought the resources of a single machine should be enough for that.

turbaszek (Member) commented:

How many DAGs/tasks do you have? Would you be able to provide a replicable setup using BashOperators that sleep for as long as your long-running tasks?


grayver commented Oct 8, 2020

We have 13 DAGs, each with ~5 tasks (so ~70 tasks total). Most of those DAGs run hourly (with some time offset). In most cases a DAG just checks for new files to grab and does nothing if no new files are found. When a DAG discovers a new unprocessed file, it grabs it, parses it, loads the parsed data into the database, and calls a processing function there. In that case it can take some time (up to a few hours).

I've prepared reproduction code with 2 DAGs: one long-running DAG and one short-running DAG. While the long-running DAG is running, no new short-running DAG runs are scheduled or started (and the UI reports that the scheduler doesn't appear to be running).

blocking_reproduce_dag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

# Long-running DAG: its tasks sleep for 20 and 15 minutes to emulate heavy work.
long_dag = DAG(
    dag_id='long_dag',
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False
)

# Always branches to 'parse_files' in this repro.
check_files_task = BranchPythonOperator(
    task_id='check_files', dag=long_dag,
    python_callable=lambda: 'parse_files'
)

parse_files_task = BashOperator(
    task_id='parse_files', dag=long_dag,
    bash_command='sleep 20m'
)

process_files_task = BashOperator(
    task_id='process_files', dag=long_dag,
    bash_command='sleep 15m'
)

slack_report_task = DummyOperator(
    task_id='slack_report', dag=long_dag,
    trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED
)

check_files_task >> parse_files_task >> process_files_task >> slack_report_task
check_files_task >> slack_report_task


# Short-running DAG: runs every 5 minutes and is expected to keep being
# scheduled while the long DAG is busy.
short_dag = DAG(
    dag_id='short_dag',
    schedule_interval='*/5 * * * *',
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1
)

query_service_task = BashOperator(
    task_id='query_service', dag=short_dag,
    bash_command='sleep 30s'
)

do_something_task = DummyOperator(
    task_id='do_something', dag=short_dag
)

query_service_task >> do_something_task


grayver commented Oct 13, 2020

It seems I figured out what the problem is. We have the Airflow scheduler installed as a daemon service on an Ubuntu machine. The service command is the following:

/bin/bash -c 'source /home/airflow/airflow_venv/bin/activate && /home/airflow/airflow scheduler -n ${SCHEDULER_RUNS} --pid /run/airflow/scheduler.pid'

SCHEDULER_RUNS is set to 5 in the environment variables. So the scheduler starts, makes 5 loops of scanning the DAG files, and then stops. The Linux daemon restart policy automatically restarts it. However, if a long-running task is still running after the service restart (the task is not stopped), the scheduler doesn't grab new tasks until it finishes.

Setting SCHEDULER_RUNS to -1 solved the issue.
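
For anyone hitting the same symptom, the fix amounts to removing the bounded run count from the service command. A sketch, reusing the paths from the command quoted above:

/bin/bash -c 'source /home/airflow/airflow_venv/bin/activate && /home/airflow/airflow scheduler -n -1 --pid /run/airflow/scheduler.pid'

In Airflow 1.10, -1 appears to be the default for num_runs, so dropping the -n flag entirely should behave the same: the scheduler loops indefinitely instead of exiting after a fixed number of scheduling loops.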

ashb closed this as completed Oct 16, 2020