Emit DataDog statsd metrics with metadata tags (#28961)
hussein-awala authored Jan 20, 2023
1 parent 326b5d9 commit c0d9862
Showing 14 changed files with 164 additions and 62 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -851,6 +851,13 @@ metrics:
type: string
example: ~
default: ""
+ statsd_datadog_metrics_tags:
+   description: |
+     Set to False to disable metadata tags for some of the emitted metrics
+   version_added: 2.6.0
+   type: boolean
+   example: ~
+   default: "True"
statsd_custom_client_path:
description: |
If you want to utilise your own custom StatsD client set the relevant
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -461,6 +461,9 @@ statsd_datadog_enabled = False
# List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2)
statsd_datadog_tags =

+ # Set to False to disable metadata tags for some of the emitted metrics
+ statsd_datadog_metrics_tags = True
+
# If you want to utilise your own custom StatsD client set the relevant
# module path below.
# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
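The new option simply gates whether the tags dict passed by callers is forwarded to DogStatsD. Below is a minimal standalone sketch of that gating; the values are hypothetical, and the real conversion is the list comprehension added to SafeDogStatsdLogger in airflow/stats.py further down.

# Sketch: with metrics_tags enabled, a tags dict is flattened into
# DogStatsD's "key:value" strings; when disabled, no per-metric tags are sent.
metrics_tags = True  # [metrics] statsd_datadog_metrics_tags

tags = {"dag_id": "example_dag", "task_id": "extract"}
tags_list = [f"{k}:{v}" for k, v in tags.items()] if metrics_tags else []
print(tags_list)  # ['dag_id:example_dag', 'task_id:extract']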
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -206,6 +206,7 @@ class AirflowConfigParser(ConfigParser):
("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
14 changes: 8 additions & 6 deletions airflow/dag_processing/manager.py
@@ -946,7 +946,7 @@ def set_file_paths(self, new_file_paths):
filtered_processors[file_path] = processor
else:
self.log.warning("Stopping processor for %s", file_path)
Stats.decr("dag_processing.processes")
Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "stop"})
processor.terminate()
self._file_stats.pop(file_path)

@@ -965,7 +965,7 @@ def wait_until_finished(self):

def _collect_results_from_processor(self, processor) -> None:
self.log.debug("Processor for %s finished", processor.file_path)
Stats.decr("dag_processing.processes")
Stats.decr("dag_processing.processes", tags={"file_path": processor.file_path, "action": "finish"})
last_finish_time = timezone.utcnow()

if processor.result is not None:
@@ -1037,7 +1037,7 @@ def start_new_processes(self):
)

del self._callback_to_execute[file_path]
Stats.incr("dag_processing.processes")
Stats.incr("dag_processing.processes", tags={"file_path": file_path, "action": "start"})

processor.start()
self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path)
@@ -1157,8 +1157,8 @@ def _kill_timed_out_processors(self):
processor.pid,
processor.start_time.isoformat(),
)
Stats.decr("dag_processing.processes")
Stats.incr("dag_processing.processor_timeouts")
Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "timeout"})
Stats.incr("dag_processing.processor_timeouts", tags={"file_path": file_path})
# Deprecated; may be removed in a future Airflow release.
Stats.incr("dag_file_processor_timeouts")
processor.kill()
@@ -1194,7 +1194,9 @@ def max_runs_reached(self):
def terminate(self):
"""Stops all running processors."""
for processor in self._processors.values():
Stats.decr("dag_processing.processes")
Stats.decr(
"dag_processing.processes", tags={"file_path": processor.file_path, "action": "terminate"}
)
processor.terminate()

def end(self):
18 changes: 13 additions & 5 deletions airflow/dag_processing/processor.py
@@ -437,7 +437,9 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
timestamp=ts,
)
sla_misses.append(sla_miss)
Stats.incr("sla_missed")
Stats.incr(
"sla_missed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
)
if sla_misses:
session.add_all(sla_misses)
session.commit()
@@ -484,10 +486,16 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
except Exception:
Stats.incr("sla_callback_notification_failure")
Stats.incr(
"sla_callback_notification_failure",
tags={
"dag_id": dag.dag_id,
"func_name": callback.__name__,
},
)
self.log.exception(
"Could not call sla_miss_callback(%s) for DAG %s",
- callback.func_name,  # type: ignore[attr-defined]
+ callback.__name__,
dag.dag_id,
)
email_content = f"""\
@@ -523,7 +531,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
email_sent = True
notification_sent = True
except Exception:
Stats.incr("sla_email_notification_failure")
Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id})
self.log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id)
# If we sent any notification, update the sla_miss table
if notification_sent:
@@ -761,7 +769,7 @@ def process_file(
dagbag = DagBag(file_path, include_examples=False)
except Exception:
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr("dag_file_refresh_error", 1, 1)
Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path})
return 0, 0

if len(dagbag.dags) > 0:
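Besides tagging, the second hunk above also fixes the callback-name lookup: func_name was the Python 2 attribute, while Python 3 functions only expose __name__. A one-line illustration with a hypothetical callback:

def my_sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    pass


print(my_sla_callback.__name__)  # "my_sla_callback"
# my_sla_callback.func_name would raise AttributeError on Python 3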
9 changes: 7 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -686,7 +686,10 @@ def _process_executor_events(self, session: Session) -> int:
ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)

if ti_queued and not ti_requeued:
Stats.incr("scheduler.tasks.killed_externally")
Stats.incr(
"scheduler.tasks.killed_externally",
tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id},
)
msg = (
"Executor reports task instance %s finished (%s) although the "
"task says its %s. (Info: %s) Was the task killed externally?"
@@ -1564,7 +1567,9 @@ def _find_zombies(self) -> None:
)
self.log.error("Detected zombie job: %s", request)
self.executor.send_callback(request)
Stats.incr("zombies_killed")
Stats.incr(
"zombies_killed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
)

@staticmethod
def _generate_zombie_message_details(ti: TaskInstance):
4 changes: 3 additions & 1 deletion airflow/models/dag.py
@@ -1328,7 +1328,9 @@ def handle_callback(self, dagrun, success=True, reason=None, session=NEW_SESSION
callback(context)
except Exception:
self.log.exception("failed to invoke dag state update callback")
Stats.incr("dag.callback_exceptions")
Stats.incr(
"dag.callback_exceptions", tags={"dag_id": dagrun.dag_id, "run_id": dagrun.run_id}
)

def get_active_runs(self):
"""
15 changes: 12 additions & 3 deletions airflow/models/taskinstance.py
@@ -1230,7 +1230,12 @@ def check_and_change_state_before_execution(
self.pid = None

if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
Stats.incr("previously_succeeded", 1, 1)
Stats.incr(
"previously_succeeded",
1,
1,
tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id},
)

if not mark_success:
# Firstly find non-runnable and non-requeueable tis.
@@ -1533,7 +1538,9 @@ def signal_handler(signum, frame):
self.task.post_execute(context=context, result=result)

Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1)
Stats.incr("ti_successes")
Stats.incr(
"ti_successes", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
)

def _run_finished_callback(
self,
@@ -1797,7 +1804,9 @@ def handle_failure(
self.end_date = timezone.utcnow()
self.set_duration()
Stats.incr(f"operator_failures_{self.operator}")
Stats.incr("ti_failures")
Stats.incr(
"ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
)
if not test_mode:
session.add(Log(State.FAILED, self))

77 changes: 48 additions & 29 deletions airflow/stats.py
@@ -54,19 +54,21 @@ class StatsLogger(Protocol):
"""This class is only used for TypeChecking (for IDEs, mypy, etc)."""

@classmethod
- def incr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
+ def incr(cls, stat: str, count: int = 1, rate: int = 1, tags: dict[str, str] | None = None) -> None:
"""Increment stat."""

@classmethod
- def decr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
+ def decr(cls, stat: str, count: int = 1, rate: int = 1, tags: dict[str, str] | None = None) -> None:
"""Decrement stat."""

@classmethod
- def gauge(cls, stat: str, value: float, rate: int = 1, delta: bool = False) -> None:
+ def gauge(
+     cls, stat: str, value: float, rate: int = 1, delta: bool = False, tags: dict[str, str] | None = None
+ ) -> None:
"""Gauge stat."""

@classmethod
- def timing(cls, stat: str, dt: float | datetime.timedelta) -> None:
+ def timing(cls, stat: str, dt: float | datetime.timedelta, tags: dict[str, str] | None = None) -> None:
"""Stats timing."""

@classmethod
@@ -156,19 +158,19 @@ class DummyStatsLogger:
"""If no StatsLogger is configured, DummyStatsLogger is used as a fallback."""

@classmethod
- def incr(cls, stat, count=1, rate=1):
+ def incr(cls, stat, count=1, rate=1, tags=None):
"""Increment stat."""

@classmethod
- def decr(cls, stat, count=1, rate=1):
+ def decr(cls, stat, count=1, rate=1, tags=None):
"""Decrement stat."""

@classmethod
- def gauge(cls, stat, value, rate=1, delta=False):
+ def gauge(cls, stat, value, rate=1, delta=False, tags=None):
"""Gauge stat."""

@classmethod
- def timing(cls, stat, dt):
+ def timing(cls, stat, dt, tags=None):
"""Stats timing."""

@classmethod
@@ -256,35 +258,35 @@ def __init__(self, statsd_client, allow_list_validator=AllowListValidator()):
self.allow_list_validator = allow_list_validator

@validate_stat
- def incr(self, stat, count=1, rate=1):
+ def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Increment stat."""
if self.allow_list_validator.test(stat):
return self.statsd.incr(stat, count, rate)
return None

@validate_stat
- def decr(self, stat, count=1, rate=1):
+ def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Decrement stat."""
if self.allow_list_validator.test(stat):
return self.statsd.decr(stat, count, rate)
return None

@validate_stat
- def gauge(self, stat, value, rate=1, delta=False):
+ def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None):
"""Gauge stat."""
if self.allow_list_validator.test(stat):
return self.statsd.gauge(stat, value, rate, delta)
return None

@validate_stat
- def timing(self, stat, dt):
+ def timing(self, stat, dt, tags: dict[str, str] | None = None):
"""Stats timing."""
if self.allow_list_validator.test(stat):
return self.statsd.timing(stat, dt)
return None

@validate_stat
- def timer(self, stat=None, *args, **kwargs):
+ def timer(self, stat=None, *args, tags: dict[str, str] | None = None, **kwargs):
"""Timer metric that can be cancelled."""
if stat and self.allow_list_validator.test(stat):
return Timer(self.statsd.timer(stat, *args, **kwargs))
@@ -294,50 +296,66 @@ def timer(self, stat=None, *args, **kwargs):
class SafeDogStatsdLogger:
"""DogStatsd Logger."""

- def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator()):
+ def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator(), metrics_tags=False):
self.dogstatsd = dogstatsd_client
self.allow_list_validator = allow_list_validator
+ self.metrics_tags = metrics_tags

@validate_stat
- def incr(self, stat, count=1, rate=1, tags=None):
+ def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Increment stat."""
+ if self.metrics_tags and isinstance(tags, dict):
+     tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ else:
+     tags_list = []
if self.allow_list_validator.test(stat):
- tags = tags or []
- return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate)
+ return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
return None

@validate_stat
- def decr(self, stat, count=1, rate=1, tags=None):
+ def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Decrement stat."""
+ if self.metrics_tags and isinstance(tags, dict):
+     tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ else:
+     tags_list = []
if self.allow_list_validator.test(stat):
- tags = tags or []
- return self.dogstatsd.decrement(metric=stat, value=count, tags=tags, sample_rate=rate)
+ return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)
return None

@validate_stat
- def gauge(self, stat, value, rate=1, delta=False, tags=None):
+ def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None):
"""Gauge stat."""
+ if self.metrics_tags and isinstance(tags, dict):
+     tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ else:
+     tags_list = []
if self.allow_list_validator.test(stat):
- tags = tags or []
- return self.dogstatsd.gauge(metric=stat, value=value, tags=tags, sample_rate=rate)
+ return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)
return None

@validate_stat
- def timing(self, stat, dt: float | datetime.timedelta, tags: list[str] | None = None):
+ def timing(self, stat, dt: float | datetime.timedelta, tags: dict[str, str] | None = None):
"""Stats timing."""
+ if self.metrics_tags and isinstance(tags, dict):
+     tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ else:
+     tags_list = []
if self.allow_list_validator.test(stat):
- tags = tags or []
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
- return self.dogstatsd.timing(metric=stat, value=dt, tags=tags)
+ return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
return None

@validate_stat
def timer(self, stat=None, *args, tags=None, **kwargs):
"""Timer metric that can be cancelled."""
+ if self.metrics_tags and isinstance(tags, dict):
+     tags_list = [f"{key}:{value}" for key, value in tags.items()]
+ else:
+     tags_list = []
if stat and self.allow_list_validator.test(stat):
- tags = tags or []
- return Timer(self.dogstatsd.timed(stat, *args, tags=tags, **kwargs))
+ return Timer(self.dogstatsd.timed(stat, *args, tags=tags_list, **kwargs))
return Timer()


@@ -407,7 +425,8 @@ def get_dogstatsd_logger(cls):
)
dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
allow_list_validator = AllowListValidator(dogstatsd_allow_list)
- return SafeDogStatsdLogger(dogstatsd, allow_list_validator)
+ datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True)
+ return SafeDogStatsdLogger(dogstatsd, allow_list_validator, datadog_metrics_tags)

@classmethod
def get_constant_tags(cls):
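Putting it together, a hedged usage sketch of the logger that get_dogstatsd_logger now builds; the host, port, and metric values below are hypothetical (in Airflow they come from the [metrics] statsd_* options):

from datadog import DogStatsd

from airflow.stats import AllowListValidator, SafeDogStatsdLogger

client = DogStatsd(host="localhost", port=8125, namespace="airflow")
stats = SafeDogStatsdLogger(client, AllowListValidator(), metrics_tags=True)

# Emits "airflow.ti_successes" tagged dag_id:example_dag, run_id:manual_1, task_id:extract.
stats.incr("ti_successes", tags={"dag_id": "example_dag", "run_id": "manual_1", "task_id": "extract"})

# With metrics_tags=False (the constructor default) the same call sends an empty tag list.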