Feature: MERGE/Upsert Support #1534

Closed · wants to merge 54 commits

Changes from 38 commits

Commits (54)
fccb74b
test
mattmartin14 Jan 14, 2025
7298589
unit testing
mattmartin14 Jan 14, 2025
25bc9cf
adding unit tests
mattmartin14 Jan 14, 2025
af6c868
adding unit tests
mattmartin14 Jan 14, 2025
94be807
adding unit tests
mattmartin14 Jan 14, 2025
269d9f5
adding unit tests
mattmartin14 Jan 15, 2025
f44c61a
adding unit tests
mattmartin14 Jan 15, 2025
a96fdf9
finished unit tests
mattmartin14 Jan 16, 2025
fa5ab35
removed unnecessary return
mattmartin14 Jan 16, 2025
cfa2277
updated poetry manifest list for datafusion package dependency
mattmartin14 Jan 17, 2025
35f29be
added license headers, cleaned up dead code
mattmartin14 Jan 22, 2025
6c68d0d
updated the merge function to use bools for matched and not matched rows
mattmartin14 Jan 29, 2025
2d1e8ae
incorporated changes for boolExpression. It simplified the filters a lot
mattmartin14 Jan 31, 2025
f988f25
moved the filter build function to a separate function to accommodate …
mattmartin14 Jan 31, 2025
43393b4
removed unnecessary comment
mattmartin14 Jan 31, 2025
9a561b4
removed test files
mattmartin14 Jan 31, 2025
9ef39a6
bug fixes and removed some more dependency on datafusion
mattmartin14 Feb 3, 2025
2ba1ed6
updated various items including adding a dataclass return result
mattmartin14 Feb 4, 2025
a42eecd
updated merge_rows to remove dependency from datafusion! wahoo
mattmartin14 Feb 4, 2025
1305f58
renamed merge_rows to upsert, removed unnecessary code. will put in f…
mattmartin14 Feb 5, 2025
b2be3db
adding params to unit testing for pytest; having some errors
mattmartin14 Feb 5, 2025
f5688ad
fixed bugs on unit testing; added context wrapper for txn; fixed vari…
mattmartin14 Feb 5, 2025
7d55a4e
bug fixes
mattmartin14 Feb 6, 2025
2e14767
updated some error throwing items
mattmartin14 Feb 6, 2025
85c5848
moved datafusion to just a dev dependency in poetry toml
mattmartin14 Feb 6, 2025
6472071
updated UpsertRow class to be recognized in the return statement
mattmartin14 Feb 6, 2025
51c34da
removed some spaces and streamlined assert statements in unit testing
mattmartin14 Feb 6, 2025
862a69a
updated test cases to use an InMemory catalog
mattmartin14 Feb 7, 2025
3731b86
updated some formatting; added more commentary on the rows_to_update …
mattmartin14 Feb 7, 2025
bbb35d6
rebased poetry lock file and pyproject.toml file; removed sf repo info
mattmartin14 Feb 10, 2025
c8189c9
Merge branch 'main' into main
mattmartin14 Feb 10, 2025
02af4d4
updated equality checks with not instead of == false
mattmartin14 Feb 10, 2025
cc75192
ran ruff check --fix
mattmartin14 Feb 10, 2025
998d98b
manually added lint fixes and updated poetry toml and lock files. tha…
mattmartin14 Feb 11, 2025
513c839
added formatting fixes
mattmartin14 Feb 11, 2025
0fd6446
remove the node_modules
mattmartin14 Feb 11, 2025
5fc3478
updated code for another round of fixes
mattmartin14 Feb 11, 2025
6cef789
removed unneeded npm files
mattmartin14 Feb 11, 2025
40b69b8
fixed formatting on upsert function for docs build
mattmartin14 Feb 12, 2025
804c526
Merge branch 'main' into main
mattmartin14 Feb 12, 2025
09e0347
rebased for poetry lock files
mattmartin14 Feb 12, 2025
ca2d904
updated lock files. thanks kevin
mattmartin14 Feb 12, 2025
77375fb
fixed other changes
mattmartin14 Feb 12, 2025
ba4db49
fixed gitignore file
mattmartin14 Feb 12, 2025
622e66c
no whitespace
mattmartin14 Feb 12, 2025
9e79dad
fixed vendor fb file from kevin's changes
mattmartin14 Feb 12, 2025
4cbf3e3
reverting vendor changes
mattmartin14 Feb 12, 2025
5333a1e
removing node modules
mattmartin14 Feb 12, 2025
11a25be
updating vendor files
mattmartin14 Feb 12, 2025
03a8d10
Update vendor/fb303/FacebookService.py
mattmartin14 Feb 12, 2025
8a2143c
updated vendor files
mattmartin14 Feb 12, 2025
e719cf8
updated vendor files
mattmartin14 Feb 12, 2025
245b4a9
attempting to update poetry files
mattmartin14 Feb 12, 2025
e3e9611
Merge branch 'main' into main
mattmartin14 Feb 12, 2025
3 changes: 3 additions & 0 deletions .gitignore
@@ -50,3 +50,6 @@ htmlcov
pyiceberg/avro/decoder_fast.c
pyiceberg/avro/*.html
pyiceberg/avro/*.so

# node modules
node_modules/
311 changes: 303 additions & 8 deletions poetry.lock

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions pyiceberg/table/__init__.py
@@ -150,6 +150,14 @@
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"


@dataclass()
class UpsertResult:
    """Summary of the upsert operation."""

    rows_updated: int = 0
    rows_inserted: int = 0


class TableProperties:
    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
@@ -1086,6 +1094,78 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()

    def upsert(
        self, df: pa.Table, join_cols: list[str], when_matched_update_all: bool = True, when_not_matched_insert_all: bool = True
    ) -> UpsertResult:
        """Shorthand API for performing an upsert against an Iceberg table.

        Args:
            self: the target Iceberg table to execute the upsert on
            df: the input dataframe to upsert with the table's data
            join_cols: the columns to join on; these are essentially analogous to primary keys
            when_matched_update_all: bool indicating whether to update matched rows when a non-key column value has changed
            when_not_matched_insert_all: bool indicating whether to insert source rows that do not match any existing rows in the table

        Example Use Cases:
            Case 1: Both Parameters = True (Full Upsert)
                Existing row found → Update it
                New row found → Insert it

            Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                Existing row found → Do nothing (no updates)
                New row found → Insert it

            Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                Existing row found → Update it
                New row found → Do nothing (no inserts)

            Case 4: Both Parameters = False (No Merge Effect)
                Existing row found → Do nothing
                New row found → Do nothing
                (the function effectively does nothing)

        Returns:
            An UpsertResult class (contains details of rows updated and inserted)
        """
        from pyiceberg.table import upsert_util

        if not when_matched_update_all and not when_not_matched_insert_all:
            raise ValueError("no upsert options selected...exiting")

        if upsert_util.has_duplicate_rows(df, join_cols):
            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

        # get the list of rows that already exist so we don't have to load the entire target table
        matched_predicate = upsert_util.create_match_filter(df, join_cols)
        matched_iceberg_table = self.scan(row_filter=matched_predicate).to_arrow()

        update_row_cnt = 0
        insert_row_cnt = 0

        with self.transaction() as tx:
            if when_matched_update_all:
                # get_rows_to_update checks the non-key columns to see if any values have actually changed;
                # we don't want a blanket overwrite for matched rows when the non-key column data is unchanged,
                # so this extra step avoids unnecessary IO and writes
                rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)

                update_row_cnt = len(rows_to_update)

                # build the match predicate filter
                overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

                tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)

            if when_not_matched_insert_all:
                rows_to_insert = upsert_util.get_rows_to_insert(df, matched_iceberg_table, join_cols)

                insert_row_cnt = len(rows_to_insert)

                tx.append(rows_to_insert)

        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand API for appending a PyArrow table to the table.
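A minimal usage sketch of the new API (illustrative only: the catalog name, table identifier, and data below are hypothetical, and an already-configured catalog is assumed):

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

# hypothetical catalog and table names, for illustration only
catalog = load_catalog("default")
tbl = catalog.load_table("db.customers")

# source rows keyed on "id": id=1 already exists (update), id=3 is new (insert)
df = pa.table({"id": [1, 3], "name": ["alice-updated", "carol"]})

res = tbl.upsert(df, join_cols=["id"])
print(res.rows_updated, res.rows_inserted)  # e.g. 1 1
```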
118 changes: 118 additions & 0 deletions pyiceberg/table/upsert_util.py
@@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import functools
import operator

import pyarrow as pa
from pyarrow import Table as pyarrow_table
from pyarrow import compute as pc

from pyiceberg.expressions import (
    And,
    BooleanExpression,
    EqualTo,
    In,
    Or,
)


def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression:
    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])

    if len(join_cols) == 1:
        return In(join_cols[0], unique_keys[0].to_pylist())
    else:
        return Or(*[And(*[EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist()])
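# illustration (not part of the original diff): with join_cols=["id"] and
# unique ids [1, 2], this returns In("id", [1, 2]); with join_cols=["id", "region"]
# it returns Or(And(EqualTo("id", 1), EqualTo("region", "us")), ...) with one
# And(...) per unique key combination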


def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool:
    """Check for duplicate rows in a PyArrow table based on the join columns."""
    key_counts = df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")])
    return len(key_counts.filter(pc.field("count_all") > 1)) > 0
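# illustration (not part of the original diff): for join_cols=["id"], an "id"
# column of [1, 1, 2] groups to counts {1: 2, 2: 1}; the count_all > 1 filter
# keeps key 1, so the function returns True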


def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table:
    """
    Return a table with rows that need to be updated in the target table based on the join columns.

    When a row is matched, an additional scan is done to evaluate the non-key columns to detect whether an actual change has occurred.
    Only matched rows with an actual change to a non-key column value are returned in the final output.
    """
    all_columns = set(source_table.column_names)
    join_cols_set = set(join_cols)

    non_key_cols = list(all_columns - join_cols_set)

    match_expr = functools.reduce(operator.and_, [pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])

    matching_source_rows = source_table.filter(match_expr)

    rows_to_update = []

    # note: this compares each matched source row against the target one at a time,
    # which scans the target table once per row; fine for small matched sets
    for index in range(matching_source_rows.num_rows):
        source_row = matching_source_rows.slice(index, 1)

        target_filter = functools.reduce(operator.and_, [pc.field(col) == source_row.column(col)[0].as_py() for col in join_cols])

        matching_target_row = target_table.filter(target_filter)

        if matching_target_row.num_rows > 0:
            needs_update = False

            for non_key_col in non_key_cols:
                source_value = source_row.column(non_key_col)[0].as_py()
                target_value = matching_target_row.column(non_key_col)[0].as_py()

                if source_value != target_value:
                    needs_update = True
                    break

            if needs_update:
                rows_to_update.append(source_row)

    if rows_to_update:
        rows_to_update_table = pa.concat_tables(rows_to_update)
    else:
        # an empty table with the source schema; pa.Table.from_arrays([], names=...)
        # raises when names is non-empty, so build it from the schema instead
        rows_to_update_table = source_table.schema.empty_table()

    common_columns = set(source_table.column_names).intersection(set(target_table.column_names))
    rows_to_update_table = rows_to_update_table.select(list(common_columns))

    return rows_to_update_table


def get_rows_to_insert(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table:
    # build a conjunction of isin filters over the join columns; starting from None
    # lets the first column initialize the expression
    source_filter_expr = None

    for col in join_cols:
        target_values = target_table.column(col).to_pylist()
        expr = pc.field(col).isin(target_values)

        if source_filter_expr is None:
            source_filter_expr = expr
        else:
            source_filter_expr = source_filter_expr & expr

    non_matching_expr = ~source_filter_expr

    source_columns = set(source_table.column_names)
    target_columns = set(target_table.column_names)

    common_columns = source_columns.intersection(target_columns)

    non_matching_rows = source_table.filter(non_matching_expr).select(list(common_columns))

    return non_matching_rows
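For reviewers, a small self-contained sketch of how the helpers in this file compose (illustrative; the column names and values are made up):

```python
import pyarrow as pa

from pyiceberg.table import upsert_util

source = pa.table({"id": [1, 2, 3], "val": ["a", "B", "c"]})
target = pa.table({"id": [1, 2], "val": ["a", "b"]})

# a predicate over the source keys (here, an In over ids 1, 2, 3),
# used by upsert() to scan only the overlapping target rows
print(upsert_util.create_match_filter(source, ["id"]))

# id=2 differs on the non-key column ("B" vs "b"), so it is the only update
print(upsert_util.get_rows_to_update(source, target, ["id"]).num_rows)  # 1

# id=3 has no match in the target, so it is the only insert candidate
print(upsert_util.get_rows_to_insert(source, target, ["id"]).num_rows)  # 1
```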