Skip to content

Commit

Permalink
Merge branch 'main' into 1414-microbatch-copy
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Dec 5, 2024
2 parents d5435d3 + 4d255b2 commit 5455196
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 9 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241204-105846.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Cast `event_time` to a timestamp prior to comparing against microbatch start/end
time
time: 2024-12-04T10:58:46.573608-05:00
custom:
Author: michelleark
Issue: "1422"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241205-133606.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix issue where rate limit errors on table service calls are not retried
time: 2024-12-05T13:36:06.436005-05:00
custom:
Author: mikealfare
Issue: "1423"
5 changes: 1 addition & 4 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,7 @@ def get_bq_table(self, database, schema, identifier) -> Table:
# backwards compatibility: fill in with defaults if not specified
database = database or conn.credentials.database
schema = schema or conn.credentials.schema
return client.get_table(
table=self.table_ref(database, schema, identifier),
retry=self._retry.create_reopen_with_deadline(conn),
)
return client.get_table(self.table_ref(database, schema, identifier))

def drop_dataset(self, database, schema) -> None:
conn = self.get_thread_connection()
Expand Down
25 changes: 24 additions & 1 deletion dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

from dbt_common.exceptions import CompilationError
from dbt_common.utils.dict import filter_null_values
from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
from dbt.adapters.base.relation import (
BaseRelation,
ComponentName,
InformationSchema,
EventTimeFilter,
)
from dbt.adapters.contracts.relation import RelationConfig, RelationType
from dbt.adapters.relation_configs import RelationConfigChangeAction

Expand Down Expand Up @@ -116,6 +121,24 @@ def materialized_view_config_changeset(
def information_schema(self, identifier: Optional[str] = None) -> "BigQueryInformationSchema":
return BigQueryInformationSchema.from_relation(self, identifier)

def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str:
"""
Returns "" if start and end are both None
"""
filter = ""
if event_time_filter.start and event_time_filter.end:
filter = f"cast({event_time_filter.field_name} as timestamp) >= '{event_time_filter.start}' and cast({event_time_filter.field_name} as timestamp) < '{event_time_filter.end}'"
elif event_time_filter.start:
filter = (
f"cast({event_time_filter.field_name} as timestamp) >= '{event_time_filter.start}'"
)
elif event_time_filter.end:
filter = (
f"cast({event_time_filter.field_name} as timestamp) < '{event_time_filter.end}'"
)

return filter


@dataclass(frozen=True, eq=False, repr=False)
class BigQueryInformationSchema(InformationSchema):
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ RUN apt-get update \
build-essential=12.9 \
ca-certificates=20210119 \
git=1:2.30.2-1+deb11u2 \
libpq-dev=13.14-0+deb11u1 \
libpq-dev=13.18-0+deb11u1 \
make=4.3-4.1 \
openssh-client=1:8.4p1-5+deb11u3 \
software-properties-common=0.96.20.2-2.1 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@
begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
)
}}
select * from {{ ref('input_model') }}
select id, cast(event_time as timestamp) as event_time from {{ ref('input_model') }}
"""

microbatch_input_sql = """
Expand All @@ -582,6 +582,24 @@
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

microbatch_input_event_time_date_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, DATE '2020-01-01' as event_time
union all
select 2 as id, DATE '2020-01-02' as event_time
union all
select 3 as id, DATE '2020-01-03' as event_time
"""

microbatch_input_event_time_datetime_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, DATETIME '2020-01-01' as event_time
union all
select 2 as id, DATETIME '2020-01-02' as event_time
union all
select 3 as id, DATETIME '2020-01-03' as event_time
"""

microbatch_model_no_partition_by_sql = """
{{ config(
materialized='incremental',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
microbatch_model_no_partition_by_sql,
microbatch_model_invalid_partition_by_sql,
microbatch_model_no_unique_id_copy_partitions_sql,
microbatch_input_event_time_date_sql,
microbatch_input_event_time_datetime_sql,
)


Expand All @@ -23,6 +25,32 @@ def microbatch_model_sql(self) -> str:
return microbatch_model_no_unique_id_sql


class TestBigQueryMicrobatchInputWithDate(TestBigQueryMicrobatch):
@pytest.fixture(scope="class")
def input_model_sql(self) -> str:
return microbatch_input_event_time_date_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, DATE '2020-01-04'), (5, DATE '2020-01-05')"


class TestBigQueryMicrobatchInputWithDatetime(TestBigQueryMicrobatch):
@pytest.fixture(scope="class")
def input_model_sql(self) -> str:
return microbatch_input_event_time_datetime_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, DATETIME '2020-01-04'), (5, DATETIME '2020-01-05')"


class TestBigQueryMicrobatchMissingPartitionBy:
@pytest.fixture(scope="class")
def models(self) -> str:
Expand All @@ -31,7 +59,6 @@ def models(self) -> str:
"input_model.sql": microbatch_input_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_execution_failure_no_partition_by(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, stdout = run_dbt_and_capture(["run"], expect_pass=False)
Expand All @@ -46,7 +73,6 @@ def models(self) -> str:
"input_model.sql": microbatch_input_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_execution_failure_no_partition_by(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, stdout = run_dbt_and_capture(["run"], expect_pass=False)
Expand Down

0 comments on commit 5455196

Please sign in to comment.