From 506d8afb4f5d5e5eb3497295cf25ad0b5da1a64f Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Wed, 13 Apr 2022 14:14:44 +0530 Subject: [PATCH 1/6] Handle invalid date from query parameters in views. --- airflow/www/views.py | 89 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 13 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index b79160d233ed6..8b6de323758b0 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -187,13 +187,19 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): """Get Execution Data, Base Date & Number of runs from a Request""" date_time = www_request.args.get('execution_date') if date_time: - date_time = timezone.parse(date_time) + try: + date_time = timezone.parse(date_time) + except ParserError: + return {"error": "Invalid execution_date"} else: date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow() base_date = www_request.args.get('base_date') if base_date: - base_date = timezone.parse(base_date) + try: + base_date = timezone.parse(base_date) + except ParserError: + return {"error": "Invalid base_date"} else: # The DateTimeField widget truncates milliseconds and would loose # the first dag run. Round to next second. @@ -1256,7 +1262,12 @@ def rendered_templates(self, session): task_id = request.args.get('task_id') map_index = request.args.get('map_index', -1, type=int) execution_date = request.args.get('execution_date') - dttm = timezone.parse(execution_date) + + try: + dttm = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') @@ -1357,7 +1368,12 @@ def rendered_k8s(self, session: Session = NEW_SESSION): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - dttm = timezone.parse(execution_date) + + try: + dttm = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') map_index = request.args.get('map_index', -1, type=int) @@ -1506,7 +1522,15 @@ def log(self, session=None): task_id = request.args.get('task_id') map_index = request.args.get('map_index', -1, type=int) execution_date = request.args.get('execution_date') - dttm = timezone.parse(execution_date) if execution_date else None + + if execution_date: + try: + dttm = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + else: + dttm = None + form = DateTimeForm(data={'execution_date': dttm}) dag_model = DagModel.get_dagmodel(dag_id) @@ -1553,7 +1577,12 @@ def redirect_to_external_log(self, session=None): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - dttm = timezone.parse(execution_date) + + try: + dttm = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + map_index = request.args.get('map_index', -1, type=int) try_number = request.args.get('try_number', 1) @@ -1590,7 +1619,12 @@ def task(self, session): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - dttm = timezone.parse(execution_date) + + try: + dttm = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + map_index = request.args.get('map_index', -1, type=int) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') @@ -1722,7 +1756,11 @@ def xcom(self, session=None): # Carrying execution_date through, even though it's irrelevant for # this context execution_date = request.args.get('execution_date') - dttm = timezone.parse(execution_date) + try: + dttm = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') dag = DagModel.get_dagmodel(dag_id) @@ -2064,7 +2102,12 @@ def clear(self): map_indexes = request.form.getlist('map_index', type=int) execution_date = request.form.get('execution_date') - execution_date = timezone.parse(execution_date) + + try: + execution_date = timezone.parse(execution_date) + except ParserError: + return Response("Invalid execution_date", mimetype="text/plain", status=400) + confirmed = request.form.get('confirmed') == "true" upstream = request.form.get('upstream') == "true" downstream = request.form.get('downstream') == "true" @@ -2834,6 +2877,10 @@ def graph(self, dag_id, session=None): edges = dag_edges(dag) dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) + + if "error" in dt_nr_dr_data: + return Response(dt_nr_dr_data["error"], mimetype="text/plain", status=400) + dt_nr_dr_data['arrange'] = arrange dttm = dt_nr_dr_data['dttm'] dag_run = dag.get_dagrun(execution_date=dttm) @@ -2940,7 +2987,10 @@ def duration(self, dag_id, session=None): num_runs = request.args.get('num_runs', default=default_dag_run, type=int) if base_date: - base_date = timezone.parse(base_date) + try: + base_date = timezone.parse(base_date) + except ParserError: + return Response("Invalid base_date", mimetype="text/plain", status=400) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() @@ -3087,7 +3137,10 @@ def tries(self, dag_id, session=None): num_runs = request.args.get('num_runs', default=default_dag_run, type=int) if base_date: - base_date = timezone.parse(base_date) + try: + base_date = timezone.parse(base_date) + except ParserError: + return Response("Invalid base_date", mimetype="text/plain", status=400) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() @@ -3177,7 +3230,10 @@ def landing_times(self, dag_id, session=None): num_runs = request.args.get('num_runs', default=default_dag_run, type=int) if base_date: - base_date = timezone.parse(base_date) + try: + base_date = timezone.parse(base_date) + except ParserError: + return Response("Invalid base_date", mimetype="text/plain", status=400) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() @@ -3298,6 +3354,10 @@ def gantt(self, dag_id, session=None): wwwutils.check_import_errors(dag.fileloc, session) dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) + + if "error" in dt_nr_dr_data: + return Response(dt_nr_dr_data["error"], mimetype="text/plain", status=400) + dttm = dt_nr_dr_data['dttm'] dag_run = dag.get_dagrun(execution_date=dttm) dag_run_id = dag_run.run_id if dag_run else None @@ -3466,7 +3526,10 @@ def task_instances(self): dttm = request.args.get('execution_date') if dttm: - dttm = timezone.parse(dttm) + try: + dttm = timezone.parse(dttm) + except ParserError: + return "Error: Invalid execution_date" else: response = jsonify({'error': f"Invalid execution_date {dttm}"}) response.status_code = 400 From 4620d51cbcdd6012b60c155417d4e21dc7278810 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Wed, 13 Apr 2022 15:15:32 +0530 Subject: [PATCH 2/6] Add tests. --- airflow/www/views.py | 2 +- tests/www/views/test_views.py | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 8b6de323758b0..bf6944db5f5f0 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2105,7 +2105,7 @@ def clear(self): try: execution_date = timezone.parse(execution_date) - except ParserError: + except (TypeError, ParserError): return Response("Invalid execution_date", mimetype="text/plain", status=400) confirmed = request.form.get('confirmed') == "true" diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index c7900d64fd949..7ae7cdba806f3 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -373,3 +373,56 @@ def test_get_task_stats_from_query(): data = get_task_stats_from_query(query_data) assert data == expected_data + + +@pytest.mark.parametrize( + "url, content", + [ + ( + '/rendered-templates?execution_date=invalid', + "Invalid execution_date", + ), + ( + '/log?execution_date=invalid', + "Invalid execution_date", + ), + ( + '/redirect_to_external_log?execution_date=invalid', + "Invalid execution_date", + ), + ( + '/task?execution_date=invalid', + "Invalid execution_date", + ), + ( + 'dags/example_bash_operator/graph?execution_date=invalid', + "Invalid execution_date", + ), + ( + 'dags/example_bash_operator/graph?execution_date=invalid', + "Invalid execution_date", + ), + ( + 'dags/example_bash_operator/duration?base_date=invalid', + "Invalid base_date", + ), + ( + 'dags/example_bash_operator/tries?base_date=invalid', + "Invalid base_date", + ), + ( + 'dags/example_bash_operator/landing-times?base_date=invalid', + "Invalid base_date", + ), + ( + 'dags/example_bash_operator/gantt?execution_date=invalid', + "Invalid execution_date", + ), + ], +) +def test_invalid_dates(app, admin_client, url, content): + """Test invalid date format doesn't crash page.""" + resp = admin_client.get(url, follow_redirects=True) + + assert resp.status_code == 400 + assert resp.get_data().decode() == content From 6e68016a513de54772e4aeaaa67d184436adc963 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Fri, 22 Apr 2022 14:50:32 +0530 Subject: [PATCH 3/6] Use common parsing helper. --- airflow/www/views.py | 73 +++++++++-------------------------- tests/www/views/test_views.py | 22 +++++------ 2 files changed, 30 insertions(+), 65 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index bf6944db5f5f0..f3c79f66a2550 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -187,19 +187,13 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): """Get Execution Data, Base Date & Number of runs from a Request""" date_time = www_request.args.get('execution_date') if date_time: - try: - date_time = timezone.parse(date_time) - except ParserError: - return {"error": "Invalid execution_date"} + date_time = _safe_parse_datetime(date_time) else: date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow() base_date = www_request.args.get('base_date') if base_date: - try: - base_date = timezone.parse(base_date) - except ParserError: - return {"error": "Invalid base_date"} + base_date = _safe_parse_datetime(base_date) else: # The DateTimeField widget truncates milliseconds and would loose # the first dag run. Round to next second. @@ -248,6 +242,14 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): } +def _safe_parse_datetime(v: str): + """Parse datetime and return error message for invalid dates""" + try: + return timezone.parse(v) + except ParserError: + abort(400, f"Invalid datetime: {v!r}") + + def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session): """ Create a nested dict representation of this TaskGroup and its children used to construct @@ -1262,12 +1264,7 @@ def rendered_templates(self, session): task_id = request.args.get('task_id') map_index = request.args.get('map_index', -1, type=int) execution_date = request.args.get('execution_date') - - try: - dttm = timezone.parse(execution_date) - except ParserError: - return Response("Invalid execution_date", mimetype="text/plain", status=400) - + dttm = _safe_parse_datetime(execution_date) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') @@ -1368,11 +1365,7 @@ def rendered_k8s(self, session: Session = NEW_SESSION): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - - try: - dttm = timezone.parse(execution_date) - except ParserError: - return Response("Invalid execution_date", mimetype="text/plain", status=400) + dttm = _safe_parse_datetime(execution_date) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') @@ -1524,10 +1517,7 @@ def log(self, session=None): execution_date = request.args.get('execution_date') if execution_date: - try: - dttm = timezone.parse(execution_date) - except ParserError: - return Response("Invalid execution_date", mimetype="text/plain", status=400) + dttm = _safe_parse_datetime(execution_date) else: dttm = None @@ -1577,12 +1567,7 @@ def redirect_to_external_log(self, session=None): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - - try: - dttm = timezone.parse(execution_date) - except ParserError: - return Response("Invalid execution_date", mimetype="text/plain", status=400) - + dttm = _safe_parse_datetime(execution_date) map_index = request.args.get('map_index', -1, type=int) try_number = request.args.get('try_number', 1) @@ -1619,12 +1604,7 @@ def task(self, session): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - - try: - dttm = timezone.parse(execution_date) - except ParserError: - return Response("Invalid execution_date", mimetype="text/plain", status=400) - + dttm = _safe_parse_datetime(execution_date) map_index = request.args.get('map_index', -1, type=int) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') @@ -1756,10 +1736,7 @@ def xcom(self, session=None): # Carrying execution_date through, even though it's irrelevant for # this context execution_date = request.args.get('execution_date') - try: - dttm = timezone.parse(execution_date) - except ParserError: - return Response("Invalid execution_date", mimetype="text/plain", status=400) + dttm = _safe_parse_datetime(execution_date) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') @@ -2878,9 +2855,6 @@ def graph(self, dag_id, session=None): dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) - if "error" in dt_nr_dr_data: - return Response(dt_nr_dr_data["error"], mimetype="text/plain", status=400) - dt_nr_dr_data['arrange'] = arrange dttm = dt_nr_dr_data['dttm'] dag_run = dag.get_dagrun(execution_date=dttm) @@ -2987,10 +2961,7 @@ def duration(self, dag_id, session=None): num_runs = request.args.get('num_runs', default=default_dag_run, type=int) if base_date: - try: - base_date = timezone.parse(base_date) - except ParserError: - return Response("Invalid base_date", mimetype="text/plain", status=400) + base_date = _safe_parse_datetime(base_date) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() @@ -3137,10 +3108,7 @@ def tries(self, dag_id, session=None): num_runs = request.args.get('num_runs', default=default_dag_run, type=int) if base_date: - try: - base_date = timezone.parse(base_date) - except ParserError: - return Response("Invalid base_date", mimetype="text/plain", status=400) + base_date = _safe_parse_datetime(base_date) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() @@ -3230,10 +3198,7 @@ def landing_times(self, dag_id, session=None): num_runs = request.args.get('num_runs', default=default_dag_run, type=int) if base_date: - try: - base_date = timezone.parse(base_date) - except ParserError: - return Response("Invalid base_date", mimetype="text/plain", status=400) + base_date = _safe_parse_datetime(base_date) else: base_date = dag.get_latest_execution_date() or timezone.utcnow() diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index 7ae7cdba806f3..300b23d0323bd 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -380,43 +380,43 @@ def test_get_task_stats_from_query(): [ ( '/rendered-templates?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ( '/log?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ( '/redirect_to_external_log?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ( '/task?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ( 'dags/example_bash_operator/graph?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ( 'dags/example_bash_operator/graph?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ( 'dags/example_bash_operator/duration?base_date=invalid', - "Invalid base_date", + "Invalid datetime: 'invalid'", ), ( 'dags/example_bash_operator/tries?base_date=invalid', - "Invalid base_date", + "Invalid datetime: 'invalid'", ), ( 'dags/example_bash_operator/landing-times?base_date=invalid', - "Invalid base_date", + "Invalid datetime: 'invalid'", ), ( 'dags/example_bash_operator/gantt?execution_date=invalid', - "Invalid execution_date", + "Invalid datetime: 'invalid'", ), ], ) @@ -425,4 +425,4 @@ def test_invalid_dates(app, admin_client, url, content): resp = admin_client.get(url, follow_redirects=True) assert resp.status_code == 400 - assert resp.get_data().decode() == content + assert content in resp.get_data().decode() From 36754a043ddc7645483617a53b6cd4b0c084e747 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Mon, 25 Apr 2022 15:23:07 +0530 Subject: [PATCH 4/6] Add type hint. --- airflow/www/views.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index f3c79f66a2550..8f6aa7df4cee7 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -242,11 +242,11 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): } -def _safe_parse_datetime(v: str): +def _safe_parse_datetime(v): """Parse datetime and return error message for invalid dates""" try: return timezone.parse(v) - except ParserError: + except (TypeError, ParserError): abort(400, f"Invalid datetime: {v!r}") @@ -2079,12 +2079,7 @@ def clear(self): map_indexes = request.form.getlist('map_index', type=int) execution_date = request.form.get('execution_date') - - try: - execution_date = timezone.parse(execution_date) - except (TypeError, ParserError): - return Response("Invalid execution_date", mimetype="text/plain", status=400) - + execution_date = _safe_parse_datetime(execution_date) confirmed = request.form.get('confirmed') == "true" upstream = request.form.get('upstream') == "true" downstream = request.form.get('downstream') == "true" @@ -2615,7 +2610,7 @@ def grid(self, dag_id, session=None): num_runs = conf.getint('webserver', 'default_dag_run_display_number') try: - base_date = timezone.parse(request.args["base_date"]) + base_date = _safe_parse_datetime(request.args["base_date"]) except (KeyError, ValueError): base_date = dag.get_latest_execution_date() or timezone.utcnow() @@ -3491,10 +3486,7 @@ def task_instances(self): dttm = request.args.get('execution_date') if dttm: - try: - dttm = timezone.parse(dttm) - except ParserError: - return "Error: Invalid execution_date" + dttm = _safe_parse_datetime(dttm) else: response = jsonify({'error': f"Invalid execution_date {dttm}"}) response.status_code = 400 From 2f48271500cc8a4700efe87a57b6546a2b7c4b2c Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Mon, 25 Apr 2022 15:33:54 +0530 Subject: [PATCH 5/6] Remove unwanted error check. --- airflow/www/views.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 8f6aa7df4cee7..14a572ad3f2f4 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3314,10 +3314,6 @@ def gantt(self, dag_id, session=None): wwwutils.check_import_errors(dag.fileloc, session) dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) - - if "error" in dt_nr_dr_data: - return Response(dt_nr_dr_data["error"], mimetype="text/plain", status=400) - dttm = dt_nr_dr_data['dttm'] dag_run = dag.get_dagrun(execution_date=dttm) dag_run_id = dag_run.run_id if dag_run else None From a8df8ffd2fd3ffc49263e22e829bb642197d19b4 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Mon, 25 Apr 2022 19:53:32 +0530 Subject: [PATCH 6/6] Fix extra_links endpoint. --- airflow/www/views.py | 2 +- tests/www/views/test_views.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 14a572ad3f2f4..f89dda378da58 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3427,7 +3427,7 @@ def extra_links(self, session: "Session" = NEW_SESSION): map_index = request.args.get('map_index', -1, type=int) execution_date = request.args.get('execution_date') link_name = request.args.get('link_name') - dttm = timezone.parse(execution_date) + dttm = _safe_parse_datetime(execution_date) dag = current_app.dag_bag.get_dag(dag_id) if not dag or task_id not in dag.task_ids: diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index 300b23d0323bd..887bd4898a0a6 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -418,6 +418,10 @@ def test_get_task_stats_from_query(): 'dags/example_bash_operator/gantt?execution_date=invalid', "Invalid datetime: 'invalid'", ), + ( + 'extra_links?execution_date=invalid', + "Invalid datetime: 'invalid'", + ), ], ) def test_invalid_dates(app, admin_client, url, content):