
Feature: cost-effective merge for partitioned incremental models on BigQuery #1971

Closed

Conversation

@jtcohen6 (Contributor) commented Dec 3, 2019

Attempt an implementation of the solution suggested by @jarlainnix in dbt-labs/dbt-bigquery#1034

When an incremental model runs incrementally on BigQuery, if (and only if) the model is partitioned, dbt should:

  • create a temporary table using the model SQL (including the is_incremental() filter that limits the input set)
  • run a statement to get the min and max partition values from that temporary table
  • filter the destination (already existing) table by that partition range when merging, to limit query cost (see the sketch below)
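
As a rough sketch (with hypothetical table and column names, not the exact SQL the macro generates), the resulting merge would look something like this, with the date bounds coming from the min/max lookup against the temp table:

merge into `my_project.analytics.events` as DBT_INTERNAL_DEST
using `my_project.analytics.events__dbt_tmp` as DBT_INTERNAL_SOURCE
on
    -- scan only the affected partitions of the destination table
    DBT_INTERNAL_DEST.event_date between date '2019-12-01' and date '2019-12-03'
    and DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id

when matched then update set
    event_date = DBT_INTERNAL_SOURCE.event_date,
    payload = DBT_INTERNAL_SOURCE.payload

when not matched then insert
    (event_id, event_date, payload)
values
    (event_id, event_date, payload)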

The way I've implemented this is quite verbose and requires adding a keyword arg to the contract of get_merge_sql. A better approach may leverage **kwargs. I'm very open to feedback.

@jtcohen6 requested a review from drewbanin, December 3, 2019
@cla-bot added the cla:yes label, Dec 3, 2019
@dbt-labs deleted a comment from Sherm4nLC, Dec 4, 2019
@jarlainnix commented

🤞 @jtcohen6 Thanks a lot. I hope it goes through. Can't wait for it. I believe it makes dbt better prepared for bigger data sets and better able to withstand the test of time. 😃

@drewbanin (Contributor) left a comment

This is groovy, thanks for sending over the PR @jtcohen6!

I have some tiny thoughts about the code - I've dropped them inline here. I think the approach is a good one though. Let me spend some time testing this out :)

@drewbanin (Contributor) left a comment

see comments

@inve1 commented Dec 13, 2019

Hey, I was looking to do something like this and stumbled upon your PR. I tried it and it works well.

I have a couple of questions/comments:

  • Why not select just the partition_by field and create a temp table with just that field? It would make that scan cheaper if the original select had a lot more columns. (I think if you do something like select {{partition_by}} from ({{original_select}}), BigQuery is smart enough to only scan that one column even if more were mentioned in the original.)
  • If you don't want to select just the partition_by field for some reason that went over my head, why not set source_sql in the merge as
(
    select * from {{tmp_relation}}
)

in situations where tmp_relation gets created? If the original SQL reduced the size of the original data, scanning the filtered/aggregated tmp_relation can be much cheaper. I tried that and it cut the scanned GB to a quarter in the data I was testing with.

  • I'm looking at creating incremental models that look at something like the last 3 days of data and just overwrite those partitions at an interval. In that case I wouldn't need to figure out what the partition range is; it could be generated at compile time. I don't really have a suggestion here, but do you think this use case could be supported in a nice way after the change here? Maybe allow passing dest_partition from the model?

Anyway, thanks for doing this, it's really helpful 👍

@jtcohen6 (Contributor, Author) commented Dec 13, 2019

@inve1 Thanks for trying it out, and for the great ideas/comments!

Even cheaper!

Between your two suggestions, I prefer the second, where we merge using the temporary table we previously created for the purposes of getting partition min/max:

     {% set source_sql -%}
       (
         select * from {{tmp_relation}}
       )
     {%- endset -%}

After some quick testing, I think this would be the most cost-effective.

1. Create partitioned temp table: charged for full scan of data needed by {{sql}} (model SQL)
2. Get partition min/max from partitioned temp table: negligible
3. Merge from temp table into existing table: charged for full scan of only the temp table + scan of selected partitions in preexisting table
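
For step 2, only the partitioning column of the temp table needs to be scanned, which is why the cost is negligible. A sketch of that lookup, with made-up names:

select
    min(my_date_column) as partition_min,
    max(my_date_column) as partition_max
from `my_project.analytics.my_model__dbt_tmp`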

Per your first suggestion, there may be some cases where it's extremely cheap to get the partition min/max values from the model SQL as

select min({{partition_by}}), max({{partition_by}}) from (
    {{ sql }}
)

but then we would be charged the entire cost of the model SQL over again when we want to merge in step 3.

For unpartitioned tables, we should still skip creating a temp table and instead use

     {% set source_sql -%}
       (
         {{ sql }}
       )
     {%- endset -%}

because we're only going to be using the model SQL once, in the merge statement.

Partition overwrite

This is a great question—and I think it's asking after a genuinely different materialization strategy, one that performs a counterintuitive (but highly effective) merge like:

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on FALSE

when not matched by source
    and DBT_INTERNAL_DEST.{{partition_by}} between {{partition_min}} and {{partition_max}}
    then delete

when not matched then insert
    ({{ dest_cols_csv }})
values
    ({{ dest_cols_csv }})

This, to me, is an insert_overwrite incremental strategy rather than a unique_key strategy, and it's very much in line with how we use incremental models on Spark.

@drewbanin What do you think about adding first-class support for this as an alternative strategy for incremental models on BQ, in the same way that we support two incremental strategies on Snowflake?
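
For the user, that could be as simple as a model-level config along the lines of Snowflake's existing incremental_strategy option (a hypothetical sketch, not something this PR implements):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = 'event_date'
) }}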

@inve1 commented Dec 13, 2019

Yeah I agree the "selecting the temp table" approach seems a little better.

On that note, you could create a real BigQuery temp table and do the whole merge job in one request.
I took the three queries that this PR generated for my model and created a script like this:

DECLARE
  partition_min,
  partition_max timestamp; 
CREATE TEMP TABLE scans__dbt_tmp -- scans is just the name of my model
PARTITION BY
  DATE(start_time) AS (
  SELECT
    ......
  FROM
   .....
);
SET
  (partition_min,
    partition_max) = (
  SELECT
    AS STRUCT MIN(start_time), -- partition_by column
    MAX(start_time)
  FROM
    scans__dbt_tmp);

MERGE INTO
  .... AS DBT_INTERNAL_DEST
USING
  (
  SELECT
    *
  FROM
    `scans__dbt_tmp` ) AS DBT_INTERNAL_SOURCE
ON
  DBT_INTERNAL_DEST.start_time BETWEEN partition_min AND partition_max
  AND {{unique_key_constraint_here}}
  WHEN MATCHED THEN UPDATE SET ....
  WHEN NOT MATCHED
  THEN ..... VALUES ......

This has the advantage of not creating a table you can see in your datasets and not getting charged for storing the temp table for 12 hours. I thought scanning from the temp table would be free, but never mind, I just read the right docs: temp table scans are charged, just the storage isn't.
I actually have models that make more rows than the source had, so technically re-scanning the source as implemented here originally would be cheaper, but this approach would handle that case too.
The only issue (not sure what your policy is on this) is that BigQuery scripting is in beta.

Link to docs in case you haven't seen these

@jtcohen6 (Contributor, Author) commented Dec 13, 2019

That's really interesting stuff! I've only done a little reading about "true" temp tables and scripting in BQ, since they seem to be quite new as features. It looks like scripts are BQ's way of handling atomic transactions that require several DDL statements.

I'll definitely talk more about it with some of the folks here. For the moment, I think the use of merge gives this operation full atomicity. The only downside of the current approach is, as you say, the annoyance and (minuscule) cost of a __dbt_tmp table that hangs around for 12 hours.

Edit: @drewbanin and I had a chance to play around with scripting quickly. Very cool. It seems like this is a much more straightforward way to code up exactly what we want here. I'll give this a spin soon.

@jtcohen6 (Contributor, Author) commented Jan 5, 2020

I've rewritten this update to the BQ incremental materialization such that it now leverages the beta scripting feature.

This required some additional code in bigquery__create_table_as to enable the creation of "true" (scripting-only) temporary tables. In that process, I found that temp tables cannot include partition by clauses, else querying them returns an empty result set. [Edit 20 Jan: this was due to a bug that seems to have been resolved since.]

Extensions to incremental materialization

Now that this script-based framework has made the code more straightforward, I think it will be very simple to enable the insert_overwrite incremental strategy by creating a common_get_insert_overwrite_merge_sql macro that executes the merge on false approach I sketched out above. I'll take a look at the Snowflake incremental materialization for the code that enables strategy-picking.

Extensions to scripting

Based on the work in #1967, I'd love to allow users to specify their is_incremental() filter as a BQ scripting SET statement inside a sql_header block, instead of needing to call a statement that runs select max() from {{this}}.

In the simplest case, that looks like:

{%- if is_incremental() -%}
{%- call sql_header -%}
DECLARE
    max_from_this date;

SET
    (max_from_this) = (select as struct max(my_date) from {{this}});
{%- endcall -%}
{%- endif -%}

with base as (

    select * from source_table
    {% if is_incremental() %}
    where my_date >= max_from_this
    {% endif %}

)

select * from base

The trickiness comes with integrating the script-based work I've done here. Since each BQ script can only have one DECLARE statement and it must come first—though the declared variables can be defined in multiple SET statements throughout—I plan to add a declare keyword to the config that takes an array of scripting variables to be defined first.
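
Purely to illustrate that plan (the declare config key is hypothetical and does not exist yet), it might look like:

{{ config(
    materialized = 'incremental',
    partition_by = 'my_date',
    declare = ['max_from_this date']
) }}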

Edit: Caveat

I just reread the BQ docs and saw that they have beta support for integer-based partitioning. There are a couple of places in this new script-based approach that presume the partitioning column to be either date or date(timestamp). I think we will need to either:
(a) Update dbt's partition_by config to enable the user to supply a data type (date, timestamp, integer)
(b) Grab the data type of the partition_by column from information schema

@jtcohen6 (Contributor, Author) commented Jan 20, 2020

New partition_by spec

In order to support the smarter, cost-effective, scripting-based incremental materialization and new integer range partitioning (beta), I propose changing the BigQuery partition_by config argument to accept a dictionary.

This works for date columns:

{{ config(
    materialized = 'incremental',
    partition_by = { 'field': 'my_date_column', 'data_type': 'date' }
) }}

This will compile simply to partition by my_date_column at time of table creation.

As well as timestamp/datetime columns:

{{ config(
    materialized = 'incremental',
    partition_by = { 'field': 'my_ts_column', 'data_type': 'timestamp' }
) }}

This compiles to partition by date(my_ts_column) in create table DDL.

And finally, integer range partitioning:

{{ config(
    materialized = 'incremental',
    partition_by = {
        'field': 'my_int_column',
        'data_type': 'int64',
        'range': { 'start': 0, 'end': 100, 'interval': 10 }
    }
) }}

This compiles to partition by range_bucket(my_int_column, generate_array(0, 100, 10)). It matches the BigQuery API spec in these docs.
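
For illustration, all three cases could be rendered by one macro along these lines (a sketch only; the macro and variable names here are hypothetical, not this PR's code):

{% macro render_partition_by(partition_config) %}
    {%- set field = partition_config.field -%}
    {%- set data_type = partition_config.data_type | lower -%}
    {%- if data_type == 'date' -%}
        partition by {{ field }}
    {%- elif data_type in ('timestamp', 'datetime') -%}
        partition by date({{ field }})
    {%- elif data_type == 'int64' -%}
        {%- set r = partition_config.range -%}
        partition by range_bucket(
            {{ field }},
            generate_array({{ r.start }}, {{ r.end }}, {{ r.interval }})
        )
    {%- endif -%}
{% endmacro %}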

Next steps

Deprecation/migration: I created an adapter method, parse_partition_by, that accepts whatever the user has supplied to config.partition_by and checks that it's a dictionary (new spec), raises an error (if supplied a string containing range_bucket), or raises a deprecation warning and attempts to parse it into a dictionary (if date or timestamp). I drafted deprecation and error warnings and would appreciate guidance on wording. Both have unresolved # TODOs, since I'll need to write and link to updated docs.

Other consideration: I made some updates to the is_replaceable BigQuery adapter method, such that it now supports range_partitioning as well as time_partitioning.

There are some failing tests that I need to investigate.

[Updated 20 Jan]

@clausherther (Contributor) commented

Very excited about this! Do you know if this will allow us to run this incremental strategy on tables that have partition filter requirements?
We ran into an issue trying to do incremental runs on tables we set to require partition filters (from within dbt) and have so far had to resort to disabling the filter requirement. Even when I manually wrote the merge statement, I couldn't figure out where to tell it to filter the destination table's partitions.

https://getdbt.slack.com/archives/C2JRRQDTL/p1579289882011400

@jtcohen6 (Contributor, Author) commented

@clausherther Yes! These merge statements will work for tables that have require_partition_filter = true.

Out of curiosity, how are you configuring that flag with dbt? I don't think it's explicitly supported as a config arg (yet). IMO it also breaks some surprising things, like schema tests, though rightly so to avoid scanning a lot of data.

@clausherther (Contributor) commented

@jtcohen6 we set that option via an alter table... model post_hook (similar to how we currently apply BQ table labels).
With respect to schema tests, we have so far tried overriding the built-in tests to allow for a filter argument. While that technically works, it's not a great option, since you're only testing the schema over the filtered partitions, so something like a unique test wouldn't really be testing the right rows.
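
A minimal sketch of that kind of post-hook, with made-up names (my reading of the pattern described above, not the exact hook we use):

{{ config(
    materialized = 'incremental',
    partition_by = 'my_date_column',
    post_hook = "alter table {{ this }} set options (require_partition_filter = true)"
) }}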

@jtcohen6 (Contributor, Author) commented

@drewbanin @beckjake I'm ready to hand this over for expert appraisal.

  • I drafted a few compilation errors and deprecation warnings, which will need to link to new docs.
  • New adapter method parse_partition_by might benefit from a unit test. It's a lot of if/elif and a little bit of regex.
  • BigQuery now supports "real" temp tables, but only inside of scripts (= several SQL statements, separated by ;, the first of which is declare). Drew and I wondered if, in the future, we might rewrite all BQ temp table calls to occur inside of scripts, and do away with the 12-hour expiration + drop. For now, the way I'm differentiating "scripting" temp tables is an explicit hack.

@jtcohen6 requested a review from beckjake, January 22, 2020
@beckjake (Contributor) left a comment

This looks great! I have a couple of style comments.

Can you add a test that the deprecated version works properly?

The 012_deprecation_tests folder has some tests; TestDeprecations should be a decent example. Set up a test that attempts to use the old behavior and runs with strict=True, which should fail. With strict=False, it should pass and generate a properly partitioned table.

(Resolved inline review threads on plugins/bigquery/dbt/include/bigquery/macros/adapters.sql and core/dbt/context/base.py.)
@beckjake (Contributor) left a comment

lgtm!

@drewbanin changed the base branch from dev/0.15.1 to dev/barbara-gittings, January 27, 2020
@drewbanin (Contributor) left a comment

@jtcohen6 I just changed the base branch to dev/barbara-gittings (0.16.0) - looks like there are some merge conflicts to account for as a result.

I'll do a quick pass here too, but this looks really stellar!

@jtcohen6 (Contributor, Author) commented

The changes are good to merge into dev/barbara-gittings. There is an outstanding TODO, which is to write new docs and link to them in the compilation error + deprecation warning. I'll circle back on that next week.

@hui-zheng commented Jan 30, 2020

Hi @jtcohen6 and @drewbanin

This is exciting stuff, thank you for sharing it with me. I ran into some critical blockers in our dbt production environment that are related to this issue, and we would like to patch in the fix quickly.

Could I ask a few questions?

  1. Is this PR feature-complete and passing most tests? Could you comment on its readiness? Though it's still on a dev branch, I am willing to take some risk and patch my current dbt with this change to resolve some of our critical challenges.

  2. Do you have a suggestion on the right way to patch 0.15.1 with this PR's fix? Should I just take the feature/cost-effective-bq-incremental branch and merge it into my copy of dbt 0.15.1? Should I run some dbt tests after the merge to make sure nothing in dbt breaks?

  3. Could you clarify whether this PR uses true BigQuery temp tables, as @inve1 suggested, or the 12-hour-expiration + drop approach? I think you are using true temp tables in this PR, but you also mentioned rewriting all temp table calls, so I just want to confirm.

  4. Does this PR implement first-class support for a BQ insert_overwrite strategy? I assume that strategy would further reduce query cost and improve merge speed in the applicable scenarios.

  5. In some scenarios, it's common for incremental models to overwrite the data for a given datetime range (e.g. the past 24 hours), where the partition range to overwrite is known at SQL compile time. Is there a nice way for dbt to support a partitioned merge in that case?

For dbt-labs/dbt-bigquery#5, as @inve1 mentioned:

I'm looking at creating incremental models that look at something like the last 3 days of data and just overwrite those partitions at an interval. In that case, I wouldn't need to figure out what the partition range is, it could be generated at compile time. I don't really have a suggestion here, but do you think this use case could be supported in a nice way after the change here? maybe allow passing dest_partition from the model?

Thank you

@drewbanin (Contributor) left a comment

This is looking really good! I left some comments here that I'd love to catch up with you about IRL!

I also cooked up a docs link as a placeholder, so we can merge this thing as soon as it's ready :)

dbt inferred: {inferred_partition_by}

For more information, see:
[dbt docs link]

I made a placeholder guide here, to be completed before 0.16.0 is released. Can you add this link accordingly? https://docs.getdbt.com/docs/upgrading-to-0-16-0

{% endif %}

{# BigQuery only #}
{%- if partition_by -%}

I don't love that we're encoding this BigQuery specific logic in the common merge implementation. Further, the pprint_partition_field macro used below is only defined in the BigQuery plugin. This probably won't be an issue, as the Snowflake code path won't be providing a partition_by arg here, but it does feel out of place to me.

I'd be comfortable just leaving the existing common_get_merge_sql as-is and instead baking this logic directly into the BigQuery implementation of get_merge_sql:

https://github.com/fishtown-analytics/dbt/blob/e080bfc79ab11a4d145de4c8da9d955c5e136d92/plugins/bigquery/dbt/include/bigquery/macros/materializations/merge.sql#L1-L3

Alternatively, we could make this a little bit more generic and change the partition_by arg for this macro to predicates (or similar). The BigQuery implementation of get_merge_sql could supply a list of predicates to this macro, but other implementations (like Snowflake) could provide an empty list.

Either way, let's move the partitioning-specific logic out of this macro.
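
As a rough illustration of that predicates idea (hypothetical code, not the macro as written in this PR):

{% macro common_get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
    {%- set predicates = [] if predicates is none else predicates -%}
    {%- if unique_key -%}
        {%- do predicates.append(
            'DBT_INTERNAL_SOURCE.' ~ unique_key ~ ' = DBT_INTERNAL_DEST.' ~ unique_key
        ) -%}
    {%- endif -%}
    {%- set dest_cols_csv = dest_columns | map(attribute='name') | join(', ') -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
    on
    {%- if predicates | length == 0 %}
        FALSE
    {%- else %}
        {{ predicates | join(' and ') }}
    {%- endif %}

    when matched then update set
        {% for column in dest_columns -%}
            {{ column.name }} = DBT_INTERNAL_SOURCE.{{ column.name }}
            {%- if not loop.last %}, {% endif %}
        {%- endfor %}

    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})
{%- endmacro %}

The BigQuery implementation of get_merge_sql could then build the partition-range condition and pass it in via predicates, while other implementations (like Snowflake) pass nothing and keep today's behavior.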

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on
{% if conditions|length == 0 %}

This is a slick way of doing this 👍

if 'range_bucket' in raw_partition_by.lower():
dbt.exceptions.CompilerException('''
BigQuery integer range partitioning (beta) is supported \
by the new `partition_by` config, which accepts a \

BigQuery integer range partitioning (beta) is only supported
with the dictionary format of the partition_by config.

More information: [Link from above]

^ I like using modifiers like only, as they help clarify the nature of these validation errors. These error messages should:

  • Explain what went wrong
  • Explain how to fix it

Also, this error message will invariably stick around for longer than we intend, so I try to avoid temporal words like "new", as things like that tend to cause chuckles a couple of years down the line :p

dictionary. See: [dbt docs link TK]
''') # TODO
else:
p = re.compile(

This regex is a little too complicated for my liking... can we try something a little simpler instead? How about we try something like:

partition_by = raw_partition_by.strip()
if partition_by.lower().startswith('date('):
  match = re.match(r'date\((.*)\)', partition_by, re.IGNORECASE)
  partition_by = match.group(1)
  data_type = 'date'
else:
  data_type = 'timestamp'

inferred_partition_by = ...

Let me know if I'm missing something important in the logic here!

{%- if partition_by_type in ('date','timestamp','datetime') -%}
partition by {{pprint_partition_field(partition_by_dict)}}
{%- elif partition_by_type in ('int64') -%}
{%- set pbr = partition_by_dict.range -%}

wouldn't mind a pbr

@@ -62,18 +77,25 @@
{%- set raw_kms_key_name = config.get('kms_key_name', none) -%}
{%- set raw_labels = config.get('labels', []) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- set partition_by_dict = adapter.parse_partition_by(raw_partition_by) -%}
{%- set is_scripting = (temporary == 'scripting') -%} {# "true" temp tables only possible when scripting #}

Can you just remind me where we ended up on this? BigQuery temp tables can't be used outside of scripting?

@jtcohen6 (Contributor, Author) commented

Closing in favor of #2140

@elyobo commented Oct 29, 2022

Would there be any interest in something like this, so that we can get some improvement for integer keys that otherwise exceed the BQ script memory limits (e.g. dbt-labs/dbt-adapters#605)? It extends the BQ implementation of insert_overwrite to support the behaviour (as I understand it) described above, but as an optional change; the regular insert_overwrite behaviour is still available too, as it's more efficient when it does work.
