Commit

fix imports, mark private members
mikealfare committed Oct 30, 2024
1 parent f5182bc commit 5378259
Showing 1 changed file with 46 additions and 48 deletions.
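The diff below does two things: it regroups the imports into the usual stdlib / third-party / dbt_common / dbt.adapters ordering, and it prefixes module-level names (the logger, the write-disposition constants, the create_schema macro name, the adapter config dataclass) with an underscore, Python's convention for marking them private to the module. A minimal, self-contained sketch of that convention follows, using stdlib logging as a stand-in for the adapter's AdapterLogger; the names are illustrative, not the file's actual code.

import logging

# Hypothetical module illustrating the underscore convention; not the adapter's code.
_logger = logging.getLogger("BigQuery")       # was a public module name: logger
_WRITE_APPEND = "WRITE_APPEND"                # module-internal constant
_CREATE_SCHEMA_MACRO_NAME = "create_schema"   # module-internal constant


def create_schema(relation: str) -> None:
    # Underscored names stay fully usable inside the module; the prefix only signals
    # (and, for `from module import *`, enforces) that they are not public API.
    _logger.debug("running %s for %s", _CREATE_SCHEMA_MACRO_NAME, relation)


create_schema("my_project.my_dataset")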
94 changes: 46 additions & 48 deletions dbt/adapters/bigquery/impl.py
@@ -1,27 +1,41 @@
from dataclasses import dataclass
from datetime import datetime
import json
import threading
from multiprocessing.context import SpawnContext

import threading
import time
from typing import (
Any,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Type,
Set,
Union,
FrozenSet,
Tuple,
Iterable,
TYPE_CHECKING,
Type,
Union,
)

from dbt.adapters.contracts.relation import RelationConfig
import google.api_core
import google.auth
import google.cloud.bigquery
from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
import google.cloud.exceptions
import google.oauth2
import pytz

from dbt_common.contracts.constraints import (
ColumnLevelConstraint,
ConstraintType,
ModelLevelConstraint,
)
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.functions import fire_event
import dbt_common.exceptions
import dbt_common.exceptions.base
from dbt_common.utils import filter_null_values
from dbt.adapters.base import (
AdapterConfig,
BaseAdapter,
@@ -37,33 +51,18 @@
from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.contracts.macros import MacroResolverProtocol
from dbt_common.contracts.constraints import (
ColumnLevelConstraint,
ConstraintType,
ModelLevelConstraint,
)
from dbt_common.dataclass_schema import dbtClassMixin
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.events.logging import AdapterLogger
from dbt_common.events.functions import fire_event
from dbt.adapters.events.types import SchemaCreation, SchemaDrop
import dbt_common.exceptions
from dbt_common.utils import filter_null_values
import google.api_core
import google.auth
import google.oauth2
import google.cloud.bigquery
from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
import google.cloud.exceptions
import pytz

from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
from dbt.adapters.bigquery.column import get_nested_column_data_types
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery.connections import (
BigQueryAdapterResponse,
ClusterDataprocHelper,
ServerlessDataProcHelper,
)
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery.relation_configs import (
BigQueryBaseRelationConfig,
@@ -77,13 +76,18 @@
# Used by mypy for earlier type hints.
import agate

logger = AdapterLogger("BigQuery")

_logger = AdapterLogger("BigQuery")


# Write dispositions for bigquery.
WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND
WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE
_WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND
_WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE


_CREATE_SCHEMA_MACRO_NAME = "create_schema"


CREATE_SCHEMA_MACRO_NAME = "create_schema"
_dataset_lock = threading.Lock()


@@ -96,14 +100,8 @@ def render(self):
return f"{self.project}.{self.dataset}"


def _stub_relation(*args, **kwargs):
return BigQueryRelation.create(
database="", schema="", identifier="", quote_policy={}, type=BigQueryRelation.Table
)


@dataclass
class BigqueryConfig(AdapterConfig):
class _BigqueryConfig(AdapterConfig):
cluster_by: Optional[Union[List[str], str]] = None
partition_by: Optional[Dict[str, Any]] = None
kms_key_name: Optional[str] = None
@@ -133,7 +131,7 @@ class BigQueryAdapter(BaseAdapter):
Column = BigQueryColumn
ConnectionManager = BigQueryConnectionManager

AdapterSpecificConfigs = BigqueryConfig
AdapterSpecificConfigs = _BigqueryConfig

CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
@@ -246,7 +244,7 @@ def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
return self._get_dbt_columns_from_bq_table(table)

except (ValueError, google.cloud.exceptions.NotFound) as e:
logger.debug("get_columns_in_relation error: {}".format(e))
_logger.debug("get_columns_in_relation error: {}".format(e))
return []

@available.parse(lambda *a, **k: [])
@@ -303,7 +301,7 @@ def list_relations_without_caching(
except google.api_core.exceptions.NotFound:
return []
except google.api_core.exceptions.Forbidden as exc:
logger.debug("list_relations_without_caching error: {}".format(str(exc)))
_logger.debug("list_relations_without_caching error: {}".format(str(exc)))
return []

def get_relation(
@@ -333,7 +331,7 @@ def create_schema(self, relation: BigQueryRelation) -> None:
kwargs = {
"relation": relation,
}
self.execute_macro(CREATE_SCHEMA_MACRO_NAME, kwargs=kwargs)
self.execute_macro(_CREATE_SCHEMA_MACRO_NAME, kwargs=kwargs)
self.commit_if_has_connection()
# we can't update the cache here, as if the schema already existed we
# don't want to (incorrectly) say that it's empty
@@ -342,7 +340,7 @@ def drop_schema(self, relation: BigQueryRelation) -> None:
# still use a client method, rather than SQL 'drop schema ... cascade'
database = relation.database
schema = relation.schema
logger.debug('Dropping schema "{}.{}".', database, schema) # in lieu of SQL
_logger.debug('Dropping schema "{}.{}".', database, schema) # in lieu of SQL
fire_event(SchemaDrop(relation=_make_ref_key_dict(relation)))
self.connections.drop_dataset(database, schema)
self.cache.drop_schema(database, schema)
@@ -420,9 +418,9 @@ def _agate_to_schema(
@available.parse(lambda *a, **k: "")
def copy_table(self, source, destination, materialization):
if materialization == "incremental":
write_disposition = WRITE_APPEND
write_disposition = _WRITE_APPEND
elif materialization == "table":
write_disposition = WRITE_TRUNCATE
write_disposition = _WRITE_TRUNCATE
else:
raise dbt_common.exceptions.CompilationError(
'Copy table materialization must be "copy" or "table", but '
@@ -458,7 +456,7 @@ def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
return self._get_dbt_columns_from_bq_table(query_table)

except (ValueError, google.cloud.exceptions.NotFound) as e:
logger.debug("get_columns_in_select_sql error: {}".format(e))
_logger.debug("get_columns_in_select_sql error: {}".format(e))
return []

@classmethod
@@ -492,7 +490,7 @@ def _bq_table_to_relation(self, bq_table) -> Union[BigQueryRelation, None]:
@classmethod
def warning_on_hooks(cls, hook_type):
msg = "{} is not supported in bigquery and will be ignored"
logger.info(msg)
_logger.info(msg)

@available
def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False):
@@ -659,7 +657,7 @@ def update_table_description(

@available.parse_none
def alter_table_add_columns(self, relation, columns):
logger.debug('Adding columns ({}) to table {}".'.format(columns, relation))
_logger.debug('Adding columns ({}) to table {}".'.format(columns, relation))

conn = self.connections.get_thread_connection()
client = conn.handle
@@ -747,7 +745,7 @@ def _get_catalog_schemas(self, relation_config: Iterable[RelationConfig]) -> Sch
if candidate.schema in db_schemas[database]:
result[candidate] = schemas
else:
logger.debug(
_logger.debug(
"Skipping catalog for {}.{} - schema does not exist".format(
database, candidate.schema
)
@@ -867,7 +865,7 @@ def grant_access_to(self, entity, entity_type, role, grant_target_dict) -> None:
access_entry = AccessEntry(role, entity_type, entity)
# only perform update if access entry not in dataset
if is_access_entry_in_dataset(dataset, access_entry):
logger.warning(f"Access entry {access_entry} " f"already exists in dataset")
_logger.warning(f"Access entry {access_entry} " f"already exists in dataset")
else:
dataset = add_access_entry_to_dataset(dataset, access_entry)
client.update_dataset(dataset, ["access_entries"])
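For context on the write dispositions renamed above (_WRITE_APPEND / _WRITE_TRUNCATE), here is a small sketch of how those google.cloud.bigquery dispositions typically drive a copy job. It illustrates the underlying client API rather than the adapter's own copy_table path, and the project, dataset, and table names are placeholders.

from google.cloud import bigquery


def copy_for_materialization(materialization: str) -> None:
    # Mirrors the adapter's mapping: "incremental" appends, "table" truncates.
    if materialization == "incremental":
        write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    elif materialization == "table":
        write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    else:
        raise ValueError(f"unsupported materialization: {materialization}")

    client = bigquery.Client()  # uses application-default credentials
    job = client.copy_table(
        "my-project.my_dataset.source_table",       # placeholder source
        "my-project.my_dataset.destination_table",  # placeholder destination
        job_config=bigquery.CopyJobConfig(write_disposition=write_disposition),
    )
    job.result()  # block until the copy job finishes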
