Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pid check #24636

Merged
merged 6 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,13 @@ def heartbeat_callback(self, session=None):
recorded_pid = ti.pid
same_process = recorded_pid == current_pid

if ti.run_as_user or self.task_runner.run_as_user:
if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user):
# when running as another user, compare the task runner pid to the parent of
# the recorded pid because user delegation becomes an extra process level.
# However, if recorded_pid is None, pass that through as it signals the task
# runner process has already completed and been cleared out. `psutil.Process`
# uses the current process if the parameter is None, which is not what is intended
# for comparison.
recorded_pid = psutil.Process(ti.pid).ppid()
same_process = recorded_pid == current_pid

Expand Down
30 changes: 30 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ def test_localtaskjob_heartbeat(self, dag_maker):
with pytest.raises(AirflowException):
job1.heartbeat_callback()

# Now, set the ti.pid to None and test that no error
# is raised.
ti.pid = None
session.merge(ti)
session.commit()
assert ti.pid != job1.task_runner.process.pid
assert not ti.run_as_user
assert not job1.task_runner.run_as_user
job1.heartbeat_callback()

@mock.patch('subprocess.check_call')
@mock.patch('airflow.jobs.local_task_job.psutil')
def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker):
Expand Down Expand Up @@ -196,6 +206,16 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

# Here we set the ti.pid to None and test that no error is
# raised
ti.pid = None
session.merge(ti)
session.commit()
assert ti.run_as_user
assert job1.task_runner.run_as_user == ti.run_as_user
assert ti.pid != job1.task_runner.process.pid
job1.heartbeat_callback()

@conf_vars({('core', 'default_impersonation'): 'testuser'})
@mock.patch('subprocess.check_call')
@mock.patch('airflow.jobs.local_task_job.psutil')
Expand Down Expand Up @@ -239,6 +259,16 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _,
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

# Now, set the ti.pid to None and test that no error
# is raised.
ti.pid = None
session.merge(ti)
session.commit()
assert job1.task_runner.run_as_user == 'testuser'
assert ti.run_as_user is None
assert ti.pid != job1.task_runner.process.pid
job1.heartbeat_callback()

def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast
Expand Down