diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index 686b38ca754c4..01da7745c7bf3 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -35,7 +35,6 @@ def _trigger_dag( conf: dict | str | None = None, execution_date: datetime | None = None, replace_microseconds: bool = True, - notes: str | None = None, ) -> list[DagRun | None]: """Triggers DAG run. @@ -93,7 +92,6 @@ def _trigger_dag( external_trigger=True, dag_hash=dag_bag.dags_hash.get(dag_id), data_interval=data_interval, - notes=notes, ) dag_runs.append(dag_run) @@ -106,7 +104,6 @@ def trigger_dag( conf: dict | str | None = None, execution_date: datetime | None = None, replace_microseconds: bool = True, - notes: str | None = None, ) -> DagRun | None: """Triggers execution of DAG specified by dag_id. @@ -115,7 +112,6 @@ def trigger_dag( :param conf: configuration :param execution_date: date of execution :param replace_microseconds: whether microseconds should be zeroed - :param notes: set a custom note for the newly created DagRun :return: first dag run triggered - even if more than one Dag Runs were triggered or None """ dag_model = DagModel.get_current(dag_id) @@ -130,7 +126,6 @@ def trigger_dag( conf=conf, execution_date=execution_date, replace_microseconds=replace_microseconds, - notes=notes, ) return triggers[0] if triggers else None diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 6678b76903bc8..e14bde177d1fe 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -317,6 +317,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: conf=post_body.get("conf"), external_trigger=True, dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id), + session=session, ) return dagrun_schema.dump(dag_run) except ValueError as ve: @@ -412,7 +413,7 @@ def clear_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSIO include_parentdag=True, only_failed=False, ) - dag_run.refresh_from_db() + dag_run = session.query(DagRun).filter(DagRun.id == dag_run.id).one() return dagrun_schema.dump(dag_run) @@ -437,6 +438,13 @@ def set_dag_run_notes(*, dag_id: str, dag_run_id: str, session: Session = NEW_SE except ValidationError as err: raise BadRequest(detail=str(err)) - dag_run.notes = new_value_for_notes or None + from flask_login import current_user + + current_user_id = getattr(current_user, "id", None) + if dag_run.dag_run_note is None: + dag_run.notes = (new_value_for_notes, current_user_id) + else: + dag_run.dag_run_note.content = new_value_for_notes + dag_run.dag_run_note.user_id = current_user_id session.commit() return dagrun_schema.dump(dag_run) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 74f8d2a015d14..3f692d8a0d754 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -670,7 +670,13 @@ def set_task_instance_notes( raise NotFound(error_message) ti, sla_miss = result - ti.notes = new_value_for_notes or None - session.commit() + from flask_login import current_user + current_user_id = getattr(current_user, "id", None) + if ti.task_instance_note is None: + ti.notes = (new_value_for_notes, current_user_id) + else: + ti.task_instance_note.content = new_value_for_notes + ti.task_instance_note.user_id = current_user_id + 
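    # The branch above mirrors the DagRun endpoint: the ``notes`` association proxy
    # only invokes its creator when no note row exists yet, while on an existing row
    # the proxy setter would assign the raw value straight to ``content``; hence the
    # explicit update through ``task_instance_note``.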
session.commit() return task_instance_schema.dump((ti, sla_miss)) diff --git a/airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py b/airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py new file mode 100644 index 0000000000000..d13fba38aa70c --- /dev/null +++ b/airflow/migrations/versions/0121_2_5_0_add_dagrunnote_and_taskinstancenote.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add DagRunNote and TaskInstanceNote + +Revision ID: 1986afd32c1b +Revises: ee8d93fcc81e +Create Date: 2022-11-22 21:49:05.843439 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import StringID +from airflow.utils.sqlalchemy import UtcDateTime + +# revision identifiers, used by Alembic. +revision = "1986afd32c1b" +down_revision = "ee8d93fcc81e" +branch_labels = None +depends_on = None +airflow_version = "2.5.0" + + +def upgrade(): + """Apply Add DagRunNote and TaskInstanceNote""" + op.create_table( + "dag_run_note", + sa.Column("user_id", sa.Integer(), nullable=True), + sa.Column("dag_run_id", sa.Integer(), nullable=False), + sa.Column( + "content", sa.String(length=1000).with_variant(sa.Text(length=1000), "mysql"), nullable=True + ), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint( + ("dag_run_id",), ["dag_run.id"], name="dag_run_note_dr_fkey", ondelete="CASCADE" + ), + sa.ForeignKeyConstraint(("user_id",), ["ab_user.id"], name="dag_run_note_user_fkey"), + sa.PrimaryKeyConstraint("dag_run_id", name=op.f("dag_run_note_pkey")), + ) + + op.create_table( + "task_instance_note", + sa.Column("user_id", sa.Integer(), nullable=True), + sa.Column("task_id", StringID(), nullable=False), + sa.Column("dag_id", StringID(), nullable=False), + sa.Column("run_id", StringID(), nullable=False), + sa.Column("map_index", sa.Integer(), nullable=False), + sa.Column( + "content", sa.String(length=1000).with_variant(sa.Text(length=1000), "mysql"), nullable=True + ), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint( + "task_id", "dag_id", "run_id", "map_index", name=op.f("task_instance_note_pkey") + ), + sa.ForeignKeyConstraint( + ("dag_id", "task_id", "run_id", "map_index"), + [ + "task_instance.dag_id", + "task_instance.task_id", + "task_instance.run_id", + "task_instance.map_index", + ], + name="task_instance_note_ti_fkey", + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint(("user_id",), ["ab_user.id"], name="task_instance_note_user_fkey"), + ) + + +def downgrade(): + """Unapply Add DagRunNote and 
TaskInstanceNote""" + op.drop_table("task_instance_note") + op.drop_table("dag_run_note") diff --git a/airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py b/airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py deleted file mode 100644 index f09eee136f203..0000000000000 --- a/airflow/migrations/versions/0121_2_5_0_add_user_comment_to_task_instance_and_dag_run.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Add user comment to task_instance and dag_run. - -Revision ID: 65a852f26899 -Revises: ecb43d2a1842 -Create Date: 2022-09-17 20:01:42.652862 - -""" - -from __future__ import annotations - -import sqlalchemy as sa -from alembic import op - -# revision identifiers, used by Alembic. -revision = "65a852f26899" -down_revision = "ee8d93fcc81e" -branch_labels = None -depends_on = None -airflow_version = "2.5.0" - - -def upgrade(): - """Apply add user comment to task_instance and dag_run""" - conn = op.get_bind() - - with op.batch_alter_table("dag_run") as batch_op: - if conn.dialect.name == "mysql": - batch_op.add_column(sa.Column("notes", sa.Text(length=1000), nullable=True)) - else: - batch_op.add_column(sa.Column("notes", sa.String(length=1000), nullable=True)) - - with op.batch_alter_table("task_instance") as batch_op: - if conn.dialect.name == "mysql": - batch_op.add_column(sa.Column("notes", sa.Text(length=1000), nullable=True)) - else: - batch_op.add_column(sa.Column("notes", sa.String(length=1000), nullable=True)) - - -def downgrade(): - """Unapply add user comment to task_instance and dag_run""" - with op.batch_alter_table("task_instance", schema=None) as batch_op: - batch_op.drop_column("notes") - - with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_column("notes") diff --git a/airflow/models/dag.py b/airflow/models/dag.py index b61f679269395..bd81980fc2b59 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2552,7 +2552,6 @@ def create_dagrun( dag_hash: str | None = None, creating_job_id: int | None = None, data_interval: tuple[datetime, datetime] | None = None, - notes: str | None = None, ): """ Creates a dag run from this dag including the tasks associated with this dag. @@ -2569,7 +2568,6 @@ def create_dagrun( :param session: database session :param dag_hash: Hash of Serialized DAG :param data_interval: Data interval of the DagRun - :param notes: A custom note for the DAGRun. 
""" logical_date = timezone.coerce_datetime(execution_date) @@ -2628,7 +2626,6 @@ def create_dagrun( dag_hash=dag_hash, creating_job_id=creating_job_id, data_interval=data_interval, - notes=notes, ) session.add(run) session.flush() diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 2f2d9509a48d7..e148d08f498b7 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -28,6 +28,7 @@ Boolean, Column, ForeignKey, + ForeignKeyConstraint, Index, Integer, PickleType, @@ -40,6 +41,7 @@ text, ) from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import joinedload, relationship, synonym from sqlalchemy.orm.session import Session @@ -85,6 +87,16 @@ class TISchedulingDecision(NamedTuple): finished_tis: list[TI] +def _creator_note(val): + """Custom creator for the ``note`` association proxy.""" + if isinstance(val, str): + return DagRunNote(content=val) + elif isinstance(val, dict): + return DagRunNote(**val) + else: + return DagRunNote(*val) + + class DagRun(Base, LoggingMixin): """ DagRun describes an instance of a Dag. It can be created @@ -111,7 +123,6 @@ class DagRun(Base, LoggingMixin): # When a scheduler last attempted to schedule TIs for this DagRun last_scheduling_decision = Column(UtcDateTime) dag_hash = Column(String(32)) - notes = Column(String(1000).with_variant(Text(1000), "mysql")) # Foreign key to LogTemplate. DagRun rows created prior to this column's # existence have this set to NULL. Later rows automatically populate this on # insert to point to the latest LogTemplate entry. @@ -163,6 +174,8 @@ class DagRun(Base, LoggingMixin): uselist=False, viewonly=True, ) + dag_run_note = relationship("DagRunNote", back_populates="dag_run", uselist=False) + notes = association_proxy("dag_run_note", "content", creator=_creator_note) DEFAULT_DAGRUNS_TO_EXAMINE = airflow_conf.getint( "scheduler", @@ -184,7 +197,6 @@ def __init__( dag_hash: str | None = None, creating_job_id: int | None = None, data_interval: tuple[datetime, datetime] | None = None, - notes: str | None = None, ): if data_interval is None: # Legacy: Only happen for runs created prior to Airflow 2.2. 
@@ -207,7 +219,6 @@ def __init__(
         self.run_type = run_type
         self.dag_hash = dag_hash
         self.creating_job_id = creating_job_id
-        self.notes = notes
         super().__init__()

     def __repr__(self):
@@ -1295,3 +1306,40 @@ def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
                 stacklevel=2,
             )
         return self.get_log_template(session=session).filename
+
+
+class DagRunNote(Base):
+    """For storage of arbitrary notes concerning the dagrun instance."""
+
+    __tablename__ = "dag_run_note"
+
+    user_id = Column(Integer, nullable=True)
+    dag_run_id = Column(Integer, primary_key=True, nullable=False)
+    content = Column(String(1000).with_variant(Text(1000), "mysql"))
+    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
+
+    dag_run = relationship("DagRun", back_populates="dag_run_note")
+
+    __table_args__ = (
+        ForeignKeyConstraint(
+            (dag_run_id,),
+            ["dag_run.id"],
+            name="dag_run_note_dr_fkey",
+            ondelete="CASCADE",
+        ),
+        ForeignKeyConstraint(
+            (user_id,),
+            ["ab_user.id"],
+            name="dag_run_note_user_fkey",
+        ),
+    )
+
+    def __init__(self, content, user_id=None):
+        self.content = content
+        self.user_id = user_id
+
+    def __repr__(self):
+        # DagRunNote has no dag_id, run_id, or map_index columns (those belong
+        # to TaskInstanceNote), so only the dag_run_id key is shown.
+        return f"<{self.__class__.__name__}: dag_run_id={self.dag_run_id}>"
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 29d0ad4497405..58f4d5c379c10 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -313,6 +313,16 @@ def key(self) -> TaskInstanceKey:
         return self


+def _creator_note(val):
+    """Custom creator for the ``note`` association proxy."""
+    if isinstance(val, str):
+        return TaskInstanceNote(content=val)
+    elif isinstance(val, dict):
+        return TaskInstanceNote(**val)
+    else:
+        return TaskInstanceNote(*val)
+
+
 class TaskInstance(Base, LoggingMixin):
     """
     Task instances store the state of a task instance. This table is the
@@ -355,7 +365,6 @@ class TaskInstance(Base, LoggingMixin):
     queued_by_job_id = Column(Integer)
     pid = Column(Integer)
     executor_config = Column(ExecutorConfigType(pickler=dill))
-    notes = Column(String(1000).with_variant(Text(1000), "mysql"))
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
     external_executor_id = Column(StringID())

@@ -415,9 +424,9 @@ class TaskInstance(Base, LoggingMixin):
     triggerer_job = association_proxy("trigger", "triggerer_job")
     dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
     rendered_task_instance_fields = relationship("RenderedTaskInstanceFields", lazy="noload", uselist=False)
-
     execution_date = association_proxy("dag_run", "execution_date")
-
+    task_instance_note = relationship("TaskInstanceNote", back_populates="task_instance", uselist=False)
+    notes = association_proxy("task_instance_note", "content", creator=_creator_note)
     task: Operator  # Not always set...
def __init__( @@ -794,7 +803,6 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool self.trigger_id = ti.trigger_id self.next_method = ti.next_method self.next_kwargs = ti.next_kwargs - self.notes = ti.notes else: self.state = None @@ -2676,6 +2684,52 @@ def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance: return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key) +class TaskInstanceNote(Base): + """For storage of arbitrary notes concerning the task instance.""" + + __tablename__ = "task_instance_note" + + user_id = Column(Integer, nullable=True) + task_id = Column(StringID(), primary_key=True, nullable=False) + dag_id = Column(StringID(), primary_key=True, nullable=False) + run_id = Column(StringID(), primary_key=True, nullable=False) + map_index = Column(Integer, primary_key=True, nullable=False) + content = Column(String(1000).with_variant(Text(1000), "mysql")) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) + + task_instance = relationship("TaskInstance", back_populates="task_instance_note") + + __table_args__ = ( + ForeignKeyConstraint( + (dag_id, task_id, run_id, map_index), + [ + "task_instance.dag_id", + "task_instance.task_id", + "task_instance.run_id", + "task_instance.map_index", + ], + name="task_instance_note_ti_fkey", + ondelete="CASCADE", + ), + ForeignKeyConstraint( + (user_id,), + ["ab_user.id"], + name="task_instance_note_user_fkey", + ), + ) + + def __init__(self, content, user_id=None): + self.content = content + self.user_id = user_id + + def __repr__(self): + prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}" + if self.map_index != -1: + prefix += f" map_index={self.map_index}" + return prefix + ">" + + STATICA_HACK = True globals()["kcah_acitats"[::-1].upper()] = False if STATICA_HACK: # pragma: no cover diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index e0189038f3c12..b687c05dc4b97 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -99,7 +99,6 @@ def __init__( poke_interval: int = 60, allowed_states: list | None = None, failed_states: list | None = None, - dag_run_notes: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -111,7 +110,6 @@ def __init__( self.poke_interval = poke_interval self.allowed_states = allowed_states or [State.SUCCESS] self.failed_states = failed_states or [State.FAILED] - self.dag_run_notes = dag_run_notes if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)): raise TypeError( @@ -144,7 +142,6 @@ def execute(self, context: Context): conf=self.conf, execution_date=parsed_execution_date, replace_microseconds=False, - notes=self.dag_run_notes, ) except DagRunAlreadyExists as e: diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 8cbaa504f541b..c1908ed87c99e 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -717,10 +717,10 @@ def clean_column_names(): clean_column_names() # Support for AssociationProxy in search and list columns - for desc in self.obj.__mapper__.all_orm_descriptors: + for obj_attr, desc in self.obj.__mapper__.all_orm_descriptors.items(): if not isinstance(desc, AssociationProxy): continue - proxy_instance = getattr(self.obj, desc.value_attr) + proxy_instance = getattr(self.obj, obj_attr) if hasattr(proxy_instance.remote_attr.prop, "columns"): 
self.list_columns[desc.value_attr] = proxy_instance.remote_attr.prop.columns[0] self.list_properties[desc.value_attr] = proxy_instance.remote_attr.prop diff --git a/airflow/www/views.py b/airflow/www/views.py index 61ccc47db0e45..34bbe6d150ff3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -101,7 +101,7 @@ from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel from airflow.models.operator import Operator from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstance import TaskInstance, TaskInstanceNote from airflow.providers_manager import ProvidersManager from airflow.security import permissions from airflow.ti_deps.dep_context import DepContext @@ -263,19 +263,18 @@ def dag_to_grid(dag, dag_runs, session): TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, - TaskInstance.notes, - sqla.func.count(sqla.func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label( - "state_count" - ), - sqla.func.min(TaskInstance.start_date).label("start_date"), - sqla.func.max(TaskInstance.end_date).label("end_date"), - sqla.func.max(TaskInstance._try_number).label("_try_number"), + func.min(TaskInstanceNote.content).label("notes"), + func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"), + func.min(TaskInstance.start_date).label("start_date"), + func.max(TaskInstance.end_date).label("end_date"), + func.max(TaskInstance._try_number).label("_try_number"), ) + .join(TaskInstance.task_instance_note, isouter=True) .filter( TaskInstance.dag_id == dag.dag_id, TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), ) - .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.notes) + .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state) .order_by(TaskInstance.task_id, TaskInstance.run_id) ) @@ -4899,7 +4898,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView): "run_type", "start_date", "end_date", - "notes", + # "notes", # todo: maybe figure out how to re-enable this "external_trigger", ] label_columns = { @@ -5291,7 +5290,7 @@ class TaskInstanceModelView(AirflowPrivilegeVerifierModelView): "operator", "start_date", "end_date", - "notes", + # "notes", # todo: maybe make notes work with TI search? 
"hostname", "priority_weight", "queue", diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 8e2b53ba86452..d2240af1eb048 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -bf1db1b1041afe3ef6277d96701f6d82bce44497b2b4c49ee79f7bb198f51042 \ No newline at end of file +f529521071a6c9ae8bbd58d63cf1195fc1ec964308e7684569d0a36d26534def \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index ec7546b1aa078..a348a579c4466 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,1511 +4,1614 @@ - - + + %3 - + ab_permission - -ab_permission - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view - -id - [INTEGER] - NOT NULL - -permission_id - [INTEGER] - -view_menu_id - [INTEGER] + +ab_permission_view + +id + [INTEGER] + NOT NULL + +permission_id + [INTEGER] + +view_menu_id + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role - -id - [INTEGER] - NOT NULL - -permission_view_id - [INTEGER] - -role_id - [INTEGER] + +ab_permission_view_role + +id + [INTEGER] + NOT NULL + +permission_view_id + [INTEGER] + +role_id + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role - -id - [INTEGER] - NOT NULL - -name - [VARCHAR(64)] - NOT NULL + +ab_role + +id + [INTEGER] + NOT NULL + +name + [VARCHAR(64)] + NOT NULL ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role - -id - [INTEGER] - NOT NULL - -role_id - [INTEGER] - -user_id - [INTEGER] + +ab_user_role + +id + [INTEGER] + NOT NULL + +role_id + [INTEGER] + +user_id + [INTEGER] ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user - -id - [INTEGER] - NOT NULL - -email - [VARCHAR(256)] - NOT NULL - -first_name - [VARCHAR(64)] - NOT NULL - -last_name - [VARCHAR(64)] - NOT NULL - -password - [VARCHAR(256)] - -registration_date - [DATETIME] - -registration_hash - [VARCHAR(256)] - -username - [VARCHAR(256)] - NOT NULL + +ab_register_user + +id + [INTEGER] + NOT NULL + +email + [VARCHAR(256)] + NOT NULL + +first_name + [VARCHAR(64)] + NOT NULL + +last_name + [VARCHAR(64)] + NOT NULL + +password + [VARCHAR(256)] + +registration_date + [DATETIME] + +registration_hash + [VARCHAR(256)] + +username + [VARCHAR(256)] + NOT NULL ab_user - -ab_user - -id - [INTEGER] - NOT NULL - -active - [BOOLEAN] - -changed_by_fk - [INTEGER] - -changed_on - [DATETIME] - -created_by_fk - [INTEGER] - -created_on - [DATETIME] - -email - [VARCHAR(256)] - NOT NULL - -fail_login_count - [INTEGER] - -first_name - [VARCHAR(64)] - NOT NULL - -last_login - [DATETIME] - -last_name - [VARCHAR(64)] - NOT NULL - -login_count - [INTEGER] - -password - [VARCHAR(256)] - -username - [VARCHAR(256)] - NOT NULL + +ab_user + +id + [INTEGER] + NOT NULL + +active + [BOOLEAN] + +changed_by_fk + [INTEGER] + +changed_on + [DATETIME] + +created_by_fk + [INTEGER] + +created_on + [DATETIME] + +email + [VARCHAR(256)] + NOT NULL + 
+fail_login_count + [INTEGER] + +first_name + [VARCHAR(64)] + NOT NULL + +last_login + [DATETIME] + +last_name + [VARCHAR(64)] + NOT NULL + +login_count + [INTEGER] + +password + [VARCHAR(256)] + +username + [VARCHAR(256)] + NOT NULL ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + +dag_run_note + +dag_run_note + +dag_run_id + [INTEGER] + NOT NULL + +content + [VARCHAR(1000)] + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +user_id + [INTEGER] + + + +ab_user--dag_run_note + +0..N +{0,1} + + + +task_instance_note + +task_instance_note + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +content + [VARCHAR(1000)] + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +user_id + [INTEGER] + + + +ab_user--task_instance_note + +0..N +{0,1} + + + alembic_version - -alembic_version - -version_num - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + [VARCHAR(32)] + NOT NULL - + callback_request - -callback_request - -id - [INTEGER] - NOT NULL - -callback_data - [JSON] - NOT NULL - -callback_type - [VARCHAR(20)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -priority_weight - [INTEGER] - NOT NULL - -processor_subdir - [VARCHAR(2000)] + +callback_request + +id + [INTEGER] + NOT NULL + +callback_data + [JSON] + NOT NULL + +callback_type + [VARCHAR(20)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +priority_weight + [INTEGER] + NOT NULL + +processor_subdir + [VARCHAR(2000)] - + connection - -connection - -id - [INTEGER] - NOT NULL - -conn_id - [VARCHAR(250)] - NOT NULL - -conn_type - [VARCHAR(500)] - NOT NULL - -description - [VARCHAR(5000)] - -extra - [TEXT] - -host - [VARCHAR(500)] - -is_encrypted - [BOOLEAN] - -is_extra_encrypted - [BOOLEAN] - -login - [VARCHAR(500)] - -password - [VARCHAR(5000)] - -port - [INTEGER] - -schema - [VARCHAR(500)] + +connection + +id + [INTEGER] + NOT NULL + +conn_id + [VARCHAR(250)] + NOT NULL + +conn_type + [VARCHAR(500)] + NOT NULL + +description + [VARCHAR(5000)] + +extra + [TEXT] + +host + [VARCHAR(500)] + +is_encrypted + [BOOLEAN] + +is_extra_encrypted + [BOOLEAN] + +login + [VARCHAR(500)] + +password + [VARCHAR(5000)] + +port + [INTEGER] + +schema + [VARCHAR(500)] - + dag - -dag - -dag_id - [VARCHAR(250)] - NOT NULL - -default_view - [VARCHAR(25)] - -description - [TEXT] - -fileloc - [VARCHAR(2000)] - -has_import_errors - [BOOLEAN] - -has_task_concurrency_limits - [BOOLEAN] - NOT NULL - -is_active - [BOOLEAN] - -is_paused - [BOOLEAN] - -is_subdag - [BOOLEAN] - -last_expired - [TIMESTAMP] - -last_parsed_time - [TIMESTAMP] - -last_pickled - [TIMESTAMP] - -max_active_runs - [INTEGER] - -max_active_tasks - [INTEGER] - NOT NULL - -next_dagrun - [TIMESTAMP] - -next_dagrun_create_after - [TIMESTAMP] - -next_dagrun_data_interval_end - [TIMESTAMP] - -next_dagrun_data_interval_start - [TIMESTAMP] - -owners - [VARCHAR(2000)] - -pickle_id - [INTEGER] - -processor_subdir - [VARCHAR(2000)] - -root_dag_id - [VARCHAR(250)] - -schedule_interval - [TEXT] - -scheduler_lock - [BOOLEAN] - -timetable_description - [VARCHAR(1000)] + +dag + +dag_id + [VARCHAR(250)] + NOT NULL + +default_view + [VARCHAR(25)] + +description + [TEXT] + +fileloc + [VARCHAR(2000)] + +has_import_errors + [BOOLEAN] + +has_task_concurrency_limits + [BOOLEAN] + NOT NULL + +is_active + [BOOLEAN] + +is_paused + [BOOLEAN] + +is_subdag + 
[BOOLEAN] + +last_expired + [TIMESTAMP] + +last_parsed_time + [TIMESTAMP] + +last_pickled + [TIMESTAMP] + +max_active_runs + [INTEGER] + +max_active_tasks + [INTEGER] + NOT NULL + +next_dagrun + [TIMESTAMP] + +next_dagrun_create_after + [TIMESTAMP] + +next_dagrun_data_interval_end + [TIMESTAMP] + +next_dagrun_data_interval_start + [TIMESTAMP] + +owners + [VARCHAR(2000)] + +pickle_id + [INTEGER] + +processor_subdir + [VARCHAR(2000)] + +root_dag_id + [VARCHAR(250)] + +schedule_interval + [TEXT] + +scheduler_lock + [BOOLEAN] + +timetable_description + [VARCHAR(1000)] - + dag_owner_attributes - -dag_owner_attributes - -dag_id - [VARCHAR(250)] - NOT NULL - -owner - [VARCHAR(500)] - NOT NULL - -link - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + [VARCHAR(250)] + NOT NULL + +owner + [VARCHAR(500)] + NOT NULL + +link + [VARCHAR(500)] + NOT NULL - + dag--dag_owner_attributes - -0..N -{0,1} + +0..N +{0,1} - + dag_schedule_dataset_reference - -dag_schedule_dataset_reference - -dag_id - [VARCHAR(250)] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL + +dag_schedule_dataset_reference + +dag_id + [VARCHAR(250)] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL - + dag--dag_schedule_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dag_tag - -dag_tag - -dag_id - [VARCHAR(250)] - NOT NULL - -name - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + [VARCHAR(250)] + NOT NULL + +name + [VARCHAR(100)] + NOT NULL - + dag--dag_tag - -0..N -{0,1} + +0..N +{0,1} - + dag_warning - -dag_warning - -dag_id - [VARCHAR(250)] - NOT NULL - -warning_type - [VARCHAR(50)] - NOT NULL - -message - [TEXT] - NOT NULL - -timestamp - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + [VARCHAR(250)] + NOT NULL + +warning_type + [VARCHAR(50)] + NOT NULL + +message + [TEXT] + NOT NULL + +timestamp + [TIMESTAMP] + NOT NULL - + dag--dag_warning - -0..N -{0,1} + +0..N +{0,1} - + dataset_dag_run_queue - -dataset_dag_run_queue - -dataset_id - [INTEGER] - NOT NULL - -target_dag_id - [VARCHAR(250)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL + +dataset_dag_run_queue + +dataset_id + [INTEGER] + NOT NULL + +target_dag_id + [VARCHAR(250)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL - + dag--dataset_dag_run_queue - -0..N -{0,1} + +0..N +{0,1} - + task_outlet_dataset_reference - -task_outlet_dataset_reference - -dag_id - [VARCHAR(250)] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL + +task_outlet_dataset_reference + +dag_id + [VARCHAR(250)] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL - + dag--task_outlet_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dag_code - -dag_code - -fileloc_hash - [BIGINT] - NOT NULL - -fileloc - [VARCHAR(2000)] - NOT NULL - -last_updated - [TIMESTAMP] - NOT NULL - -source_code - [TEXT] - NOT NULL + +dag_code + +fileloc_hash + [BIGINT] + NOT NULL + +fileloc + [VARCHAR(2000)] + NOT NULL + +last_updated + [TIMESTAMP] + NOT NULL + +source_code + [TEXT] + NOT NULL - + dag_pickle - -dag_pickle - -id - [INTEGER] - NOT NULL - -created_dttm - [TIMESTAMP] - -pickle - [BLOB] - -pickle_hash - [BIGINT] + +dag_pickle + +id + [INTEGER] + NOT NULL + +created_dttm + [TIMESTAMP] + +pickle + [BLOB] + +pickle_hash + 
[BIGINT] - + dag_run - -dag_run - -id - [INTEGER] - NOT NULL - -conf - [BLOB] - -creating_job_id - [INTEGER] - -dag_hash - [VARCHAR(32)] - -dag_id - [VARCHAR(250)] - NOT NULL - -data_interval_end - [TIMESTAMP] - -data_interval_start - [TIMESTAMP] - -end_date - [TIMESTAMP] - -execution_date - [TIMESTAMP] - NOT NULL - -external_trigger - [BOOLEAN] - -last_scheduling_decision - [TIMESTAMP] - -log_template_id - [INTEGER] - -notes - [VARCHAR(1000)] - -queued_at - [TIMESTAMP] - -run_id - [VARCHAR(250)] - NOT NULL - -run_type - [VARCHAR(50)] - NOT NULL - -start_date - [TIMESTAMP] - -state - [VARCHAR(50)] - -updated_at - [TIMESTAMP] + +dag_run + +id + [INTEGER] + NOT NULL + +conf + [BLOB] + +creating_job_id + [INTEGER] + +dag_hash + [VARCHAR(32)] + +dag_id + [VARCHAR(250)] + NOT NULL + +data_interval_end + [TIMESTAMP] + +data_interval_start + [TIMESTAMP] + +end_date + [TIMESTAMP] + +execution_date + [TIMESTAMP] + NOT NULL + +external_trigger + [BOOLEAN] + +last_scheduling_decision + [TIMESTAMP] + +log_template_id + [INTEGER] + +queued_at + [TIMESTAMP] + +run_id + [VARCHAR(250)] + NOT NULL + +run_type + [VARCHAR(50)] + NOT NULL + +start_date + [TIMESTAMP] + +state + [VARCHAR(50)] + +updated_at + [TIMESTAMP] + + + +dag_run--dag_run_note + +0..N +{0,1} - + dagrun_dataset_event - -dagrun_dataset_event - -dag_run_id - [INTEGER] - NOT NULL - -event_id - [INTEGER] - NOT NULL + +dagrun_dataset_event + +dag_run_id + [INTEGER] + NOT NULL + +event_id + [INTEGER] + NOT NULL - + dag_run--dagrun_dataset_event - -0..N -{0,1} + +0..N +{0,1} - + task_instance - -task_instance - -dag_id - [VARCHAR(250)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -duration - [FLOAT] - -end_date - [TIMESTAMP] - -executor_config - [BLOB] - -external_executor_id - [VARCHAR(250)] - -hostname - [VARCHAR(1000)] - -job_id - [INTEGER] - -max_tries - [INTEGER] - -next_kwargs - [JSON] - -next_method - [VARCHAR(1000)] - -notes - [VARCHAR(1000)] - -operator - [VARCHAR(1000)] - -pid - [INTEGER] - -pool - [VARCHAR(256)] - NOT NULL - -pool_slots - [INTEGER] - NOT NULL - -priority_weight - [INTEGER] - -queue - [VARCHAR(256)] - -queued_by_job_id - [INTEGER] - -queued_dttm - [TIMESTAMP] - -start_date - [TIMESTAMP] - -state - [VARCHAR(20)] - -trigger_id - [INTEGER] - -trigger_timeout - [DATETIME] - -try_number - [INTEGER] - -unixname - [VARCHAR(1000)] - -updated_at - [TIMESTAMP] + +task_instance + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +duration + [FLOAT] + +end_date + [TIMESTAMP] + +executor_config + [BLOB] + +external_executor_id + [VARCHAR(250)] + +hostname + [VARCHAR(1000)] + +job_id + [INTEGER] + +max_tries + [INTEGER] + +next_kwargs + [JSON] + +next_method + [VARCHAR(1000)] + +operator + [VARCHAR(1000)] + +pid + [INTEGER] + +pool + [VARCHAR(256)] + NOT NULL + +pool_slots + [INTEGER] + NOT NULL + +priority_weight + [INTEGER] + +queue + [VARCHAR(256)] + +queued_by_job_id + [INTEGER] + +queued_dttm + [TIMESTAMP] + +start_date + [TIMESTAMP] + +state + [VARCHAR(20)] + +trigger_id + [INTEGER] + +trigger_timeout + [DATETIME] + +try_number + [INTEGER] + +unixname + [VARCHAR(1000)] + +updated_at + [TIMESTAMP] - + dag_run--task_instance - -0..N -{0,1} + +0..N +{0,1} - + dag_run--task_instance - -0..N -{0,1} + +0..N +{0,1} - + task_reschedule - -task_reschedule - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - NOT NULL - -duration - [INTEGER] - NOT NULL - 
-end_date - [TIMESTAMP] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -reschedule_date - [TIMESTAMP] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -start_date - [TIMESTAMP] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -try_number - [INTEGER] - NOT NULL + +task_reschedule + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + NOT NULL + +duration + [INTEGER] + NOT NULL + +end_date + [TIMESTAMP] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +reschedule_date + [TIMESTAMP] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +start_date + [TIMESTAMP] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +try_number + [INTEGER] + NOT NULL - + dag_run--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + dag_run--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + + +task_instance--task_instance_note + +0..N +{0,1} + + +task_instance--task_instance_note + +0..N +{0,1} + + + +task_instance--task_instance_note + +0..N +{0,1} + + + +task_instance--task_instance_note + +0..N +{0,1} + + + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_reschedule - -0..N -{0,1} + +0..N +{0,1} - + rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - [VARCHAR(250)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - [JSON] - -rendered_fields - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + [JSON] + +rendered_fields + [JSON] + NOT NULL - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_instance--rendered_task_instance_fields - -0..N -{0,1} + +0..N +{0,1} - + task_fail - -task_fail - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - NOT NULL - -duration - [INTEGER] - -end_date - [TIMESTAMP] - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -start_date - [TIMESTAMP] - -task_id - [VARCHAR(250)] - NOT NULL + +task_fail + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + NOT NULL + +duration + [INTEGER] + +end_date + [TIMESTAMP] + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +start_date + [TIMESTAMP] + +task_id + [VARCHAR(250)] + NOT NULL - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_fail - -0..N -{0,1} + +0..N +{0,1} - + task_map - -task_map - -dag_id - [VARCHAR(250)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -keys - [JSON] - -length - [INTEGER] - NOT NULL + +task_map + +dag_id + [VARCHAR(250)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +keys + [JSON] + +length + [INTEGER] + NOT NULL - + task_instance--task_map - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_map - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_map - -0..N -{0,1} + +0..N +{0,1} - + task_instance--task_map - -0..N -{0,1} 
+ +0..N +{0,1} - + xcom - -xcom - -dag_run_id - [INTEGER] - NOT NULL - -key - [VARCHAR(512)] - NOT NULL - -map_index - [INTEGER] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -dag_id - [VARCHAR(250)] - NOT NULL - -run_id - [VARCHAR(250)] - NOT NULL - -timestamp - [TIMESTAMP] - NOT NULL - -value - [BLOB] + +xcom + +dag_run_id + [INTEGER] + NOT NULL + +key + [VARCHAR(512)] + NOT NULL + +map_index + [INTEGER] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +dag_id + [VARCHAR(250)] + NOT NULL + +run_id + [VARCHAR(250)] + NOT NULL + +timestamp + [TIMESTAMP] + NOT NULL + +value + [BLOB] - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + task_instance--xcom - -0..N -{0,1} + +0..N +{0,1} - + log_template - -log_template - -id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -elasticsearch_id - [TEXT] - NOT NULL - -filename - [TEXT] - NOT NULL + +log_template + +id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +elasticsearch_id + [TEXT] + NOT NULL + +filename + [TEXT] + NOT NULL - + log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} - + dataset - -dataset - -id - [INTEGER] - NOT NULL - -created_at - [TIMESTAMP] - NOT NULL - -extra - [JSON] - NOT NULL - -updated_at - [TIMESTAMP] - NOT NULL - -uri - [VARCHAR(3000)] - NOT NULL + +dataset + +id + [INTEGER] + NOT NULL + +created_at + [TIMESTAMP] + NOT NULL + +extra + [JSON] + NOT NULL + +updated_at + [TIMESTAMP] + NOT NULL + +uri + [VARCHAR(3000)] + NOT NULL - + dataset--dag_schedule_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dataset--dataset_dag_run_queue - -0..N -{0,1} + +0..N +{0,1} - + dataset--task_outlet_dataset_reference - -0..N -{0,1} + +0..N +{0,1} - + dataset_event - -dataset_event - -id - [INTEGER] - NOT NULL - -dataset_id - [INTEGER] - NOT NULL - -extra - [JSON] - NOT NULL - -source_dag_id - [VARCHAR(250)] - -source_map_index - [INTEGER] - -source_run_id - [VARCHAR(250)] - -source_task_id - [VARCHAR(250)] - -timestamp - [TIMESTAMP] - NOT NULL + +dataset_event + +id + [INTEGER] + NOT NULL + +dataset_id + [INTEGER] + NOT NULL + +extra + [JSON] + NOT NULL + +source_dag_id + [VARCHAR(250)] + +source_map_index + [INTEGER] + +source_run_id + [VARCHAR(250)] + +source_task_id + [VARCHAR(250)] + +timestamp + [TIMESTAMP] + NOT NULL - + dataset_event--dagrun_dataset_event - -0..N -{0,1} + +0..N +{0,1} - + import_error - -import_error - -id - [INTEGER] - NOT NULL - -filename - [VARCHAR(1024)] - -stacktrace - [TEXT] - -timestamp - [TIMESTAMP] + +import_error + +id + [INTEGER] + NOT NULL + +filename + [VARCHAR(1024)] + +stacktrace + [TEXT] + +timestamp + [TIMESTAMP] - + job - -job - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - -end_date - [TIMESTAMP] - -executor_class - [VARCHAR(500)] - -hostname - [VARCHAR(500)] - -job_type - [VARCHAR(30)] - -latest_heartbeat - [TIMESTAMP] - -start_date - [TIMESTAMP] - -state - [VARCHAR(20)] - -unixname - [VARCHAR(1000)] + +job + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + +end_date + [TIMESTAMP] + +executor_class + [VARCHAR(500)] + +hostname + [VARCHAR(500)] + +job_type + [VARCHAR(30)] + +latest_heartbeat + [TIMESTAMP] + +start_date + [TIMESTAMP] + +state + [VARCHAR(20)] + +unixname + [VARCHAR(1000)] - + log - -log - -id - [INTEGER] - NOT NULL - -dag_id - [VARCHAR(250)] - -dttm - [TIMESTAMP] - -event - [VARCHAR(30)] - -execution_date - [TIMESTAMP] - -extra - [TEXT] - -map_index - [INTEGER] - -owner - [VARCHAR(500)] - -task_id - 
[VARCHAR(250)] + +log + +id + [INTEGER] + NOT NULL + +dag_id + [VARCHAR(250)] + +dttm + [TIMESTAMP] + +event + [VARCHAR(30)] + +execution_date + [TIMESTAMP] + +extra + [TEXT] + +map_index + [INTEGER] + +owner + [VARCHAR(500)] + +task_id + [VARCHAR(250)] - + trigger - -trigger - -id - [INTEGER] - NOT NULL - -classpath - [VARCHAR(1000)] - NOT NULL - -created_date - [TIMESTAMP] - NOT NULL - -kwargs - [JSON] - NOT NULL - -triggerer_id - [INTEGER] + +trigger + +id + [INTEGER] + NOT NULL + +classpath + [VARCHAR(1000)] + NOT NULL + +created_date + [TIMESTAMP] + NOT NULL + +kwargs + [JSON] + NOT NULL + +triggerer_id + [INTEGER] - + trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} - + serialized_dag - -serialized_dag - -dag_id - [VARCHAR(250)] - NOT NULL - -dag_hash - [VARCHAR(32)] - NOT NULL - -data - [JSON] - -data_compressed - [BLOB] - -fileloc - [VARCHAR(2000)] - NOT NULL - -fileloc_hash - [BIGINT] - NOT NULL - -last_updated - [TIMESTAMP] - NOT NULL - -processor_subdir - [VARCHAR(2000)] + +serialized_dag + +dag_id + [VARCHAR(250)] + NOT NULL + +dag_hash + [VARCHAR(32)] + NOT NULL + +data + [JSON] + +data_compressed + [BLOB] + +fileloc + [VARCHAR(2000)] + NOT NULL + +fileloc_hash + [BIGINT] + NOT NULL + +last_updated + [TIMESTAMP] + NOT NULL + +processor_subdir + [VARCHAR(2000)] - + session - -session - -id - [INTEGER] - NOT NULL - -data - [BLOB] - -expiry - [DATETIME] - -session_id - [VARCHAR(255)] + +session + +id + [INTEGER] + NOT NULL + +data + [BLOB] + +expiry + [DATETIME] + +session_id + [VARCHAR(255)] - + sla_miss - -sla_miss - -dag_id - [VARCHAR(250)] - NOT NULL - -execution_date - [TIMESTAMP] - NOT NULL - -task_id - [VARCHAR(250)] - NOT NULL - -description - [TEXT] - -email_sent - [BOOLEAN] - -notification_sent - [BOOLEAN] - -timestamp - [TIMESTAMP] + +sla_miss + +dag_id + [VARCHAR(250)] + NOT NULL + +execution_date + [TIMESTAMP] + NOT NULL + +task_id + [VARCHAR(250)] + NOT NULL + +description + [TEXT] + +email_sent + [BOOLEAN] + +notification_sent + [BOOLEAN] + +timestamp + [TIMESTAMP] - + slot_pool - -slot_pool - -id - [INTEGER] - NOT NULL - -description - [TEXT] - -pool - [VARCHAR(256)] - -slots - [INTEGER] + +slot_pool + +id + [INTEGER] + NOT NULL + +description + [TEXT] + +pool + [VARCHAR(256)] + +slots + [INTEGER] - + variable - -variable - -id - [INTEGER] - NOT NULL - -description - [TEXT] - -is_encrypted - [BOOLEAN] - -key - [VARCHAR(250)] - -val - [TEXT] + +variable + +id + [INTEGER] + NOT NULL + +description + [TEXT] + +is_encrypted + [BOOLEAN] + +key + [VARCHAR(250)] + +val + [TEXT] diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 77a95c0edb75e..5d243bbf30ffc 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``65a852f26899`` (head) | ``ee8d93fcc81e`` | ``2.5.0`` | Add user comment to task_instance and dag_run. 
| +| ``1986afd32c1b`` (head) | ``ee8d93fcc81e`` | ``2.5.0`` | Add DagRunNote and TaskInstanceNote | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``ee8d93fcc81e`` | ``e07f49787c9d`` | ``2.5.0`` | Add updated_at column to DagRun and TaskInstance | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py index 2f1ead7561929..70188ba5e1cfe 100644 --- a/tests/api/client/test_local_client.py +++ b/tests/api/client/test_local_client.py @@ -84,7 +84,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() @@ -98,7 +97,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() @@ -113,7 +111,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() @@ -128,7 +125,6 @@ def test_trigger_dag(self, mock): external_trigger=True, dag_hash=expected_dag_hash, data_interval=expected_data_interval, - notes=None, ) mock.reset_mock() diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 5e652723e6c92..fe0d144f47cd1 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -1421,7 +1421,7 @@ def test_should_respond_200(self, dag_maker, session): "execution_date": dr.execution_date.isoformat(), "external_trigger": False, "logical_date": dr.logical_date.isoformat(), - "start_date": dr.logical_date.isoformat(), + "start_date": None, "state": "queued", "data_interval_start": dr.data_interval_start.isoformat(), "data_interval_end": dr.data_interval_end.isoformat(), @@ -1631,6 +1631,7 @@ def test_should_respond_200(self, dag_maker, session): "run_type": dr.run_type, "notes": new_notes_value, } + assert dr.dag_run_note.user_id is not None def test_should_raises_401_unauthenticated(self, session): response = self.client.patch( diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 8cbd1e6ecbc70..4eef4c841215c 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -380,18 +380,13 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): def test_should_respond_200_mapped_task_instance_with_rtif(self, session): """Verify we don't duplicate rows through join to RTIF""" tis = self.create_task_instances(session) - session.query() - ti = tis[0] - ti.map_index = 1 - rendered_fields = RTIF(ti, render_templates=False) - session.add(rendered_fields) - session.commit() - new_ti = TaskInstance(task=ti.task, run_id=ti.run_id, map_index=2) - for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: - setattr(new_ti, attr, getattr(ti, attr)) - session.add(new_ti) - rendered_fields = RTIF(new_ti, render_templates=False) - session.add(rendered_fields) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + 
ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) session.commit() # in each loop, we should get the right mapped TI back @@ -1688,15 +1683,12 @@ def test_should_update_task_instance_state(self, session): assert response2.json["state"] == NEW_STATE def test_should_update_mapped_task_instance_state(self, session): - NEW_STATE = "failed" map_index = 1 - tis = self.create_task_instances(session) - ti = tis[0] - ti.map_index = map_index - rendered_fields = RTIF(ti, render_templates=False) - session.add(rendered_fields) + ti = TaskInstance(task=tis[0].task, run_id=tis[0].run_id, map_index=map_index) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + session.add(ti) session.commit() self.client.patch( @@ -1823,7 +1815,7 @@ def teardown_method(self): @provide_session def test_should_respond_200(self, session): - self.create_task_instances(session) + tis = self.create_task_instances(session) new_notes_value = "My super cool TaskInstance notes." response = self.client.patch( "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" @@ -1860,22 +1852,19 @@ def test_should_respond_200(self, session): "trigger": None, "triggerer_job": None, } + ti = tis[0] + assert ti.task_instance_note.user_id is not None def test_should_respond_200_mapped_task_instance_with_rtif(self, session): """Verify we don't duplicate rows through join to RTIF""" tis = self.create_task_instances(session) - session.query() - ti = tis[0] - ti.map_index = 1 - rendered_fields = RTIF(ti, render_templates=False) - session.add(rendered_fields) - session.commit() - new_ti = TaskInstance(task=ti.task, run_id=ti.run_id, map_index=2) - for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: - setattr(new_ti, attr, getattr(ti, attr)) - session.add(new_ti) - rendered_fields = RTIF(new_ti, render_templates=False) - session.add(rendered_fields) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "notes"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) session.commit() # in each loop, we should get the right mapped TI back diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py index 0419a0396cefd..a79983601c406 100644 --- a/tests/api_connexion/schemas/test_dag_run_schema.py +++ b/tests/api_connexion/schemas/test_dag_run_schema.py @@ -57,7 +57,6 @@ def test_serialize(self, session): execution_date=timezone.parse(self.default_time), start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', - notes="my notes", ) session.add(dagrun_model) session.commit() @@ -78,7 +77,7 @@ def test_serialize(self, session): "data_interval_start": None, "last_scheduling_decision": None, "run_type": "manual", - "notes": "my notes", + "notes": None, } @pytest.mark.parametrize( @@ -142,7 +141,6 @@ def test_serialize(self, session): run_type=DagRunType.MANUAL.value, start_date=timezone.parse(self.default_time), conf='{"start": "stop"}', - notes="Notes for first", ) dagrun_model_2 = DagRun( dag_id="my-dag-run", @@ -151,7 +149,6 @@ def test_serialize(self, session): execution_date=timezone.parse(self.second_time), 
start_date=timezone.parse(self.default_time), run_type=DagRunType.MANUAL.value, - notes="Notes for second", ) dagruns = [dagrun_model_1, dagrun_model_2] session.add_all(dagruns) @@ -174,7 +171,7 @@ def test_serialize(self, session): "data_interval_start": None, "last_scheduling_decision": None, "run_type": "manual", - "notes": "Notes for first", + "notes": None, }, { "dag_id": "my-dag-run", @@ -190,7 +187,7 @@ def test_serialize(self, session): "data_interval_start": None, "last_scheduling_decision": None, "run_type": "manual", - "notes": "Notes for second", + "notes": None, }, ], "total_entries": 2, diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index b2f4f9d4deb64..8f4fb47881e2e 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2778,7 +2778,6 @@ def test_refresh_from_db(self, create_task_instance): "next_kwargs": None, "next_method": None, "updated_at": None, - "notes": None, } # Make sure we aren't missing any new value in our expected_values list. expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values} diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index a52f185d99354..fdaa7263b2e94 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -148,18 +148,6 @@ def test_trigger_dagrun_with_execution_date(self): assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date) self.assert_extra_link(dagrun, task, session) - def test_trigger_dagrun_with_custom_note(self): - notes_value = "Custom note for newly created DagRun." - task = TriggerDagRunOperator( - task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag, dag_run_notes=notes_value - ) - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - with create_session() as session: - dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() - assert dagrun.external_trigger - assert dagrun.notes == notes_value - def test_trigger_dagrun_twice(self): """Test TriggerDagRunOperator with custom execution_date.""" utc_now = timezone.utcnow() diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index 07a1c5c5f1969..422bec37f3f67 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -255,6 +255,7 @@ def test_no_models_missing(self): with suppress(AttributeError): all_models.update({class_.__tablename__: class_}) exclusion_list = { + "ab_user", "variable", # leave alone "dataset", # not good way to know if "stale" "trigger", # self-maintaining @@ -272,6 +273,8 @@ def test_no_models_missing(self): "task_outlet_dataset_reference", # leave alone for now "dataset_dag_run_queue", # self-managed "dataset_event_dag_run", # foreign keys + "task_instance_note", # foreign keys + "dag_run_note", # foreign keys } from airflow.utils.db_cleanup import config_dict diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py index fcdd80717f7f7..504ee7b09b956 100644 --- a/tests/www/views/test_views_dagrun.py +++ b/tests/www/views/test_views_dagrun.py @@ -218,7 +218,7 @@ def test_muldelete_dag_runs_action(session, admin_client, running_dag_run): follow_redirects=True, ) assert resp.status_code == 200 - assert session.query(TaskInstance).count() == 0 # Deletes associated TIs. 
+ assert session.query(TaskInstance).count() == 0 # associated TIs are deleted assert session.query(DagRun).filter(DagRun.id == dag_run_id).count() == 0 diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index c4cee01a26406..4db2705070a3c 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -1017,7 +1017,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1048,7 +1047,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1079,7 +1077,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "EmptyOperator", "pid": None, "pool": "default_pool", @@ -1110,7 +1107,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1141,7 +1137,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1172,7 +1167,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool", @@ -1203,7 +1197,6 @@ def test_task_instances(admin_client): "max_tries": 0, "next_kwargs": None, "next_method": None, - "notes": None, "operator": "BashOperator", "pid": None, "pool": "default_pool",
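A minimal end-to-end sketch of the new notes API (illustrative only; ``session`` is assumed to be an active ORM session, and the dag_id/run_id/user_id values here are hypothetical):

    from airflow.models.dagrun import DagRun

    dr = session.query(DagRun).filter_by(dag_id="example_dag", run_id="manual_1").one()
    if dr.dag_run_note is None:
        # First write goes through _creator_note; the tuple form is (content, user_id),
        # as used by the REST endpoints above.
        dr.notes = ("triggered for backfill", 42)
    else:
        # On an existing row the proxy setter would assign the raw tuple to .content,
        # so subsequent writes update the DagRunNote directly.
        dr.dag_run_note.content = "triggered for backfill"
        dr.dag_run_note.user_id = 42
    session.commit()
    assert dr.notes == "triggered for backfill"  # proxied read of dag_run_note.content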