Commit

Merge branch 'main' of github.com:apache/iceberg-python into fd-join
Fokko committed Feb 21, 2025
2 parents 8326db4 + 68a08b1 commit 73a7fe0
Showing 5 changed files with 122 additions and 12 deletions.
14 changes: 14 additions & 0 deletions pyiceberg/expressions/__init__.py
@@ -64,6 +64,20 @@ class BooleanExpression(ABC):
def __invert__(self) -> BooleanExpression:
"""Transform the Expression into its negated version."""

def __and__(self, other: BooleanExpression) -> BooleanExpression:
"""Perform and operation on another expression."""
if not isinstance(other, BooleanExpression):
raise ValueError(f"Expected BooleanExpression, got: {other}")

return And(self, other)

def __or__(self, other: BooleanExpression) -> BooleanExpression:
"""Perform or operation on another expression."""
if not isinstance(other, BooleanExpression):
raise ValueError(f"Expected BooleanExpression, got: {other}")

return Or(self, other)


class Term(Generic[L], ABC):
"""A simple expression that evaluates to a value."""
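With `__and__` and `__or__` in place, compound predicates can be composed with Python's `&` and `|` operators instead of nesting the `And(...)` and `Or(...)` constructors. A minimal sketch of the sugar in use, built only from pyiceberg's public expression classes:

```python
from pyiceberg.expressions import And, IsNaN, IsNull, Or, Reference

null = IsNull(Reference("a"))
nan = IsNaN(Reference("b"))

# The operators build the same expression tree as the explicit constructors
assert (null & nan) == And(null, nan)
assert (null | nan) == Or(null, nan)

# Non-expression operands are rejected eagerly
try:
    null & "abc"  # type: ignore[operator]
except ValueError as exc:
    print(exc)  # Expected BooleanExpression, got: abc
```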
7 changes: 6 additions & 1 deletion pyiceberg/table/__init__.py
@@ -579,7 +579,9 @@ def overwrite(
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

-self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
+if overwrite_filter != AlwaysFalse():
+    # Only delete when the filter is != AlwaysFalse
+    self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

with self._append_snapshot_producer(snapshot_properties) as append_files:
# skip writing data files if the dataframe is empty
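The practical effect of the guard above: an overwrite whose filter is `AlwaysFalse()` now degenerates to a plain append instead of producing a needless delete snapshot. A hedged sketch of the calling pattern, assuming an existing table handle `tbl` and a PyArrow table `df`:

```python
from pyiceberg.expressions import AlwaysFalse, EqualTo

# Default filter is AlwaysTrue: delete everything, then append
tbl.overwrite(df)

# Partial overwrite: delete only the matching rows, then append
tbl.overwrite(df, overwrite_filter=EqualTo("city", "Drachten"))

# AlwaysFalse matches nothing, so the delete step is now skipped entirely
tbl.overwrite(df, overwrite_filter=AlwaysFalse())
```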
@@ -1159,6 +1161,9 @@ def upsert(
else:
raise ValueError(f"Field-ID could not be found: {join_cols}")

if len(join_cols) == 0:
raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")

if not when_matched_update_all and not when_not_matched_insert_all:
raise ValueError("no upsert options selected...exiting")

4 changes: 4 additions & 0 deletions pyiceberg/table/upsert_util.py
@@ -69,6 +69,10 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
if has_duplicate_rows(target_table, join_cols):
raise ValueError("Target table has duplicate rows, aborting upsert")

if len(target_table) == 0:
# When the target table is empty, there is nothing to update :)
return source_table.schema.empty_table()

diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])

return (
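How the comparison above works: source and target rows are matched on the key columns, the non-key columns carry `-lhs`/`-rhs` suffixes, and `diff_expr` ORs together one inequality per non-key column, so a row qualifies for update as soon as any non-key value differs. A standalone sketch of that expression construction (the column names are illustrative):

```python
import functools
import operator

import pyarrow.compute as pc

non_key_cols = ["population", "country"]  # hypothetical non-key columns

# One inequality per non-key column, OR-ed into a single filter expression
diff_expr = functools.reduce(
    operator.or_,
    [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols],
)
print(diff_expr)
```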
14 changes: 14 additions & 0 deletions tests/expressions/test_expressions.py
@@ -698,21 +698,35 @@ def test_and() -> None:
null = IsNull(Reference("a"))
nan = IsNaN(Reference("b"))
and_ = And(null, nan)

# Some syntactic sugar
assert and_ == null & nan

assert str(and_) == f"And(left={str(null)}, right={str(nan)})"
assert repr(and_) == f"And(left={repr(null)}, right={repr(nan)})"
assert and_ == eval(repr(and_))
assert and_ == pickle.loads(pickle.dumps(and_))

with pytest.raises(ValueError, match="Expected BooleanExpression, got: abc"):
null & "abc" # type: ignore


def test_or() -> None:
null = IsNull(Reference("a"))
nan = IsNaN(Reference("b"))
or_ = Or(null, nan)

# Some syntactic sugar
assert or_ == null | nan

assert str(or_) == f"Or(left={str(null)}, right={str(nan)})"
assert repr(or_) == f"Or(left={repr(null)}, right={repr(nan)})"
assert or_ == eval(repr(or_))
assert or_ == pickle.loads(pickle.dumps(or_))

with pytest.raises(ValueError, match="Expected BooleanExpression, got: abc"):
null | "abc" # type: ignore


def test_not() -> None:
null = IsNull(Reference("a"))
95 changes: 84 additions & 11 deletions tests/table/test_upsert.py
@@ -25,6 +25,7 @@
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import And, EqualTo, Reference
from pyiceberg.expressions.literals import LongLiteral
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.table import UpsertResult
from pyiceberg.table.upsert_util import create_match_filter
@@ -328,7 +329,7 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:

schema = Schema(
NestedField(1, "city", StringType(), required=True),
NestedField(2, "inhabitants", IntegerType(), required=True),
NestedField(2, "population", IntegerType(), required=True),
# Mark City as the identifier field, also known as the primary-key
identifier_field_ids=[1],
)
@@ -338,30 +339,30 @@
arrow_schema = pa.schema(
[
pa.field("city", pa.string(), nullable=False),
pa.field("inhabitants", pa.int32(), nullable=False),
pa.field("population", pa.int32(), nullable=False),
]
)

# Write some data
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "inhabitants": 921402},
{"city": "San Francisco", "inhabitants": 808988},
{"city": "Drachten", "inhabitants": 45019},
{"city": "Paris", "inhabitants": 2103000},
{"city": "Amsterdam", "population": 921402},
{"city": "San Francisco", "population": 808988},
{"city": "Drachten", "population": 45019},
{"city": "Paris", "population": 2103000},
],
schema=arrow_schema,
)
tbl.append(df)

df = pa.Table.from_pylist(
[
-# Will be updated, the inhabitants has been updated
-{"city": "Drachten", "inhabitants": 45505},
+# Will be updated, the population has been updated
+{"city": "Drachten", "population": 45505},
# New row, will be inserted
-{"city": "Berlin", "inhabitants": 3432000},
+{"city": "Berlin", "population": 3432000},
# Ignored, already exists in the table
-{"city": "Paris", "inhabitants": 2103000},
+{"city": "Paris", "population": 2103000},
],
schema=arrow_schema,
)
@@ -371,6 +372,42 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
assert upd.rows_inserted == 1


def test_upsert_into_empty_table(catalog: Catalog) -> None:
identifier = "default.test_upsert_into_empty_table"
_drop_table(catalog, identifier)

schema = Schema(
NestedField(1, "city", StringType(), required=True),
NestedField(2, "inhabitants", IntegerType(), required=True),
# Mark City as the identifier field, also known as the primary-key
identifier_field_ids=[1],
)

tbl = catalog.create_table(identifier, schema=schema)

arrow_schema = pa.schema(
[
pa.field("city", pa.string(), nullable=False),
pa.field("inhabitants", pa.int32(), nullable=False),
]
)

# Write some data
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "inhabitants": 921402},
{"city": "San Francisco", "inhabitants": 808988},
{"city": "Drachten", "inhabitants": 45019},
{"city": "Paris", "inhabitants": 2103000},
],
schema=arrow_schema,
)
upd = tbl.upsert(df)

assert upd.rows_updated == 0
assert upd.rows_inserted == 4


def test_create_match_filter_single_condition() -> None:
"""
Test create_match_filter with a composite key where the source yields exactly one unique key.
@@ -392,9 +429,10 @@ def test_create_match_filter_single_condition() -> None:

def test_upsert_with_duplicate_rows_in_table(catalog: Catalog) -> None:
identifier = "default.test_upsert_with_duplicate_rows_in_table"
-_drop_table(catalog, identifier)
+
+_drop_table(catalog, identifier)
schema = Schema(
NestedField(1, "city", StringType(), required=True),
NestedField(2, "inhabitants", IntegerType(), required=True),
# Mark City as the identifier field, also known as the primary-key
@@ -430,3 +468,38 @@ def test_upsert_with_duplicate_rows_in_table(catalog: Catalog) -> None:

with pytest.raises(ValueError, match="Target table has duplicate rows, aborting upsert"):
_ = tbl.upsert(df)
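The duplicate check exercised here lives in `upsert_util.has_duplicate_rows`. A minimal sketch of an equivalent check under the same contract (the real helper's implementation may differ):

```python
import pyarrow as pa

def has_duplicate_rows(df: pa.Table, join_cols: list[str]) -> bool:
    # If grouping by the key columns yields fewer rows than the table has,
    # at least one key combination occurs more than once
    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
    return len(unique_keys) < len(df)
```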


def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
identifier = "default.test_upsert_without_identifier_fields"

_drop_table(catalog, identifier)

schema = Schema(
NestedField(1, "city", StringType(), required=True),
NestedField(2, "population", IntegerType(), required=True),
# No identifier field :o
identifier_field_ids=[],
)

tbl = catalog.create_table(identifier, schema=schema)
# Write some data
df = pa.Table.from_pylist(
[
{"city": "Amsterdam", "population": 921402},
{"city": "San Francisco", "population": 808988},
{"city": "Drachten", "population": 45019},
{"city": "Paris", "population": 2103000},
],
schema=schema_to_pyarrow(schema),
)

with pytest.raises(
ValueError, match="Join columns could not be found, please set identifier-field-ids or pass in explicitly."
):
tbl.upsert(df)
