
type_2_scd_generic_upsert does not handle NULL values properly #121

Open
dgcaron opened this issue Aug 8, 2023 · 7 comments

@dgcaron
Contributor

dgcaron commented Aug 8, 2023

The current way the type_2_scd_generic_upsert function checks for changes in a row evaluates each column individually, and this does not yield the expected result when a column is NULL. A possible change that improves performance and also caters for NULL values is to add a hash column calculated from the contents of the columns in the table (except the SCD2 system columns).

Some background: https://datacadamia.com/dit/owb/scd2_hash

Besides some code around it, the most interesting changes would be:

a UDF to calculate the hash

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

differential_hash_udf = udf(
    lambda row: calculate_differential_hash(row, attr_col_names), StringType()
)
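
calculate_differential_hash itself isn't shown above; a minimal sketch of what it could look like (the sentinel value and separator below are assumptions, not mack code):

import hashlib

# Hypothetical helper: hash a row's attribute values, mapping NULL to a
# sentinel so that NULL and an empty string produce different hashes.
def calculate_differential_hash(row, attr_col_names):
    parts = ["<NULL>" if row[col] is None else str(row[col]) for col in attr_col_names]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()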

addition of the hash column to the update set

updates_df = updates_df.withColumn(
    differential_hash_col_name, differential_hash_udf(struct(*attr_col_names))
)

stage changes based on the hash of the columns instead of a column-by-column comparison

staged_part_1 = (
    updates_df.alias("updates")
    .join(delta_table.toDF().alias("base"), primary_key)
    .where(
        f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> updates.{differential_hash_col_name}"
    )
    .selectExpr("NULL as mergeKey", "updates.*")
)

merge using the hash column

res = (
    delta_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(f"base.{primary_key} = mergeKey"),
    )
    .whenMatchedUpdate(
        condition=f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> staged_updates.{differential_hash_col_name}",
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        },
    )
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)

This is a breaking (not backwards-compatible) change to what the function does now, so you could consider making it a separate function?

@dgcaron
Contributor Author

dgcaron commented Aug 8, 2023

This is the other issue mentioned in #120.

@robertkossendey
Collaborator

@dgcaron thank you for flagging this. I am not so familiar with the topic; could you explain the potential performance boost and how this would fix the NULL value issues?

@dgcaron
Contributor Author

dgcaron commented Sep 29, 2023

I believe the issue arises here: https://github.com/MrPowers/mack/blob/main/mack/__init__.py#L103

staged_updates_attrs = list(
    map(lambda attr: f"staged_updates.{attr} <> base.{attr}", attr_col_names)
)

If the base.{attr} value is initially NULL on the first write, the SCD2 isn't built properly on subsequent writes. This has to do with how SQL and PySpark handle NULL values in comparisons: NULL <> value evaluates to NULL rather than true, so changed rows are never staged.
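
A quick standalone demonstration of that comparison semantics (not mack code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, None), (2, "old")], ["id", "attr"])

# "attr <> 'new'" evaluates to NULL (not true) for the row where attr is NULL,
# so that row silently drops out of the staged changes.
df.where("attr <> 'new'").show()  # only id=2 is returned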

The performance boost should come from the fact that you compare a single precalculated hash (the hash is also persisted in the SCD2 table) on both sides instead of all columns of interest; you don't have to evaluate each column on both sides to check for changes this way.

I'll add a test case that shows the issue in the coming days.

@robertkossendey
Collaborator

@dgcaron understood. Precalculating a hash column and comparing only that is far less expensive than comparing each column, and it would solve the Spark NULL problem, although the latter might also be solvable through eqNullSafe (not sure though).
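
For reference, eqNullSafe is the DataFrame API's null-safe equality; a quick standalone illustration:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(None,), ("x",)], ["attr"])

# Regular equality: NULL = NULL evaluates to NULL, so the row is filtered out.
df.where(F.col("attr") == F.lit(None)).count()  # 0
# Null-safe equality: NULL <=> NULL evaluates to true.
df.where(F.col("attr").eqNullSafe(F.lit(None))).count()  # 1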

@dgcaron
Contributor Author

dgcaron commented Sep 29, 2023

Yes, I guess eqNullSafe should solve the NULL issue too, but I am not sure how to implement that properly with an expression string. It would allow for a non-breaking change though; if that is the preference, I can take a look some time soon.
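
In an expression string, the equivalent of eqNullSafe is Spark SQL's <=> operator, so the comparison above could become something like this (a sketch against the current attr_col_names list):

# NOT (a <=> b) is true exactly when the two values genuinely differ,
# treating two NULLs as equal and NULL vs. non-NULL as different.
staged_updates_attrs = [
    f"NOT (staged_updates.{attr} <=> base.{attr})" for attr in attr_col_names
]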

@robertkossendey
Collaborator

@dgcaron Maybe we could do that as a first step. We could still implement an SCD2 with hash later :)

@MrPowers
Owner

@dgcaron - just a friendly ping on this one. Would love to get this fix added! Thank you!
