Skip to content

Commit

Permalink
fix: to_gbq uses default_type for ambiguous array types and struct field types (#838)
Browse files Browse the repository at this point in the history

* fix: `to_gbq` uses `default_type` for ambiguous array types and struct field types

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix arrow list(null) case too

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* lint

* Update pandas_gbq/schema/pandas_to_bigquery.py

Co-authored-by: Chalmer Lowe <[email protected]>

* Update pandas_gbq/schema/pandas_to_bigquery.py

Co-authored-by: Chalmer Lowe <[email protected]>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* remove redundant string check

* Apply suggestions from code review

Co-authored-by: Chalmer Lowe <[email protected]>

* add docstrings and a few more test cases

* use python 3.10 for docs github action

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Chalmer Lowe <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 5484a8c commit cf1aadd
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.9"
python-version: "3.10"
- name: Install nox
run: |
python -m pip install --upgrade setuptools pip wheel
Expand Down
1 change: 1 addition & 0 deletions owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"noxfile.py",
"README.rst",
# exclude this file as we have an alternate prerelease.cfg
".github/workflows/docs.yml",
".kokoro/presubmit/prerelease-deps.cfg",
".kokoro/presubmit/presubmit.cfg",
],
Expand Down
111 changes: 92 additions & 19 deletions pandas_gbq/schema/pandas_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import collections.abc
import datetime
from typing import Optional, Tuple
from typing import Any, Optional, Tuple
import warnings

import db_dtypes
Expand All @@ -28,14 +28,21 @@
# `docs/source/writing.rst`.
_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"boolean": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
"datetime64[us, UTC]": "TIMESTAMP",
"datetime64[ns]": "DATETIME",
"datetime64[us]": "DATETIME",
"float32": "FLOAT",
"float64": "FLOAT",
"int8": "INTEGER",
"int16": "INTEGER",
"int32": "INTEGER",
"int64": "INTEGER",
"Int8": "INTEGER",
"Int16": "INTEGER",
"Int32": "INTEGER",
"Int64": "INTEGER",
"uint8": "INTEGER",
"uint16": "INTEGER",
"uint32": "INTEGER",
Expand Down Expand Up @@ -103,7 +110,7 @@ def dataframe_to_bigquery_fields(

# Try to automatically determine the type based on a few rows of the data.
values = dataframe.reset_index()[column]
bq_field = values_to_bigquery_field(column, values)
bq_field = values_to_bigquery_field(column, values, default_type=default_type)

if bq_field:
bq_schema_out.append(bq_field)
Expand All @@ -114,7 +121,9 @@ def dataframe_to_bigquery_fields(
arrow_value = pyarrow.array(values)
bq_field = (
pandas_gbq.schema.pyarrow_to_bigquery.arrow_type_to_bigquery_field(
column, arrow_value.type
column,
arrow_value.type,
default_type=default_type,
)
)

Expand Down Expand Up @@ -151,6 +160,19 @@ def dataframe_to_bigquery_fields(


def dtype_to_bigquery_field(name, dtype) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from a pandas dtype.
Args:
name (str):
Name of the column/field.
dtype:
A pandas / numpy dtype object.
Returns:
Optional[schema.SchemaField]:
The schema field, or None if a type cannot be inferred, such as if
it is ambiguous like the object dtype.
"""
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)

if bq_type is not None:
Expand All @@ -164,9 +186,44 @@ def dtype_to_bigquery_field(name, dtype) -> Optional[schema.SchemaField]:
return None


def value_to_bigquery_field(name, value) -> Optional[schema.SchemaField]:
if isinstance(value, str):
return schema.SchemaField(name, "STRING")
def value_to_bigquery_field(
name: str, value: Any, default_type: Optional[str] = None
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from a single value.
Args:
name:
The name of the field.
value:
The value to infer the type from. If None, the default type is used
if available.
default_type:
The default field type. Defaults to None.
Returns:
The schema field, or None if a type cannot be inferred.
"""

# Set the SchemaField datatype to the given default_type if the value
# being assessed is None.
if value is None:
return schema.SchemaField(name, default_type)

# Map from Python types to BigQuery types. This isn't super exhaustive
# because we rely more on pyarrow, which can check more than one value to
# determine the type.
type_mapping = {
str: "STRING",
}

# geopandas and shapely are optional dependencies, so only check if those
# are installed.
if _BaseGeometry is not None:
type_mapping[_BaseGeometry] = "GEOGRAPHY"

for type_, bq_type in type_mapping.items():
if isinstance(value, type_):
return schema.SchemaField(name, bq_type)

# For timezone-naive datetimes, the later pyarrow conversion that tries to
# learn the type adds a timezone to such datetimes, causing them to be
Expand All @@ -182,35 +239,51 @@ def value_to_bigquery_field(name, value) -> Optional[schema.SchemaField]:
else:
return schema.SchemaField(name, "DATETIME")

if _BaseGeometry is not None and isinstance(value, _BaseGeometry):
return schema.SchemaField(name, "GEOGRAPHY")

return None


def values_to_bigquery_field(name, values) -> Optional[schema.SchemaField]:
def values_to_bigquery_field(
name: str, values: Any, default_type: str = "STRING"
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from a list of values.
This function iterates through the given values to determine the
corresponding schema field type.
Args:
name:
The name of the field.
values:
An iterable of values to infer the type from. If all the values
are None or the iterable is empty, the function returns None.
default_type:
The default field type to use if a specific type cannot be
determined from the values. Defaults to "STRING".
Returns:
The schema field, or None if a type cannot be inferred.
"""
value = pandas_gbq.core.pandas.first_valid(values)

# All NULL, type not determinable.
# All values came back as NULL, thus type not determinable by this method.
# Return None so we can try other methods.
if value is None:
return None

field = value_to_bigquery_field(name, value)
if field is not None:
field = value_to_bigquery_field(name, value, default_type=default_type)
if field:
return field

if isinstance(value, str):
return schema.SchemaField(name, "STRING")

# Check plain ARRAY values here. Let STRUCT get determined by pyarrow,
# which can examine more values to determine all keys.
# Check plain ARRAY values here. Exclude mapping types to let STRUCT get
# determined by pyarrow, which can examine more values to determine all
# keys.
if isinstance(value, collections.abc.Iterable) and not isinstance(
value, collections.abc.Mapping
):
# It could be that this value contains all None or is empty, so get the
# first non-None value we can find.
valid_item = pandas_gbq.core.pandas.first_array_valid(values)
field = value_to_bigquery_field(name, valid_item)
field = value_to_bigquery_field(name, valid_item, default_type=default_type)

if field is not None:
return schema.SchemaField(name, field.field_type, mode="REPEATED")
Expand Down
61 changes: 56 additions & 5 deletions pandas_gbq/schema/pyarrow_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,31 @@
}


def arrow_type_to_bigquery_field(name, type_) -> Optional[schema.SchemaField]:
def arrow_type_to_bigquery_field(
name, type_, default_type="STRING"
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from an arrow type.
Args:
name (str):
Name of the column/field.
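type_:
A pyarrow type object.
default_type (str):
BigQuery field type to use when ``type_`` (or a nested list value /
struct field type) is the arrow null type. Defaults to "STRING".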
Returns:
Optional[schema.SchemaField]:
The schema field, or None if a type cannot be inferred, such as if
it is a type that doesn't have a clear mapping in BigQuery.
Fields whose arrow type is null() are assumed to be the
``default_type``, since there are no values that contradict that.
"""
# If a sub-field is the null type, then assume it's the default type, as
# that's the best we can do.
# https://github.com/googleapis/python-bigquery-pandas/issues/836
if pyarrow.types.is_null(type_):
return schema.SchemaField(name, default_type)

# Since both TIMESTAMP/DATETIME use pyarrow.timestamp(...), we need to use
# a special case to disambiguate them. See:
# https://github.com/googleapis/python-bigquery-pandas/issues/450
Expand All @@ -52,22 +76,49 @@ def arrow_type_to_bigquery_field(name, type_) -> Optional[schema.SchemaField]:
return schema.SchemaField(name, detected_type)

if pyarrow.types.is_list(type_):
return arrow_list_type_to_bigquery(name, type_)
return arrow_list_type_to_bigquery(name, type_, default_type=default_type)

if pyarrow.types.is_struct(type_):
inner_fields: list[pyarrow.Field] = []
struct_type = cast(pyarrow.StructType, type_)
for field_index in range(struct_type.num_fields):
field = struct_type[field_index]
inner_fields.append(arrow_type_to_bigquery_field(field.name, field.type))
inner_fields.append(
arrow_type_to_bigquery_field(
field.name, field.type, default_type=default_type
)
)

return schema.SchemaField(name, "RECORD", fields=inner_fields)

return None


def arrow_list_type_to_bigquery(name, type_) -> Optional[schema.SchemaField]:
inner_field = arrow_type_to_bigquery_field(name, type_.value_type)
def arrow_list_type_to_bigquery(
name, type_, default_type="STRING"
) -> Optional[schema.SchemaField]:
"""Infers the BigQuery schema field type from an arrow list type.
Args:
name (str):
Name of the column/field.
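type_:
A pyarrow type object.
default_type (str):
BigQuery field type passed through to
``arrow_type_to_bigquery_field`` when resolving the list's value
type. Defaults to "STRING".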
Returns:
Optional[schema.SchemaField]:
The schema field, or None if a type cannot be inferred, such as if
it is a type that doesn't have a clear mapping in BigQuery.
Value types that are null() are assumed to be the ``default_type``,
since there are no values that contradict that.
"""
inner_field = arrow_type_to_bigquery_field(
name, type_.value_type, default_type=default_type
)

# If this is None, it means we got some type that we can't cleanly map to
# a BigQuery type, so bubble that status up.
if inner_field is None:
return None

Expand Down
49 changes: 40 additions & 9 deletions tests/unit/schema/test_pandas_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,34 @@ def module_under_test():
def test_dataframe_to_bigquery_fields_w_named_index(module_under_test):
df_data = collections.OrderedDict(
[
("str_index", ["a", "b"]),
("str_column", ["hello", "world"]),
("int_column", [42, 8]),
("nullable_int_column", pandas.Series([42, None], dtype="Int64")),
("uint_column", pandas.Series([7, 13], dtype="uint8")),
("bool_column", [True, False]),
("boolean_column", pandas.Series([True, None], dtype="boolean")),
(
"datetime_column",
[
datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
datetime.datetime(2000, 1, 1, 0, 0, 0),
],
),
(
"timestamp_column",
[
datetime.datetime(
1999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc
),
datetime.datetime(
2000, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
],
),
]
)
index = pandas.Index(["a", "b"], name="str_index")
dataframe = pandas.DataFrame(df_data, index=index)
dataframe = pandas.DataFrame(df_data).set_index("str_index", drop=True)

returned_schema = module_under_test.dataframe_to_bigquery_fields(
dataframe, [], index=True
Expand All @@ -37,27 +58,37 @@ def test_dataframe_to_bigquery_fields_w_named_index(module_under_test):
schema.SchemaField("str_index", "STRING", "NULLABLE"),
schema.SchemaField("str_column", "STRING", "NULLABLE"),
schema.SchemaField("int_column", "INTEGER", "NULLABLE"),
schema.SchemaField("nullable_int_column", "INTEGER", "NULLABLE"),
schema.SchemaField("uint_column", "INTEGER", "NULLABLE"),
schema.SchemaField("bool_column", "BOOLEAN", "NULLABLE"),
schema.SchemaField("boolean_column", "BOOLEAN", "NULLABLE"),
schema.SchemaField("datetime_column", "DATETIME", "NULLABLE"),
schema.SchemaField("timestamp_column", "TIMESTAMP", "NULLABLE"),
)
assert returned_schema == expected_schema


def test_dataframe_to_bigquery_fields_w_multiindex(module_under_test):
df_data = collections.OrderedDict(
[
("str_index", ["a", "a"]),
("int_index", [0, 0]),
(
"dt_index",
[
datetime.datetime(1999, 12, 31, 23, 59, 59, 999999),
datetime.datetime(2000, 1, 1, 0, 0, 0),
],
),
("str_column", ["hello", "world"]),
("int_column", [42, 8]),
("bool_column", [True, False]),
]
)
index = pandas.MultiIndex.from_tuples(
[
("a", 0, datetime.datetime(1999, 12, 31, 23, 59, 59, 999999)),
("a", 0, datetime.datetime(2000, 1, 1, 0, 0, 0)),
],
names=["str_index", "int_index", "dt_index"],
dataframe = pandas.DataFrame(df_data).set_index(
["str_index", "int_index", "dt_index"],
drop=True,
)
dataframe = pandas.DataFrame(df_data, index=index)

returned_schema = module_under_test.dataframe_to_bigquery_fields(
dataframe, [], index=True
Expand Down
18 changes: 8 additions & 10 deletions tests/unit/schema/test_pyarrow_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ def test_arrow_type_to_bigquery_field_scalar_types(pyarrow_type, bigquery_type):


def test_arrow_type_to_bigquery_field_unknown():
assert (
pyarrow_to_bigquery.arrow_type_to_bigquery_field("test_name", pyarrow.null())
is None
)
assert pyarrow_to_bigquery.arrow_type_to_bigquery_field(
"test_name", pyarrow.null(), default_type="DEFAULT_TYPE"
) == bigquery.SchemaField("test_name", "DEFAULT_TYPE")


def test_arrow_type_to_bigquery_field_list_of_unknown():
assert (
pyarrow_to_bigquery.arrow_type_to_bigquery_field(
"test_name", pyarrow.list_(pyarrow.null())
)
is None
)
assert pyarrow_to_bigquery.arrow_type_to_bigquery_field(
"test_name",
pyarrow.list_(pyarrow.null()),
default_type="DEFAULT_TYPE",
) == bigquery.SchemaField("test_name", "DEFAULT_TYPE", mode="REPEATED")
Loading

0 comments on commit cf1aadd

Please sign in to comment.