From 906495f95b4f1fea4a38ed33692afa512ab73e01 Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Fri, 6 Sep 2024 10:59:10 -0400
Subject: [PATCH 1/5] poc: microbatch using merge

---
 .../macros/materializations/incremental.sql   |  4 +-
 .../incremental_strategy/microbatch.sql       |  9 +++
 dev-requirements.txt                          |  4 +-
 .../test_incremental_microbatch.py            | 77 +++++++++++++++++++
 4 files changed, 90 insertions(+), 4 deletions(-)
 create mode 100644 dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
 create mode 100644 tests/functional/adapter/incremental/test_incremental_microbatch.py

diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index 3908bedc2..93499d008 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -4,9 +4,9 @@

   {% set invalid_strategy_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
-    Expected one of: 'merge', 'insert_overwrite'
+    Expected one of: 'merge', 'insert_overwrite', 'microbatch'
   {%- endset %}

-  {% if strategy not in ['merge', 'insert_overwrite'] %}
+  {% if strategy not in ['merge', 'insert_overwrite', 'microbatch'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {% endif %}

diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
new file mode 100644
index 000000000..8511f23ca
--- /dev/null
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
@@ -0,0 +1,9 @@
+{% macro get_incremental_microbatch_sql(arg_dict) %}
+
+  {% if arg_dict["unique_key"] %}
+    {% do return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) %}
+  {% else %}
+    {{ exceptions.raise_compiler_error("dbt-bigquery 'microbatch' requires a `unique_key` config") }}
+  {% endif %}
+
+{% endmacro %}
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 34169172a..c5eba53d4 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,8 +1,8 @@
 # install latest changes in dbt-core
-git+https://github.com/dbt-labs/dbt-adapters.git
+git+https://github.com/dbt-labs/dbt-adapters.git@event-time-ref-filtering
 git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
 git+https://github.com/dbt-labs/dbt-common.git
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git@event-time-ref-filtering#egg=dbt-core&subdirectory=core

 # dev
 ddtrace==2.3.0
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
new file mode 100644
index 000000000..69861bf40
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -0,0 +1,77 @@
+import os
+from unittest import mock
+
+import pytest
+from freezegun import freeze_time
+
+from dbt.tests.util import relation_from_name, run_dbt
+
+input_model_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+
+select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
+union all
+select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
+union all
+select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
+"""
+
+microbatch_model_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
+select * from {{ ref('input_model') }}
+"""
+
+
+class TestMicroBatchBoundsDefault:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "input_model.sql": input_model_sql,
+            "microbatch_model.sql": microbatch_model_sql,
+        }
+
+    def assert_row_count(self, project, relation_name: str, expected_row_count: int):
+        relation = relation_from_name(project.adapter, relation_name)
+        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
+
+        if result[0] != expected_row_count:
+            # running show for debugging
+            run_dbt(["show", "--inline", f"select * from {relation}"])
+
+        assert result[0] == expected_row_count
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_run_with_event_time(self, project):
+        # initial run -- backfills all data
+        with freeze_time("2020-01-01 13:57:00"):
+            run_dbt(["run"])
+        self.assert_row_count(project, "microbatch_model", 3)
+
+        # our batch granularity is "day", so re-running at a later time with no new data should produce the same results
+        with freeze_time("2020-01-03 14:57:00"):
+            run_dbt(["run"])
+        self.assert_row_count(project, "microbatch_model", 3)
+
+        # add next two days of data
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        project.run_sql(
+            f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')"
+        )
+        self.assert_row_count(project, "input_model", 5)
+
+        # re-run without changing current time => no insert
+        with freeze_time("2020-01-03 14:57:00"):
+            run_dbt(["run", "--select", "microbatch_model"])
+        self.assert_row_count(project, "microbatch_model", 3)
+
+        # re-run after advancing the current time by one day => insert 1 row
+        with freeze_time("2020-01-04 14:57:00"):
+            run_dbt(["run", "--select", "microbatch_model"])
+        self.assert_row_count(project, "microbatch_model", 4)
+
+        # re-run after advancing the current time by one more day => insert 1 more row
+        with freeze_time("2020-01-05 14:57:00"):
+            run_dbt(["run", "--select", "microbatch_model"])
+        self.assert_row_count(project, "microbatch_model", 5)

From c1f58057c28d8f99dba0a0afb4ccb0aa8efde21c Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Tue, 24 Sep 2024 17:47:24 +0100
Subject: [PATCH 2/5] update base tests

---
 dev-requirements.txt                          |  4 +-
 .../test_incremental_microbatch.py            | 77 +++----------------
 2 files changed, 11 insertions(+), 70 deletions(-)

diff --git a/dev-requirements.txt b/dev-requirements.txt
index c5eba53d4..34169172a 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,8 +1,8 @@
 # install latest changes in dbt-core
-git+https://github.com/dbt-labs/dbt-adapters.git@event-time-ref-filtering
+git+https://github.com/dbt-labs/dbt-adapters.git
 git+https://github.com/dbt-labs/dbt-adapters.git#subdirectory=dbt-tests-adapter
 git+https://github.com/dbt-labs/dbt-common.git
-git+https://github.com/dbt-labs/dbt-core.git@event-time-ref-filtering#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core

 # dev
 ddtrace==2.3.0
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
index 69861bf40..daf6f656a 100644
--- a/tests/functional/adapter/incremental/test_incremental_microbatch.py
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -1,77 +1,18 @@
-import os
-from unittest import mock
-
 import pytest
-from freezegun import freeze_time
 
-from dbt.tests.util import relation_from_name, run_dbt
-
-input_model_sql = """
-{{ config(materialized='table', event_time='event_time') }}
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)

-select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
-union all
-select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
-union all
-select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
-"""

-microbatch_model_sql = """
-{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
+# TODO: No requirement for a unique_key for bigquery microbatch!
+_microbatch_model_no_unique_id_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
 select * from {{ ref('input_model') }}
 """


-class TestMicroBatchBoundsDefault:
+class TestBigQueryMicrobatch(BaseMicrobatch):
     @pytest.fixture(scope="class")
-    def models(self):
-        return {
-            "input_model.sql": input_model_sql,
-            "microbatch_model.sql": microbatch_model_sql,
-        }
-
-    def assert_row_count(self, project, relation_name: str, expected_row_count: int):
-        relation = relation_from_name(project.adapter, relation_name)
-        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
-
-        if result[0] != expected_row_count:
-            # running show for debugging
-            run_dbt(["show", "--inline", f"select * from {relation}"])
-
-        assert result[0] == expected_row_count
-
-    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
-    def test_run_with_event_time(self, project):
-        # initial run -- backfills all data
-        with freeze_time("2020-01-01 13:57:00"):
-            run_dbt(["run"])
-        self.assert_row_count(project, "microbatch_model", 3)
-
-        # our batch granularity is "day", so re-running at a later time with no new data should produce the same results
-        with freeze_time("2020-01-03 14:57:00"):
-            run_dbt(["run"])
-        self.assert_row_count(project, "microbatch_model", 3)
-
-        # add next two days of data
-        test_schema_relation = project.adapter.Relation.create(
-            database=project.database, schema=project.test_schema
-        )
-        project.run_sql(
-            f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')"
-        )
-        self.assert_row_count(project, "input_model", 5)
-
-        # re-run without changing current time => no insert
-        with freeze_time("2020-01-03 14:57:00"):
-            run_dbt(["run", "--select", "microbatch_model"])
-        self.assert_row_count(project, "microbatch_model", 3)
-
-        # re-run after advancing the current time by one day => insert 1 row
-        with freeze_time("2020-01-04 14:57:00"):
-            run_dbt(["run", "--select", "microbatch_model"])
-        self.assert_row_count(project, "microbatch_model", 4)
-
-        # re-run after advancing the current time by one more day => insert 1 more row
-        with freeze_time("2020-01-05 14:57:00"):
-            run_dbt(["run", "--select", "microbatch_model"])
-        self.assert_row_count(project, "microbatch_model", 5)
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql

From 6fa4f41478cd230dc8d39efa2a1b80d48872f804 Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Wed, 25 Sep 2024 18:14:30 +0100
Subject: [PATCH 3/5] use dynamic insert_overwrite under the hood for bigquery

---
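Notes for reviewers (this section is ignored by `git am`): with this change, each
microbatch batch runs through the existing dynamic insert_overwrite code path
instead of a merge on `unique_key`. As a rough sketch of what a single batch
compiles to -- the exact SQL is generated by `bq_insert_overwrite_sql`, and the
project/dataset/column names below are illustrative only:

    declare dbt_partitions_for_replacement array<timestamp>;

    -- land the batch's rows (already filtered to the batch's event_time window)
    -- in a temp table
    create or replace table `my-project`.`my_dataset`.`microbatch_model__dbt_tmp` as (
      select * from `my-project`.`my_dataset`.`input_model`
      where event_time >= timestamp '2020-01-02 00:00:00+00'
        and event_time < timestamp '2020-01-03 00:00:00+00'
    );

    -- collect the distinct day partitions present in this batch
    set (dbt_partitions_for_replacement) = (
      select as struct array_agg(distinct timestamp_trunc(event_time, day))
      from `my-project`.`my_dataset`.`microbatch_model__dbt_tmp`
    );

    -- replace exactly those partitions in the target via merge
    merge into `my-project`.`my_dataset`.`microbatch_model` as DBT_INTERNAL_DEST
    using (
      select * from `my-project`.`my_dataset`.`microbatch_model__dbt_tmp`
    ) as DBT_INTERNAL_SOURCE
    on FALSE
    when not matched by source
      and timestamp_trunc(DBT_INTERNAL_DEST.event_time, day) in unnest(dbt_partitions_for_replacement)
      then delete
    when not matched then
      insert (`id`, `event_time`) values (`id`, `event_time`);

This is also why the strategy requires a `partition_by` config whose granularity
matches `batch_size`: the partitions being overwritten must line up one-to-one
with the event_time window of each batch.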
 .../macros/materializations/incremental.sql   |  7 ++++-
 .../incremental_strategy/microbatch.sql       | 30 +++++++++++++++----
 .../test_incremental_microbatch.py            | 15 ++++++++--
 3 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index 93499d008..fed1a680d 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -48,8 +48,13 @@
         tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
     ) %}

-  {% else %} {# strategy == 'merge' #}
+  {% elif strategy == 'microbatch' %}
+    {% set build_sql = bq_generate_microbatch_build_sql(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+    ) %}
+
+  {% else %} {# strategy == 'merge' #}
     {% set build_sql = bq_generate_incremental_merge_build_sql(
         tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_predicates
     ) %}

diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
index 8511f23ca..4f3b5dba5 100644
--- a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
@@ -1,9 +1,27 @@
-{% macro get_incremental_microbatch_sql(arg_dict) %}
+{% macro bq_generate_microbatch_build_sql(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+  ) %}
+  {% if partition_by is none %}
+    {% set missing_partition_msg -%}
+      The 'microbatch' strategy requires the `partition_by` config.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+  {% endif %}

-  {% if arg_dict["unique_key"] %}
-    {% do return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) %}
-  {% else %}
-    {{ exceptions.raise_compiler_error("dbt-bigquery 'microbatch' requires a `unique_key` config") }}
-  {% endif %}
+  {% if partition_by.granularity != config.get('batch_size') %}
+    {% set invalid_partition_by_granularity_msg -%}
+      The 'microbatch' strategy requires the `partition_by` config with the same granularity
+      as its configured `batch_size`. Got:
+        `batch_size`: {{ config.get('batch_size') }}
+        `partition_by.granularity`: {{ partition_by.granularity }}
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
+  {% endif %}
+
+  {% set build_sql = bq_insert_overwrite_sql(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+  ) %}
+
+  {{ return(build_sql) }}

 {% endmacro %}
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
index daf6f656a..eb5125a42 100644
--- a/tests/functional/adapter/incremental/test_incremental_microbatch.py
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -5,9 +5,20 @@
 )


-# TODO: No requirement for a unique_key for bigquery microbatch!
 _microbatch_model_no_unique_id_sql = """
-{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    partition_by={
+        "field": "event_time",
+        "data_type": "timestamp",
+        "granularity": "day"
+    },
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
 select * from {{ ref('input_model') }}
 """

From c6707edc1f5ad7657b3f3e7915071fbd8449cd53 Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Wed, 25 Sep 2024 23:22:43 +0100
Subject: [PATCH 4/5] changelog entry

---
 .changes/unreleased/Features-20240925-232238.yaml | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 .changes/unreleased/Features-20240925-232238.yaml

diff --git a/.changes/unreleased/Features-20240925-232238.yaml b/.changes/unreleased/Features-20240925-232238.yaml
new file mode 100644
index 000000000..903884196
--- /dev/null
+++ b/.changes/unreleased/Features-20240925-232238.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add Microbatch Strategy to dbt-bigquery
+time: 2024-09-25T23:22:38.216277+01:00
+custom:
+  Author: michelleark
+  Issue: "1354"

From 01cece32cab772c04266653f9706e00f8cce0f00 Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Thu, 26 Sep 2024 00:27:57 +0100
Subject: [PATCH 5/5] clean up validation + add testing

---
 .../macros/materializations/incremental.sql   |  4 ++
 .../incremental_strategy/microbatch.sql       | 37 +++++------
 .../incremental_strategy_fixtures.py          | 56 +++++++++++++++++
 .../test_incremental_microbatch.py            | 62 +++++++++++++------
 4 files changed, 123 insertions(+), 36 deletions(-)

diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index fed1a680d..935280d63 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -10,6 +10,10 @@
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {% endif %}

+  {% if strategy == 'microbatch' %}
+    {% do bq_validate_microbatch_config(config) %}
+  {% endif %}
+
   {% do return(strategy) %}

 {% endmacro %}
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
index 4f3b5dba5..d4c4b7453 100644
--- a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
@@ -1,27 +1,28 @@
+{% macro bq_validate_microbatch_config(config) %}
+  {% if config.get("partition_by") is none %}
+    {% set missing_partition_msg -%}
+      The 'microbatch' strategy requires a `partition_by` config.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+  {% endif %}
+
+  {% if config.get("partition_by").granularity != config.get('batch_size') %}
+    {% set invalid_partition_by_granularity_msg -%}
+      The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.
+      Got:
+        `batch_size`: {{ config.get('batch_size') }}
+        `partition_by.granularity`: {{ config.get("partition_by").granularity }}
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
+  {% endif %}
+{% endmacro %}
+
 {% macro bq_generate_microbatch_build_sql(
       tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
   ) %}
-  {% if partition_by is none %}
-    {% set missing_partition_msg -%}
-      The 'microbatch' strategy requires the `partition_by` config.
-    {%- endset %}
-    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
-  {% endif %}
-
-  {% if partition_by.granularity != config.get('batch_size') %}
-    {% set invalid_partition_by_granularity_msg -%}
-      The 'microbatch' strategy requires the `partition_by` config with the same granularity
-      as its configured `batch_size`. Got:
-        `batch_size`: {{ config.get('batch_size') }}
-        `partition_by.granularity`: {{ partition_by.granularity }}
-    {%- endset %}
-    {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
-  {% endif %}
-
   {% set build_sql = bq_insert_overwrite_sql(
       tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
   ) %}

   {{ return(build_sql) }}
-
 {% endmacro %}
diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
index 17391b48d..02efbb6c2 100644
--- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
+++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py
@@ -555,3 +555,59 @@
 select * from data
 """.lstrip()
+
+microbatch_model_no_unique_id_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    partition_by={
+        'field': 'event_time',
+        'data_type': 'timestamp',
+        'granularity': 'day'
+    },
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
+
+microbatch_input_sql = """
+{{ config(materialized='table', event_time='event_time') }}
+select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
+union all
+select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
+union all
+select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
+"""
+
+microbatch_model_no_partition_by_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
+
+
+microbatch_model_invalid_partition_by_sql = """
+{{ config(
+    materialized='incremental',
+    incremental_strategy='microbatch',
+    event_time='event_time',
+    batch_size='day',
+    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
+    partition_by={
+        'field': 'event_time',
+        'data_type': 'timestamp',
+        'granularity': 'hour'
+    }
+    )
+}}
+select * from {{ ref('input_model') }}
+"""
diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py
index eb5125a42..d1bbbcea3 100644
--- a/tests/functional/adapter/incremental/test_incremental_microbatch.py
+++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py
@@ -1,29 +1,55 @@
+import os
 import pytest
+from unittest import mock
+from dbt.tests.util import run_dbt_and_capture

 from dbt.tests.adapter.incremental.test_incremental_microbatch import (
     BaseMicrobatch,
+    patch_microbatch_end_time,
 )
-
-_microbatch_model_no_unique_id_sql = """
-{{ config(
-    materialized='incremental',
-    incremental_strategy='microbatch',
-    partition_by={
-        "field": "event_time",
-        "data_type": "timestamp",
-        "granularity": "day"
-    },
-    event_time='event_time',
-    batch_size='day',
-    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
-    )
-}}
-select * from {{ ref('input_model') }}
-"""
+from tests.functional.adapter.incremental.incremental_strategy_fixtures import (
+    microbatch_model_no_unique_id_sql,
+    microbatch_input_sql,
+    microbatch_model_no_partition_by_sql,
+    microbatch_model_invalid_partition_by_sql,
+)


 class TestBigQueryMicrobatch(BaseMicrobatch):
     @pytest.fixture(scope="class")
     def microbatch_model_sql(self) -> str:
-        return _microbatch_model_no_unique_id_sql
+        return microbatch_model_no_unique_id_sql
+
+
+class TestBigQueryMicrobatchMissingPartitionBy:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "microbatch.sql": microbatch_model_no_partition_by_sql,
+            "input_model.sql": microbatch_input_sql,
+        }
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_execution_failure_no_partition_by(self, project):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
+        assert "The 'microbatch' strategy requires a `partition_by` config" in stdout
+
+
+class TestBigQueryMicrobatchInvalidPartitionByGranularity:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "microbatch.sql": microbatch_model_invalid_partition_by_sql,
+            "input_model.sql": microbatch_input_sql,
+        }
+
+    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
+    def test_execution_failure_invalid_partition_by_granularity(self, project):
+        with patch_microbatch_end_time("2020-01-03 13:57:00"):
+            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
+        assert (
+            "The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`"
+            in stdout
+        )
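
--

Testing notes (illustrative, not part of the diffs above): the functional tests
can be run with a standard pytest invocation, assuming BigQuery credentials are
already configured for this repo's functional test suite:

    python -m pytest tests/functional/adapter/incremental/test_incremental_microbatch.py

The DBT_EXPERIMENTAL_MICROBATCH flag is set inside the tests via
mock.patch.dict, so no extra environment setup is needed for the 'microbatch'
strategy to be selected.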