Skip to content

Commit

Permalink
db-based approach
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Nov 30, 2020
1 parent 100c32f commit 303681d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 38 deletions.
30 changes: 30 additions & 0 deletions integration_tests/models/plugins/redshift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,36 @@ sources:
path_macro: dbt_external_tables.key_value
columns: *cols-of-the-people
tests: *equal-to-the-people

# ensure that all partitions are created
- name: people_csv_multipartitioned
external:
<<: *csv-people
location: "s3://dbt-external-tables-testing/"
partitions:
- name: file_format
data_type: varchar
vals: ['csv', 'json']
path_macro: dbt_external_tables.value_only
- name: section
data_type: varchar
vals: ['a','b','c','d']
path_macro: dbt_external_tables.key_value
- name: some_date
data_type: date
vals:
macro: dbt.dates_in_range
args:
start_date_str: '2020-01-01'
end_date_str: '2020-02-01'
in_fmt: "%Y-%m-%d"
out_fmt: "%Y-%m-%d"
path_macro: dbt_external_tables.year_month_day
- name: file_name
data_type: varchar
vals: ['people', 'not_people']
path_macro: dbt_external_tables.value_only
columns: *cols-of-the-people

- name: people_json_unpartitioned
external: &json-people
Expand Down
4 changes: 2 additions & 2 deletions macros/plugins/redshift/helpers/add_partitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@

{%- endif -%}

partition ({%- for part in partition.partition_by -%}{{ part.name }}='{{ part.value }}'{{',' if not loop.last}}{%- endfor -%})
location '{{ source_node.external.location }}{{ partition.path }}/'
partition ({%- for part in partition.partition_by -%}{{ part.name }}='{{ part.value }}'{{', ' if not loop.last}}{%- endfor -%})
location '{{ source_node.external.location }}/{{ partition.path }}/'

{% endfor -%}

Expand Down
5 changes: 5 additions & 0 deletions macros/plugins/redshift/helpers/paths.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@
{% set path = name ~ '=' ~ value %}
{{return(path)}}
{% endmacro %}

{% macro value_only(name, value) %}
{% set path = value %}
{{return(path)}}
{% endmacro %}
78 changes: 42 additions & 36 deletions macros/plugins/redshift/refresh_external_table.sql
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
{% macro redshift__refresh_external_table(source_node) %}

{%- set starting = [
{
'partition_by': [],
'path': ''
}
] -%}

{%- set ending = [] -%}
{%- set finals = [] -%}

{%- set partitions = source_node.external.get('partitions',[]) -%}

{%- if partitions -%}{%- for partition in partitions -%}
{%- if partitions -%}

{%- if not loop.first -%}
{%- set starting = ending -%}
{%- set ending = [] -%}
{%- endif -%}
{%- set part_len = partitions|length -%}

{%- set get_partitions_sql -%}

select * from

{%- for partition in partitions %} (

{%- for preexisting in starting -%}
{%- set part_num = loop.index -%}

{%- if partition.vals.macro -%}
{%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%}
Expand All @@ -29,35 +22,48 @@
{%- set vals = partition.vals -%}
{%- endif -%}

{%- for val in vals -%}
{%- for val in vals %}

{# For each preexisting guy, add a new one #}

{%- set next_partition_by = [] -%}

{%- for prexist_part in preexisting.partition_by -%}
{%- do next_partition_by.append(prexist_part) -%}
{%- endfor -%}
select
'{{ partition.name }}' as name_{{ part_num }},
'{{ val }}' as val_{{ part_num }},
'{{ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) }}' as path_{{ part_num }}

{%- do next_partition_by.append({'name': partition.name, 'value': val}) -%}

{# Concatenate path #}

{%- set concat_path = preexisting.path ~ '/' ~ dbt_external_tables.render_from_context(partition.path_macro, partition.name, val) -%}

{%- do ending.append({'partition_by': next_partition_by, 'path': concat_path}) -%}
{{ 'union all' if not loop.last else ') ' }}

{%- endfor -%}

{{ 'cross join' if not loop.last }}

{%- endfor -%}

{%- if loop.last -%}
{%- for end in ending -%}
{%- do finals.append(end) -%}
{%- endset -%}

{%- set finals = [] -%}

{%- if execute -%}
{%- set results = run_query(get_partitions_sql) -%}
{%- for row in results -%}

{%- set partition_parts = [] -%}
{%- set path_parts = [] -%}

{%- for i in range(0, part_len) -%}
{%- do partition_parts.append({
'name': row[i * 3],
'value': row[i * 3 + 1]
}) -%}
{%- do path_parts.append(row[i * 3 + 2]) -%}
{%- endfor -%}

{%- set construct = {
'partition_by': partition_parts,
'path': path_parts | join('/')
} -%}

{% do finals.append(construct) %}
{%- endfor -%}
{%- endif -%}

{%- endfor -%}

{%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%}
{{ return(ddl) }}
Expand Down

0 comments on commit 303681d

Please sign in to comment.