fix(celery): close celery.apply spans even without after_task_publish, when using apply_async (#10676)

The instrumentation for the Celery integration relies on various [Celery
signals](https://docs.celeryq.dev/en/stable/userguide/signals.html) in
order to start and end the span when calling `apply_async`.

The integration can fail if the expected signals don't trigger, which
can lead to broken context propagation (and unexpected traces).

**Example:**
- dd-trace-py expects the signal `before_task_publish` to start the span
then `after_task_publish` to close the span. If the `after_task_publish`
signal never gets called (which can happen if a Celery exception occurs
while processing the app), then the span won't finish.
- The same thing above can also happen to `task_prerun` and
`task_postrun`.
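
The fragility comes from pairing a span's start and finish across two independent callbacks. A minimal, library-free sketch of the failure mode (hypothetical `Span` class and handler names for illustration, not the actual dd-trace-py or Celery code):

```python
# Sketch: a span opened in a "before" signal handler leaks if the
# matching "after" handler never fires.
open_spans = {}

class Span:
    def __init__(self, name):
        self.name = name
        self.finished = False

    def finish(self):
        self.finished = True

def before_task_publish(task_id):
    # Span is started by the "before" signal handler.
    open_spans[task_id] = Span("celery.apply")

def after_task_publish(task_id):
    # Span is only finished if the "after" signal handler runs.
    span = open_spans.pop(task_id, None)
    if span:
        span.finish()

def publish(task_id, fail=False):
    before_task_publish(task_id)
    if fail:
        # An exception here means after_task_publish never fires.
        raise ValueError("broker error")
    after_task_publish(task_id)

try:
    publish("t1", fail=True)
except ValueError:
    pass

leaked = open_spans["t1"]
print(leaked.finished)  # False: the span was started but never finished
```

This is the leak the PR closes: the exception between the two handlers leaves an unfinished span behind.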

**Solution**

This PR patches `apply_async` to check whether a span is lingering
around and close it when `apply_async` completes.

If an internal exception happens, the error will be marked on the
`celery.apply` span.
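
The fix follows a wrap-with-finally pattern: run the original `apply_async`, record any exception on the stored span, and in a `finally` block close whatever span the closing signal left behind. A simplified, self-contained sketch of that approach (hypothetical `Span` class and `context` store standing in for the tracer internals; the real implementation is in the diff below):

```python
import sys

# Hypothetical stand-ins for the tracer's span and context store.
class Span:
    def __init__(self, name):
        self.name = name
        self.finished = False
        self.error = 0

    def set_exc_info(self, exc_type, exc_val, exc_tb):
        self.error = 1

    def finish(self):
        self.finished = True

context = {}
span_ref = {}

def traced_apply_async(func, *args, **kwargs):
    try:
        return func(*args, **kwargs)
    except Exception:
        # Mark the error on the lingering span, then re-raise as usual.
        span = context.get("task_span")
        if span:
            span.set_exc_info(*sys.exc_info())
        raise
    finally:
        # Close whatever the after_task_publish signal failed to close.
        span = context.pop("task_span", None)
        if span and not span.finished:
            span.finish()

def broken_publish():
    # before_task_publish fired and stored the span...
    s = Span("celery.apply")
    span_ref["s"] = s
    context["task_span"] = s
    # ...but an internal error prevents after_task_publish from firing.
    raise ValueError("broker error")

try:
    traced_apply_async(broken_publish)
except ValueError:
    pass
```

After the call, the span is finished and flagged as an error even though the closing signal never ran, which is the behavior the PR adds for `celery.apply` spans.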

To track this, I added new logs in debug mode:
> The after_task_publish signal was not called, so manually closing span

and 
> The task_postrun signal was not called, so manually closing span


There's a related PR #10848 that improves how we extract information
based on the protocols, which also affects when spans get closed.

Special Thanks:
- Thanks to @tabgok for going through this with me in great detail!
- @timmc-edx for helping us track it down!

[APMS-13158]

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)



[APMS-13158]:
https://datadoghq.atlassian.net/browse/APMS-13158?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com>
wantsui and emmettbutler authored Oct 1, 2024
1 parent 6da4590 commit 0d28e08
Showing 4 changed files with 99 additions and 0 deletions.
59 changes: 59 additions & 0 deletions ddtrace/contrib/internal/celery/app.py
@@ -1,3 +1,5 @@
import sys

import celery
from celery import signals

@@ -15,9 +17,14 @@
from ddtrace.contrib.internal.celery.signals import trace_retry
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from ddtrace.internal.logger import get_logger
from ddtrace.pin import _DD_PIN_NAME


log = get_logger(__name__)


def patch_app(app, pin=None):
"""Attach the Pin class to the application and connect
our handlers to Celery signals.
@@ -41,6 +48,9 @@ def patch_app(app, pin=None):
trace_utils.wrap("celery.beat", "Scheduler.tick", _traced_beat_function(config.celery, "tick"))
pin.onto(celery.beat.Scheduler)

# Patch apply_async
trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async"))

# connect to the Signal framework
signals.task_prerun.connect(trace_prerun, weak=False)
signals.task_postrun.connect(trace_postrun, weak=False)
@@ -65,6 +75,7 @@ def unpatch_app(app):

trace_utils.unwrap(celery.beat.Scheduler, "apply_entry")
trace_utils.unwrap(celery.beat.Scheduler, "tick")
trace_utils.unwrap(celery.app.task.Task, "apply_async")

signals.task_prerun.disconnect(trace_prerun)
signals.task_postrun.disconnect(trace_postrun)
@@ -96,3 +107,51 @@ def _traced_beat_inner(func, instance, args, kwargs):
return func(*args, **kwargs)

return _traced_beat_inner


def _traced_apply_async_function(integration_config, fn_name, resource_fn=None):
"""
When apply_async is called, it calls various Celery signals in order, which gets used
to start and close the span.
Example: before_task_publish starts the span while after_task_publish closes the span.
If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the
closing signals.
The purpose of _traced_apply_async_function is to close the spans even if one of the closing
signals doesn't get called over the course of the apply_async lifecycle.
This is done by fetching the stored span and closing it if it hasn't already been closed by a
closing signal.
"""

def _traced_apply_async_inner(func, instance, args, kwargs):
with core.context_with_data("task_context"):
try:
return func(*args, **kwargs)
except Exception:
# If an internal exception occurs, record the exception in the span,
# then raise the Celery error as usual
task_span = core.get_item("task_span")
if task_span:
task_span.set_exc_info(*sys.exc_info())

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.set_exc_info(*sys.exc_info())

raise
finally:
task_span = core.get_item("task_span")
if task_span:
log.debug(
"The after_task_publish signal was not called, so manually closing span: %s",
task_span._pprint(),
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
7 changes: 7 additions & 0 deletions ddtrace/contrib/internal/celery/signals.py
@@ -16,6 +16,7 @@
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import net
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.logger import get_logger
from ddtrace.propagation.http import HTTPPropagator
@@ -48,6 +49,9 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun_span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

@@ -111,6 +115,9 @@ def trace_before_publish(*args, **kwargs):
service = config.celery["producer_service_name"]
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name)

# Store an item called "task_span" in case after_task_publish doesn't get called
core.set_item("task_span", span)

span.set_tag_str(COMPONENT, config.celery.integration_name)

# set span.kind to the type of request being performed
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. This update also marks the span as an error if an exception occurs.
29 changes: 29 additions & 0 deletions tests/contrib/celery/test_integration.py
@@ -6,6 +6,7 @@

import celery
from celery.exceptions import Retry
import mock
import pytest

from ddtrace import Pin
@@ -441,6 +442,34 @@ def run(self):
assert span.get_tag("span.kind") == "consumer"
assert span.error == 0

@mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError))
def test_fn_task_apply_async_soft_exception(self):
# If the underlying library runs into an exception that doesn't crash the app
# while calling apply_async, we should still close the span even
# if the closing signals didn't get called and mark the span as an error

@self.app.task
def fn_task_parameters(user, force_logout=False):
return (user, force_logout)

t = None
try:
t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True})
except ValueError:
traces = self.pop_traces()
assert 1 == len(traces)
assert traces[0][0].name == "celery.apply"
assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters"
assert traces[0][0].get_tag("celery.action") == "apply_async"
assert traces[0][0].get_tag("component") == "celery"
assert traces[0][0].get_tag("span.kind") == "producer"
# Internal library errors get recorded on the span
assert traces[0][0].error == 1
assert traces[0][0].get_tag("error.type") == "builtins.ValueError"
assert "ValueError" in traces[0][0].get_tag("error.stack")
# apply_async runs into an internal error (ValueError) so nothing is returned to t
assert t is None

def test_shared_task(self):
# Ensure Django Shared Task are supported
@celery.shared_task
