-
Notifications
You must be signed in to change notification settings - Fork 0
/
airflow_dbt_plugin_dags.py
77 lines (57 loc) · 2.01 KB
/
airflow_dbt_plugin_dags.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from airflow import DAG
from airflow.models import Variable
from airflow_dbt.operators.dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator,
)
from airflow.utils.dates import days_ago
# Token used by synq-dbt to report dbt run results to SYNQ; optional.
synq_token = Variable.get("SYNQ_TOKEN", default_var=None)

# Shared operator defaults: the dbt project and its profiles.yml live in
# the same directory inside the Airflow worker image.
default_args = {
    "dir": "/opt/airflow/dags/repo/dbt_example",
    "start_date": days_ago(0),
    "profiles_dir": "/opt/airflow/dags/repo/dbt_example",
}

# SYNQ variant of the defaults: same dbt project, but run through the
# synq-dbt wrapper binary with the token exposed via the environment.
default_args_synq = default_args.copy()
env_dict = {}
if synq_token is not None:
    # Only forward the token when the Variable is actually set — a None
    # value in a subprocess env mapping fails at runtime.
    env_dict["SYNQ_TOKEN"] = synq_token
# Config JSON object for overrides OPTIONAL
env_dict.update(Variable.get("CONFIG_OBJECT", default_var={}, deserialize_json=True))
default_args_synq.update({"env": env_dict, "dbt_bin": "synq-dbt"})
###
# DAGs
###
###
# Vanilla dbt run
###
# Plain dbt pipeline (no SYNQ reporting): seed -> snapshot -> run -> test.
with DAG(
    dag_id="airflow_dbt_plugin_without_synq_dag",
    default_args=default_args,
    schedule_interval="@daily",
) as dag:
    seed_task = DbtSeedOperator(task_id="dbt_seed")
    snapshot_task = DbtSnapshotOperator(task_id="dbt_snapshot")
    run_task = DbtRunOperator(task_id="dbt_run")
    # A failing test should fail the task immediately; retrying would just
    # re-run the same failing assertions, so disable Airflow retries.
    test_task = DbtTestOperator(task_id="dbt_test", retries=0)

    seed_task >> snapshot_task >> run_task >> test_task
##
# dbt reporting to SYNQ via synq-dbt
# IMPORTANT: Because of a missing feature, the SYNQ_TOKEN is not passed via the DbtOperator;
# you need to start the Airflow worker process with the SYNQ_TOKEN environment variable set.
##
# Same dbt pipeline as above, but executed through synq-dbt so results are
# reported to SYNQ (see the defaults in default_args_synq).
with DAG(
    dag_id="airflow_dbt_plugin_basic_dag",
    default_args=default_args_synq,
    schedule_interval="@daily",
) as dag_synq:
    seed_task = DbtSeedOperator(task_id="dbt_seed_synq")
    snapshot_task = DbtSnapshotOperator(task_id="dbt_snapshot_synq")
    run_task = DbtRunOperator(task_id="dbt_run_synq")
    # A failing test should fail the task immediately; retrying would just
    # re-run the same failing assertions, so disable Airflow retries.
    test_task = DbtTestOperator(task_id="dbt_test_synq", retries=0)

    seed_task >> snapshot_task >> run_task >> test_task