diff --git a/.changes/unreleased/Features-20241016-154952.yaml b/.changes/unreleased/Features-20241016-154952.yaml
new file mode 100644
index 000000000..93328046e
--- /dev/null
+++ b/.changes/unreleased/Features-20241016-154952.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Enables overriding execution project for .py models run via dataproc
+time: 2024-10-16T15:49:52.295605-06:00
+custom:
+  Author: matt-winkler
+  Issue: "1364"
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index d3eee3ef3..cfca3ee16 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -148,6 +148,7 @@ class BigQueryCredentials(Credentials):
 
     dataproc_region: Optional[str] = None
     dataproc_cluster_name: Optional[str] = None
+    dataproc_project: Optional[str] = None
     gcs_bucket: Optional[str] = None
 
     dataproc_batch: Optional[DataprocBatchConfig] = field(
@@ -213,6 +214,7 @@ def _connection_keys(self):
             "dataproc_cluster_name",
             "gcs_bucket",
             "dataproc_batch",
+            "dataproc_project",
         )
 
     @classmethod
@@ -228,6 +230,10 @@ def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]:
         # `execution_project` default to dataset/project
         if "execution_project" not in d:
             d["execution_project"] = d["database"]
+        # if no dataproc_project default to execution_project
+        if "dataproc_project" not in d:
+            d["dataproc_project"] = d["execution_project"]
+
         return d
 
 
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 368ed9d07..bb48c6c8a 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -36,6 +36,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
         self.parsed_model = parsed_model
         python_required_configs = [
             "dataproc_region",
+            "dataproc_project",
             "gcs_bucket",
         ]
         for required_config in python_required_configs:
@@ -107,7 +108,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         }
         operation = self.job_client.submit_job_as_operation(
             request={
-                "project_id": self.credential.execution_project,
+                "project_id": self.credential.dataproc_project,
                 "region": self.credential.dataproc_region,
                 "job": job,
             }
diff --git a/tests/functional/python_model_tests/test_override_dataproc_project.py b/tests/functional/python_model_tests/test_override_dataproc_project.py
new file mode 100644
index 000000000..43cd0deb5
--- /dev/null
+++ b/tests/functional/python_model_tests/test_override_dataproc_project.py
@@ -0,0 +1,24 @@
+import pytest
+
+from tests.unit.test_bigquery_adapter import BaseTestBigQueryAdapter
+
+
+# Test that the `dataproc_project` profile setting overrides the project used
+# for python model submission (it defaults to `execution_project` otherwise).
+# This reuses the machinery from BaseTestBigQueryAdapter to get hold of the
+# parsed credentials.
+class TestOverrideDataprocProject(BaseTestBigQueryAdapter):
+    @pytest.fixture(scope="class")
+    def model_path(self):
+        return "models"
+
+    def test_override_dataproc_project(self):
+        # dataproc_project is a top-level credential field, so set it at the
+        # top level of the target (NOT inside the dataproc_batch sub-config,
+        # which BigQueryCredentials never reads for this key)
+        self.raw_profile["outputs"]["dataproc-serverless-configured"][
+            "dataproc_project"
+        ] = "test"
+        adapter = self.get_adapter("dataproc-serverless-configured")
+        # the parsed credentials should pick up the override
+        credentials = adapter.acquire_connection("dummy").credentials
+        assert credentials.dataproc_project == "test"