From 0b8f111b7ed0be30ef842b4814495d1838f85feb Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 19 Sep 2024 15:47:04 +0100 Subject: [PATCH 01/13] first pass: microbatch --- .../materializations/incremental/strategies.sql | 3 +++ .../incremental_strategies/test_microbatch.py | 17 +++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 tests/functional/adapter/incremental_strategies/test_microbatch.py diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index eeb920493..019dad452 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -75,6 +75,9 @@ {%- elif strategy == 'insert_overwrite' -%} {#-- insert statements don't like CTEs, so support them via a temp view #} {{ get_insert_overwrite_sql(source, target, existing) }} + {%- elif strategy == 'microbatch' -%} + {#-- insert statements don't like CTEs, so support them via a temp view #} + {{ get_insert_overwrite_sql(source, target, existing) }} {%- elif strategy == 'merge' -%} {#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #} {{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }} diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py new file mode 100644 index 000000000..7806b0299 --- /dev/null +++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py @@ -0,0 +1,17 @@ +import pytest + +from dbt.tests.adapter.incremental.test_incremental_microbatch import ( + BaseMicrobatch, +) + +# No requirement for a unique_id for spark microbatch! +_microbatch_model_no_unique_id_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + + +class TestMicrobatch(BaseMicrobatch): + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + return _microbatch_model_no_unique_id_sql From 82d98de13a36b9af58bcf1df38dd41e4680ae8b6 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 23 Sep 2024 13:19:04 +0100 Subject: [PATCH 02/13] make microbatch valid strategy --- .../spark/macros/materializations/incremental/validate.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql index 0d4c4d8b6..4a1ac9943 100644 --- a/dbt/include/spark/macros/materializations/incremental/validate.sql +++ b/dbt/include/spark/macros/materializations/incremental/validate.sql @@ -21,7 +21,7 @@ {% set invalid_strategy_msg -%} Invalid incremental strategy provided: {{ raw_strategy }} - Expected one of: 'append', 'merge', 'insert_overwrite' + Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch' {%- endset %} {% set invalid_merge_msg -%} @@ -35,13 +35,13 @@ Use the 'append' or 'merge' strategy instead {%- endset %} - {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %} + {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {%-else %} {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %} {% do exceptions.raise_compiler_error(invalid_merge_msg) %} {% endif %} - {% if raw_strategy == 'insert_overwrite' and target.endpoint %} + {% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %} {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} {% endif %} {% endif %} From 8da7bdaf137ad990682b7b74b511fbcf950308d7 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 21:16:48 +0100 Subject: [PATCH 03/13] skip microbatch testing on profiles that don't support insert_overwrite --- .../adapter/incremental_strategies/test_microbatch.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py index 7806b0299..6974d6b9b 100644 --- a/tests/functional/adapter/incremental_strategies/test_microbatch.py +++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py @@ -11,6 +11,9 @@ """ +@pytest.mark.skip_profile( + "databricks_http_cluster", "databricks_sql_endpoint", "spark_session", "spark_http_odbc" +) class TestMicrobatch(BaseMicrobatch): @pytest.fixture(scope="class") def microbatch_model_sql(self) -> str: From 1f0d48ddbd4f8b3fb35a1a3b4c90062410e57de8 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 21:19:28 +0100 Subject: [PATCH 04/13] set overwrite mode for microbatch --- .../spark/macros/materializations/incremental/incremental.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql index 9a66bab51..77bfc59c9 100644 --- a/dbt/include/spark/macros/materializations/incremental/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -24,7 +24,7 @@ {%- endif -%} {#-- Set Overwrite Mode --#} - {%- if strategy == 'insert_overwrite' and partition_by -%} + {%- if strategy in ['insert_overwrite', 'microbatch'] and partition_by -%} {%- call statement() -%} set spark.sql.sources.partitionOverwriteMode = DYNAMIC {%- endcall -%} From 55015490aeea3e924ae2e924c2dd25f1eeaad2c5 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 21:37:07 +0100 Subject: [PATCH 05/13] add begin config + create date_day --- .../adapter/incremental_strategies/test_microbatch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py index 6974d6b9b..3cc366142 100644 --- a/tests/functional/adapter/incremental_strategies/test_microbatch.py +++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py @@ -6,8 +6,9 @@ # No requirement for a unique_id for spark microbatch! _microbatch_model_no_unique_id_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }} -select * from {{ ref('input_model') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select *, cast(event_time as date) as date_day +from {{ ref('input_model') }} """ From 0231ec79e37602881517b0b506bf3ce66d40f6df Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 21:40:32 +0100 Subject: [PATCH 06/13] temporarily run only microbatch tests in CI --- dagger/run_dbt_spark_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dagger/run_dbt_spark_tests.py b/dagger/run_dbt_spark_tests.py index 67fa56587..16192541d 100644 --- a/dagger/run_dbt_spark_tests.py +++ b/dagger/run_dbt_spark_tests.py @@ -150,6 +150,8 @@ async def test_spark(test_args): tst_container = tst_container.with_(env_variables(TESTING_ENV_VARS)) test_path = test_args.test_path if test_args.test_path else "tests/functional/adapter" + # TODO: remove before merging! + test_path = "tests/functional/adapter/incremental_strategies/test_microbatch.py" result = await tst_container.with_exec( ["pytest", "-v", "--profile", test_profile, "-n", "auto", test_path] ).stdout() From 3c30100e9821cd2d8993229a6d15a887c4db6a7a Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 21:47:49 +0100 Subject: [PATCH 07/13] partition_by date_day in microbatch model --- CONTRIBUTING.md | 4 ++-- .../adapter/incremental_strategies/test_microbatch.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6fcaacea8..7185cef3e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -70,14 +70,14 @@ To run functional tests we rely on [dagger](https://dagger.io/). This launches a ```sh pip install -r dagger/requirements.txt -python dagger/run_dbt_spark_tests.py --profile databricks_sql_endpoint --test-path tests/functional/adapter/test_basic.py::TestSimpleMaterializationsSpark::test_base -``` +python dagger/run_dbt_spark_tests.py --profile apache_spark --test-path tests/functional/adapter/incremental_strategies/test_microbatch.py `--profile`: required, this is the kind of spark connection to test against _options_: - "apache_spark" - "spark_session" + - "spark_http_odbc" - "databricks_sql_endpoint" - "databricks_cluster" - "databricks_http_cluster" diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py index 3cc366142..060033efa 100644 --- a/tests/functional/adapter/incremental_strategies/test_microbatch.py +++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py @@ -6,7 +6,7 @@ # No requirement for a unique_id for spark microbatch! _microbatch_model_no_unique_id_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), partition_by=['date_day']) }} select *, cast(event_time as date) as date_day from {{ ref('input_model') }} """ From 287531c482c96b1729d279311da71b4573df857c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 23:18:27 +0100 Subject: [PATCH 08/13] test using parquet --- .../adapter/incremental_strategies/test_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py index 060033efa..088b35baf 100644 --- a/tests/functional/adapter/incremental_strategies/test_microbatch.py +++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py @@ -6,7 +6,7 @@ # No requirement for a unique_id for spark microbatch! _microbatch_model_no_unique_id_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), partition_by=['date_day']) }} +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), partition_by=['date_day'], file_format='parquet') }} select *, cast(event_time as date) as date_day from {{ ref('input_model') }} """ From 835cb475a7fcc135301601d6d1a4515f07b27cf9 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 23:40:53 +0100 Subject: [PATCH 09/13] partition_by required for dbt-spark microbatch --- .../spark/macros/materializations/incremental/strategies.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index 019dad452..bbb3a571b 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -76,7 +76,10 @@ {#-- insert statements don't like CTEs, so support them via a temp view #} {{ get_insert_overwrite_sql(source, target, existing) }} {%- elif strategy == 'microbatch' -%} - {#-- insert statements don't like CTEs, so support them via a temp view #} + {#-- microbatch wraps insert_overwrite, and requires a partition_by config #} + {%- if not config.get('partition_by') -%} + {{ exceptions.raise_compiler_error("dbt-spark 'microbatch' requires a `partition_by` config") }} + {%- endif -%} {{ get_insert_overwrite_sql(source, target, existing) }} {%- elif strategy == 'merge' -%} {#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #} From 846eb3f6400874657e0c1c88e895771055face19 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 25 Sep 2024 12:52:52 +0100 Subject: [PATCH 10/13] changelog entry --- .changes/unreleased/Features-20240925-125242.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240925-125242.yaml diff --git a/.changes/unreleased/Features-20240925-125242.yaml b/.changes/unreleased/Features-20240925-125242.yaml new file mode 100644 index 000000000..03a6ca73d --- /dev/null +++ b/.changes/unreleased/Features-20240925-125242.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Microbatch Strategy +time: 2024-09-25T12:52:42.872017+01:00 +custom: + Author: michelleark + Issue: "1109" From 3a9d7ced2ca972891030d7aa8e07fa2d062ed311 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 25 Sep 2024 14:29:37 +0100 Subject: [PATCH 11/13] improve partition_by missing error message --- .../macros/materializations/incremental/strategies.sql | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index bbb3a571b..28ba8284e 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -77,8 +77,14 @@ {{ get_insert_overwrite_sql(source, target, existing) }} {%- elif strategy == 'microbatch' -%} {#-- microbatch wraps insert_overwrite, and requires a partition_by config #} + {% set missing_partition_key_microbatch_msg -%} + dbt-spark 'microbatch' incremental strategy requires a `partition_by` config. + Ensure you are using a `partition_by` column that is of grain {{ config.get('batch_size') }} + for microbatch model {{ model.name }}. + {%- endset %} + {%- if not config.get('partition_by') -%} - {{ exceptions.raise_compiler_error("dbt-spark 'microbatch' requires a `partition_by` config") }} + {{ exceptions.raise_compiler_error(missing_partition_key_microbatch_msg) }} {%- endif -%} {{ get_insert_overwrite_sql(source, target, existing) }} {%- elif strategy == 'merge' -%} From 07544fe93ae37f28ba1c146224eae28662bfcd62 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 25 Sep 2024 15:58:42 +0100 Subject: [PATCH 12/13] restore testing setup --- CONTRIBUTING.md | 3 ++- dagger/run_dbt_spark_tests.py | 2 -- .../spark/macros/materializations/incremental/strategies.sql | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7185cef3e..903507b7a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -70,7 +70,8 @@ To run functional tests we rely on [dagger](https://dagger.io/). This launches a ```sh pip install -r dagger/requirements.txt -python dagger/run_dbt_spark_tests.py --profile apache_spark --test-path tests/functional/adapter/incremental_strategies/test_microbatch.py +python dagger/run_dbt_spark_tests.py --profile databricks_sql_endpoint --test-path tests/functional/adapter/test_basic.py::TestSimpleMaterializationsSpark::test_base +``` `--profile`: required, this is the kind of spark connection to test against diff --git a/dagger/run_dbt_spark_tests.py b/dagger/run_dbt_spark_tests.py index 16192541d..67fa56587 100644 --- a/dagger/run_dbt_spark_tests.py +++ b/dagger/run_dbt_spark_tests.py @@ -150,8 +150,6 @@ async def test_spark(test_args): tst_container = tst_container.with_(env_variables(TESTING_ENV_VARS)) test_path = test_args.test_path if test_args.test_path else "tests/functional/adapter" - # TODO: remove before merging! - test_path = "tests/functional/adapter/incremental_strategies/test_microbatch.py" result = await tst_container.with_exec( ["pytest", "-v", "--profile", test_profile, "-n", "auto", test_path] ).stdout() diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index 28ba8284e..4ffead6a0 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -79,8 +79,7 @@ {#-- microbatch wraps insert_overwrite, and requires a partition_by config #} {% set missing_partition_key_microbatch_msg -%} dbt-spark 'microbatch' incremental strategy requires a `partition_by` config. - Ensure you are using a `partition_by` column that is of grain {{ config.get('batch_size') }} - for microbatch model {{ model.name }}. + Ensure you are using a `partition_by` column that is of grain {{ config.get('batch_size') }}. {%- endset %} {%- if not config.get('partition_by') -%} From 4ad1bc6188de5ca0aab5d3b0511c1707be8861bf Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 25 Sep 2024 21:02:43 +0100 Subject: [PATCH 13/13] Update .changes/unreleased/Features-20240925-125242.yaml Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com> --- .changes/unreleased/Features-20240925-125242.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Features-20240925-125242.yaml b/.changes/unreleased/Features-20240925-125242.yaml index 03a6ca73d..1cb51c004 100644 --- a/.changes/unreleased/Features-20240925-125242.yaml +++ b/.changes/unreleased/Features-20240925-125242.yaml @@ -1,5 +1,5 @@ kind: Features -body: Microbatch Strategy +body: Add Microbatch Strategy to dbt-spark time: 2024-09-25T12:52:42.872017+01:00 custom: Author: michelleark