Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable tagged metric names for existing Statsd metric publishing events | influxdb-statsd support #29093

Merged
merged 26 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9e30bd0
enable backward compatible tags on statsd
sungwy Jan 21, 2023
18463f8
update statsd calls with new method signature
sungwy Jan 23, 2023
a1c5945
enable backward compatible tags on statsd
sungwy Jan 23, 2023
300cffb
enable backward compatible tags on statsd
sungwy Jan 23, 2023
90cd3f3
Merge branch 'main' into influxdb-statsd
sungwy Jan 30, 2023
d36abec
publish tagged metrics for existing stats
sungwy Feb 1, 2023
23c3499
publish tagged metrics for existing stats
sungwy Feb 1, 2023
73b21ab
publish tagged metrics for existing stats
sungwy Feb 1, 2023
2510db7
readability and typed arguments
sungwy Feb 2, 2023
68248ba
Merge branch 'main' into influxdb-statsd
sungwy Feb 6, 2023
3453d34
fix method signature
sungwy Feb 6, 2023
209fa92
Merge branches 'influxdb-statsd' and 'influxdb-statsd' of https://git…
sungwy Feb 6, 2023
0c31643
enable backward compatible tags on statsd
sungwy Jan 21, 2023
47b1157
update statsd calls with new method signature
sungwy Jan 23, 2023
23766e9
enable backward compatible tags on statsd
sungwy Jan 23, 2023
1c5d6dd
enable backward compatible tags on statsd
sungwy Jan 23, 2023
d6101a9
publish tagged metrics for existing stats
sungwy Feb 1, 2023
a14fca3
publish tagged metrics for existing stats
sungwy Feb 1, 2023
6098938
publish tagged metrics for existing stats
sungwy Feb 1, 2023
3a08470
readability and typed arguments
sungwy Feb 2, 2023
1fade47
fix method signature
sungwy Feb 6, 2023
e43d3f7
Merge branch 'influxdb-statsd' of https://github.com/syun64/airflow i…
sungwy Feb 14, 2023
b6ba1cc
handle ClientConnectionError in HttpAsyncHook
sungwy Feb 14, 2023
f62f1d6
Revert "handle ClientConnectionError in HttpAsyncHook"
sungwy Feb 14, 2023
766e338
Merge branch 'main' into influxdb-statsd
sungwy Feb 14, 2023
07225ed
Merge branch 'main' into influxdb-statsd
sungwy Feb 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,13 @@ metrics:
type: string
example: ~
default: ~
statsd_influxdb_enabled:
description: |
To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
version_added: 2.6.0
type: boolean
example: ~
default: "False"
secrets:
description: ~
options:
Expand Down
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@ statsd_datadog_metrics_tags = True
# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
# statsd_custom_client_path =

# To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
statsd_influxdb_enabled = False

[secrets]
# Full class name of secrets backend to enable (will precede env vars and metastore in search path)
# Example: backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
Expand Down
1 change: 1 addition & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ def _collect_results_from_processor(self, processor) -> None:

file_name = os.path.splitext(os.path.basename(processor.file_path))[0].replace(os.sep, ".")
Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration)
Stats.timing("dag_processing.last_duration", last_duration, tags={"file_name": file_name})

def collect_results(self) -> None:
"""Collect the result from any finished DAG processors."""
Expand Down
11 changes: 11 additions & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,13 @@ def _update_state(dag: DAG, dag_run: DagRun):
# always happening immediately after the data interval.
expected_start_date = dag.get_run_data_interval(dag_run).end
schedule_delay = dag_run.start_date - expected_start_date
# Publish the metric twice: first under the backward-compatible name, then under the generic name with tags
Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)
Stats.timing(
"dagrun.schedule_delay",
schedule_delay,
tags={"dag_id": dag.dag_id},
)

for dag_run in dag_runs:
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
Expand Down Expand Up @@ -1327,6 +1333,7 @@ def _schedule_dag_run(
dag_run.notify_dagrun_state_changed()
duration = dag_run.end_date - dag_run.start_date
Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id})
return callback_to_execute

if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
Expand Down Expand Up @@ -1410,6 +1417,10 @@ def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"])
# tagged metrics
Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name})
Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name})
Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name})

@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
Expand Down
7 changes: 7 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
true_delay = first_start_date - data_interval_end
if true_delay.total_seconds() > 0:
Stats.timing(f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay)
Stats.timing(
"dagrun.first_task_scheduling_delay",
true_delay,
tags={"dag_id": dag.dag_id},
)
except Exception:
self.log.warning("Failed to record first_task_scheduling_delay metric:", exc_info=True)

Expand All @@ -893,8 +898,10 @@ def _emit_duration_stats_for_finished_state(self):
duration = self.end_date - self.start_date
if self.state == State.SUCCESS:
Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
Stats.timing("dagrun.duration.success", duration, tags={"dag_id": self.dag_id})
elif self.state == State.FAILED:
Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": self.dag_id})

@provide_session
def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
Expand Down
Loading