Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbt seed truncate tables #182

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## dbt-spark 0.20.1 (June 22, 2021)

### Features

### Fixes
- dbt seed command fixed with expected behavior from dbt global project to [truncate table](https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-truncate-table.html) in order remove all rows from the existing seed tables and replace values. As explained in [issue 112](https://github.com/fishtown-analytics/dbt-spark/issues/112), the current seed command in dbt-spark appends to existing seeded tables instead overwriting.

### Contributors
- [@mv1742](https://github.com/mv1742) ([#181](https://github.com/mv1742/)

## dbt-spark 0.20.0 (Release TBD)

### Fixes
Expand Down
6 changes: 4 additions & 2 deletions dbt/include/spark/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% macro spark__load_csv_rows(model, agate_table) %}
{% set batch_size = 1000 %}
{% set column_override = model['config'].get('column_types', {}) %}

{% set statements = [] %}

{% for chunk in agate_table.rows | batch(batch_size) %}
Expand Down Expand Up @@ -37,7 +37,9 @@

{% macro spark__reset_csv_table(model, full_refresh, old_relation, agate_table) %}
{% if old_relation %}
{{ adapter.drop_relation(old_relation) }}
{{ adapter.truncate_relation(old_relation) }}
{% set sql = "truncate table " ~ old_relation %}
{{ return(sql) }}
Comment on lines +40 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I see in the TRUNCATE docs that The table must not be a view or an external/temporary table. Given that #112 was initially prompted by the case of a seed being an external table, are we sure this approach will work?
  2. It looks like we're not appropriately handling the full-refresh case today. We want to drop the relation and truncate/remove the data, to enable a new column schema to take its place.
  3. If this ends up being the same code as in the default version, we could just delete spark__reset_csv_table entirely and fall back on default__reset_csv_table.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. hmm, i see. truncate works for me when the external tables already exists, even though docs say table must not be a view or external table. When the table does not exist in the unmanaged database, I'm getting " ('The SQL contains 0 parameter markers, but 2 parameters were supplied', 'HY000')", not sure why...
  2. full-refresh work around for external tables could be truncate > drop > create > trunctate?
  3. default__reset_csv_table won't work as drop_relation doesn't delete the underlying data for external tables

Is there a guide on how to setup tests for dbt-spark like there is for dbt https://github.com/dbt-labs/dbt/blob/HEAD/CONTRIBUTING.md#testing?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For more details this is the dbt.log file message when running dbt seed when the table does not exist in an unmanaged database (something to do with insert into default.gc_branch_correction_test values (cast(%s as bigint),cast(%s as bigint))):

2021-09-16 06:46:36.950408 (ThreadPoolExecutor-1_0): Opening a new connection, currently in state closed
2021-09-16 06:46:42.488937 (ThreadPoolExecutor-1_0): SQL status: OK in 5.54 seconds
2021-09-16 06:46:42.492460 (ThreadPoolExecutor-1_0): On list_None_default: ROLLBACK
2021-09-16 06:46:42.492723 (ThreadPoolExecutor-1_0): NotImplemented: rollback
2021-09-16 06:46:42.492850 (ThreadPoolExecutor-1_0): On list_None_default: Close
2021-09-16 06:46:42.495528 (MainThread): NotImplemented: add_begin_query
2021-09-16 06:46:42.495681 (MainThread): NotImplemented: commit
2021-09-16 06:46:42.496011 (MainThread): 02:46:42 | Concurrency: 1 threads (target='prod')
2021-09-16 06:46:42.496201 (MainThread): 02:46:42 | 
2021-09-16 06:46:42.498050 (Thread-1): Began running node seed.er_silver_gold.gc_branch_correction_test
2021-09-16 06:46:42.498375 (Thread-1): 02:46:42 | 1 of 1 START seed file default.gc_branch_correction_test............. [RUN]
2021-09-16 06:46:42.498717 (Thread-1): Acquiring new spark connection "seed.er_silver_gold.gc_branch_correction_test".
2021-09-16 06:46:42.630036 (Thread-1): finished collecting timing info
2021-09-16 06:46:42.659872 (Thread-1): Using spark connection "seed.er_silver_gold.gc_branch_correction_test".
2021-09-16 06:46:42.660005 (Thread-1): On seed.er_silver_gold.gc_branch_correction_test: /* {"app": "dbt", "dbt_version": "0.19.1", "profile_name": "dbt_databricks", "target_name": "prod", "node_id": "seed.er_silver_gold.gc_branch_correction_test"} */
drop table if exists default.gc_branch_correction_test
2021-09-16 06:46:42.660116 (Thread-1): Opening a new connection, currently in state closed
2021-09-16 06:46:45.129582 (Thread-1): SQL status: OK in 2.47 seconds
2021-09-16 06:46:45.143552 (Thread-1): 'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.
2021-09-16 06:46:45.143772 (Thread-1): 'soft_unicode' has been renamed to 'soft_str'. The old name will be removed in MarkupSafe 2.1.
2021-09-16 06:46:45.162986 (Thread-1): NotImplemented: add_begin_query
2021-09-16 06:46:45.163100 (Thread-1): Using spark connection "seed.er_silver_gold.gc_branch_correction_test".
2021-09-16 06:46:45.163197 (Thread-1): On seed.er_silver_gold.gc_branch_correction_test: /* {"app": "dbt", "dbt_version": "0.19.1", "profile_name": "dbt_databricks", "target_name": "prod", "node_id": "seed.er_silver_gold.gc_branch_correction_test"} */

    create table default.gc_branch_correction_test (Program_ID bigint,Branch_ID_Corrected bigint)
    
    using delta
   
2021-09-16 06:46:52.098564 (Thread-1): SQL status: OK in 6.94 seconds
2021-09-16 06:46:52.104950 (Thread-1): Using spark connection "seed.er_silver_gold.gc_branch_correction_test".
2021-09-16 06:46:52.105085 (Thread-1): On seed.er_silver_gold.gc_branch_correction_test: /* {"app": "dbt", "dbt_version": "0.19.1", "profile_name": "dbt_databricks", "target_name": "prod", "node_id": "seed.er_silver_gold.gc_branch_correction_test"} */
truncate table default.gc_branch_correction_test
2021-09-16 06:46:53.726390 (Thread-1): SQL status: OK in 1.62 seconds
2021-09-16 06:46:53.743346 (Thread-1): Using spark connection "seed.er_silver_gold.gc_branch_correction_test".
2021-09-16 06:46:53.743491 (Thread-1): On seed.er_silver_gold.gc_branch_correction_test: 
            insert into default.gc_branch_correction_test values
            (cast(%s as bigint),cast(%s as bigint))
        ...
2021-09-16 06:46:53.744413 (Thread-1): Error while running:

            insert into default.gc_branch_correction_test values
            (cast(%s as bigint),cast(%s as bigint))
        
2021-09-16 06:46:53.744535 (Thread-1): ('The SQL contains 0 parameter markers, but 2 parameters were supplied', 'HY000')
2021-09-16 06:46:53.744696 (Thread-1): finished collecting timing info
2021-09-16 06:46:53.744847 (Thread-1): On seed.er_silver_gold.gc_branch_correction_test: ROLLBACK
2021-09-16 06:46:53.744951 (Thread-1): NotImplemented: rollback
2021-09-16 06:46:53.745045 (Thread-1): On seed.er_silver_gold.gc_branch_correction_test: Close
2021-09-16 06:46:53.745350 (Thread-1): Runtime Error in seed gc_branch_correction_test (data/default/gc_branch_correction_test.csv)
  ('The SQL contains 0 parameter markers, but 2 parameters were supplied', 'HY000')
Traceback (most recent call last):
  File "/Users/<userid>/dbt_spark-0.19.1-py3.7.egg/dbt/adapters/spark/connections.py", line 276, in exception_handler
    yield
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/adapters/sql/connections.py", line 80, in add_query
    cursor.execute(sql, bindings)
  File "/Users/<userid>/dbt_spark-0.19.1-py3.7.egg/dbt/adapters/spark/connections.py", line 261, in execute
    self._cursor.execute(sql, *bindings)
pyodbc.ProgrammingError: ('The SQL contains 0 parameter markers, but 2 parameters were supplied', 'HY000')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/task/base.py", line 344, in safe_run
    result = self.compile_and_execute(manifest, ctx)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/task/base.py", line 287, in compile_and_execute
    result = self.run(ctx.node, manifest)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/task/base.py", line 389, in run
    return self.execute(compiled_node, manifest)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/task/run.py", line 248, in execute
    result = MacroGenerator(materialization_macro, context)()
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/clients/jinja.py", line 332, in __call__
    return self.call_macro(*args, **kwargs)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/clients/jinja.py", line 259, in call_macro
    return macro(*args, **kwargs)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 675, in __call__
    return self._invoke(arguments, autoescape)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 679, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 54, in macro
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/sandbox.py", line 462, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 290, in call
    return __obj(*args, **kwargs)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/clients/jinja.py", line 332, in __call__
    return self.call_macro(*args, **kwargs)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/clients/jinja.py", line 259, in call_macro
    return macro(*args, **kwargs)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 675, in __call__
    return self._invoke(arguments, autoescape)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 679, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 21, in macro
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/sandbox.py", line 462, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 290, in call
    return __obj(*args, **kwargs)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/clients/jinja.py", line 332, in __call__
    return self.call_macro(*args, **kwargs)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/clients/jinja.py", line 259, in call_macro
    return macro(*args, **kwargs)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 675, in __call__
    return self._invoke(arguments, autoescape)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 679, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 110, in macro
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/sandbox.py", line 462, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/<userid>/Jinja2-2.11.2-py3.7.egg/jinja2/runtime.py", line 290, in call
    return __obj(*args, **kwargs)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/adapters/sql/impl.py", line 64, in add_query
    abridge_sql_log)
  File "/Users/<userid>/dbt_core-0.19.1-py3.7.egg/dbt/adapters/sql/connections.py", line 87, in add_query
    return connection, cursor
  File "/Users/u1214742/.pyenv/versions/3.7.8/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/<userid>/dbt_spark-0.19.1-py3.7.egg/dbt/adapters/spark/connections.py", line 289, in exception_handler
    raise dbt.exceptions.RuntimeException(str(exc))
dbt.exceptions.RuntimeException: Runtime Error in seed gc_branch_correction_test (data/default/gc_branch_correction_test.csv)
  ('The SQL contains 0 parameter markers, but 2 parameters were supplied', 'HY000')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delayed response from me!

  • Glad to hear this works for external tables as well in local testing
  • Agree, truncate + insert should be the standard approach, and then full refresh mode should just add in drop + create as well. And you're right, we'll need to keep this logic around in spark__reset_csv_table, rather than using the default.
  • Ugh, parameter markers are one of those things I just have a tremendously hard time debugging. Which connection method are you using? %s is the standard parameter marker, but ? is the one used by pyodbc, and we don't do a great job in our logs today of making clear which one dbt is actually using, since the switch for pyodbc happens at runtime

Copy link

@binhnefits binhnefits Feb 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this does not work in the case where a seed table already exists and we rename a column in the CSV file. The drop statement does not seem to drop the underlying schema as well.

{% endif %}
{% set sql = create_csv_table(model, agate_table) %}
{{ return(sql) }}
Expand Down