From 303681d80209ba90bf6330cb1a9ae95460eaa3c3 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Sun, 29 Nov 2020 23:15:42 -0500 Subject: [PATCH] db-based approach --- integration_tests/models/plugins/redshift.yml | 30 +++++++ .../redshift/helpers/add_partitions.sql | 4 +- macros/plugins/redshift/helpers/paths.sql | 5 ++ .../redshift/refresh_external_table.sql | 78 ++++++++++--------- 4 files changed, 79 insertions(+), 38 deletions(-) diff --git a/integration_tests/models/plugins/redshift.yml b/integration_tests/models/plugins/redshift.yml index 4e003beb..8c4d06c3 100644 --- a/integration_tests/models/plugins/redshift.yml +++ b/integration_tests/models/plugins/redshift.yml @@ -40,6 +40,36 @@ sources: path_macro: dbt_external_tables.key_value columns: *cols-of-the-people tests: *equal-to-the-people + + # ensure that all partitions are created + - name: people_csv_multipartitioned + external: + <<: *csv-people + location: "s3://dbt-external-tables-testing/" + partitions: + - name: file_format + data_type: varchar + vals: ['csv', 'json'] + path_macro: dbt_external_tables.value_only + - name: section + data_type: varchar + vals: ['a','b','c','d'] + path_macro: dbt_external_tables.key_value + - name: some_date + data_type: date + vals: + macro: dbt.dates_in_range + args: + start_date_str: '2020-01-01' + end_date_str: '2020-02-01' + in_fmt: "%Y-%m-%d" + out_fmt: "%Y-%m-%d" + path_macro: dbt_external_tables.year_month_day + - name: file_name + data_type: varchar + vals: ['people', 'not_people'] + path_macro: dbt_external_tables.value_only + columns: *cols-of-the-people - name: people_json_unpartitioned external: &json-people diff --git a/macros/plugins/redshift/helpers/add_partitions.sql b/macros/plugins/redshift/helpers/add_partitions.sql index c974f634..844c6605 100644 --- a/macros/plugins/redshift/helpers/add_partitions.sql +++ b/macros/plugins/redshift/helpers/add_partitions.sql @@ -43,8 +43,8 @@ {%- endif -%} - partition ({%- for part in partition.partition_by -%}{{ part.name }}='{{ part.value }}'{{',' if not loop.last}}{%- endfor -%}) - location '{{ source_node.external.location }}{{ partition.path }}/' + partition ({%- for part in partition.partition_by -%}{{ part.name }}='{{ part.value }}'{{', ' if not loop.last}}{%- endfor -%}) + location '{{ source_node.external.location }}/{{ partition.path }}/' {% endfor -%} diff --git a/macros/plugins/redshift/helpers/paths.sql b/macros/plugins/redshift/helpers/paths.sql index c3cc6d75..9643e034 100644 --- a/macros/plugins/redshift/helpers/paths.sql +++ b/macros/plugins/redshift/helpers/paths.sql @@ -7,3 +7,8 @@ {% set path = name ~ '=' ~ value %} {{return(path)}} {% endmacro %} + +{% macro value_only(name, value) %} + {% set path = value %} + {{return(path)}} +{% endmacro %} diff --git a/macros/plugins/redshift/refresh_external_table.sql b/macros/plugins/redshift/refresh_external_table.sql index ad2c87b2..b7bed1a1 100644 --- a/macros/plugins/redshift/refresh_external_table.sql +++ b/macros/plugins/redshift/refresh_external_table.sql @@ -1,25 +1,18 @@ {% macro redshift__refresh_external_table(source_node) %} - {%- set starting = [ - { - 'partition_by': [], - 'path': '' - } - ] -%} - - {%- set ending = [] -%} - {%- set finals = [] -%} - {%- set partitions = source_node.external.get('partitions',[]) -%} - {%- if partitions -%}{%- for partition in partitions -%} + {%- if partitions -%} - {%- if not loop.first -%} - {%- set starting = ending -%} - {%- set ending = [] -%} - {%- endif -%} + {%- set part_len = partitions|length -%} + + {%- set get_partitions_sql -%} + + select * from + + {%- for partition in partitions %} ( - {%- for preexisting in starting -%} + {%- set part_num = loop.index -%} {%- if partition.vals.macro -%} {%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%} @@ -29,35 +22,48 @@ {%- set vals = partition.vals -%} {%- endif -%} - {%- for val in vals -%} + {%- for val in vals %} - {# For each preexisting guy, add a new one #} - - {%- set next_partition_by = [] -%} - - {%- for prexist_part in preexisting.partition_by -%} - {%- do next_partition_by.append(prexist_part) -%} - {%- endfor -%} + select + '{{ partition.name }}' as name_{{ part_num }}, + '{{ val }}' as val_{{ part_num }}, + '{{ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) }}' as path_{{ part_num }} - {%- do next_partition_by.append({'name': partition.name, 'value': val}) -%} - - {# Concatenate path #} - - {%- set concat_path = preexisting.path ~ '/' ~ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) -%} - - {%- do ending.append({'partition_by': next_partition_by, 'path': concat_path}) -%} + {{ 'union all' if not loop.last else ') ' }} {%- endfor -%} + {{ 'cross join' if not loop.last }} + {%- endfor -%} - {%- if loop.last -%} - {%- for end in ending -%} - {%- do finals.append(end) -%} + {%- endset -%} + + {%- set finals = [] -%} + + {%- if execute -%} + {%- set results = run_query(get_partitions_sql) -%} + {%- for row in results -%} + + {%- set partition_parts = [] -%} + {%- set path_parts = [] -%} + + {%- for i in range(0, part_len) -%} + {%- do partition_parts.append({ + 'name': row[i * 3], + 'value': row[i * 3 + 1] + }) -%} + {%- do path_parts.append(row[i * 3 + 2]) -%} + {%- endfor -%} + + {%- set construct = { + 'partition_by': partition_parts, + 'path': path_parts | join('/') + } -%} + + {% do finals.append(construct) %} {%- endfor -%} {%- endif -%} - - {%- endfor -%} {%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%} {{ return(ddl) }}