Skip to content

Commit

Permalink
[TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job schedu…
Browse files Browse the repository at this point in the history
…ling without explicit_defaults_for_timestamp.
  • Loading branch information
kwlzn authored and aoen committed Oct 5, 2018
1 parent 94a46f4 commit fb64f2e
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 15 deletions.
6 changes: 6 additions & 0 deletions .arcconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"arc.feature.start.default": "origin/twtr_rb_1.10.0",
"arc.land.onto.default": "twtr_rb_1.10.0",
"base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
"history.immutable": false
}
20 changes: 20 additions & 0 deletions README_TWITTER.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Developing locally

Here are some steps to develop this dependency locally and interact with source, interpreted from
https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source

1. Create a git branch for this change.
2. Edit `airflow/version.py` to change the version.
3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
4. Run the command `python2.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
It will be written to `airflow/dist`.
5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
7. Run `kill $pid` where `$pid` is the the pid just observed.
8. From the `source` directory, run `./pants clean-all`.
9. Now here are the hacky parts. The `run-local.sh` and `run-aurora.sh` all run pants commands
without the option `--python-repos-repos`. You can either edit these to include this option,
or run a pants command that includes it, which will cache the local artifact you need, e.g.
`./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
10. Now you can start up airflow instances as usual with the newly built wheel!
11. See the above link for `Adding Dependencies to science-libraries`.
7 changes: 6 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
# actually enqueue them
for task_instance in task_instances:
simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)

path = simple_dag.full_filepath
if path.startswith(settings.DAGS_FOLDER):
path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)

command = " ".join(TI.generate_command(
task_instance.dag_id,
task_instance.task_id,
Expand All @@ -1328,7 +1333,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
ignore_task_deps=False,
ignore_ti_state=False,
pool=task_instance.pool,
file_path=simple_dag.full_filepath,
file_path=path,
pickle_id=simple_dag.pickle_id))

priority = task_instance.priority_weight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,23 @@

from alembic import op
from sqlalchemy.dialects import mysql
from sqlalchemy import text
import sqlalchemy as sa


def upgrade():
conn = op.get_bind()
if conn.dialect.name == 'mysql':
conn.execute("SET time_zone = '+00:00'")
cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
res = cur.fetchall()
if res[0][0] == 0:
raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
# @awilcox July 2018
# we only need to worry about explicit_defaults_for_timestamp if we have
# DATETIME columns that are NOT explicitly declared with NULL
# ... and we don't, all are explicit

# cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
# res = cur.fetchall()
# if res[0][0] == 0:
# raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")

op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))

Expand All @@ -53,7 +59,9 @@ def upgrade():

op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
# NOTE(kwilson): See below.
op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
nullable=False, server_default=text('CURRENT_TIMESTAMP(6)'))
op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

Expand All @@ -76,7 +84,29 @@ def upgrade():
op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
# NOTE(kwilson)
#
# N.B. Here (and above) we explicitly set a default to the string literal `CURRENT_TIMESTAMP(6)` to avoid the
# default MySQL behavior for TIMESTAMP without `explicit_defaults_for_timestamp` turned on as stated here:
#
# "The first TIMESTAMP column in a table, if not explicitly declared with the NULL attribute or an explicit
# DEFAULT or ON UPDATE attribute, is automatically declared with the DEFAULT CURRENT_TIMESTAMP and
# ON UPDATE CURRENT_TIMESTAMP attributes." [0]
#
# Because of the "ON UPDATE CURRENT_TIMESTAMP" default, anytime the `task_instance` table is UPDATE'd without
# explicitly re-passing the current value for the `execution_date` column, it will end up getting clobbered with
# the current timestamp value which breaks `dag_run` <-> `task_instance` alignment and causes all sorts of
# scheduler and DB integrity breakage (because `execution_date` is part of the primary key).
#
# We unfortunately cannot turn `explicit_defaults_for_timestamp` on globally ourselves as is now technically
# required by Airflow [1], because this has to be set in the my.cnf and we don't control that in managed MySQL.
# A request to enable this fleet-wide has been made in MVP-18609.
#
# [0]: https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html#sysvar_explicit_defaults_for_timestamp
# [1]: https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#mysql-setting-required

op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
nullable=False, server_default=text('CURRENT_TIMESTAMP(6)'))
op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
Expand Down
15 changes: 8 additions & 7 deletions airflow/security/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ def renew_from_kt():
sys.exit(subp.returncode)

global NEED_KRB181_WORKAROUND
if NEED_KRB181_WORKAROUND is None:
NEED_KRB181_WORKAROUND = detect_conf_var()
if NEED_KRB181_WORKAROUND:
# (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
# renew the ticket after the initial valid time.
time.sleep(1.5)
perform_krb181_workaround()
# This breaks for twitter as we dont issue renewable tickets
# if NEED_KRB181_WORKAROUND is None:
# NEED_KRB181_WORKAROUND = detect_conf_var()
# if NEED_KRB181_WORKAROUND:
# # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
# # renew the ticket after the initial valid time.
# time.sleep(1.5)
# perform_krb181_workaround()


def perform_krb181_workaround():
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.0'
version = '1.10.0+twtr5'

0 comments on commit fb64f2e

Please sign in to comment.