
Athena support POC #1

Open · wants to merge 26 commits into base: master

Commits (26)
b18f50b
Athena complete_buckets_cte
artem-garmash Jun 2, 2023
112a31b
Disable information_schema usage for Athena for now
artem-garmash Jun 2, 2023
959d3f9
HACK: make timestamp insert work with Athena
artem-garmash Jun 2, 2023
995a80d
Use information_schema for Athena
artem-garmash Jun 2, 2023
882a404
Athena current_timestamp macros
artem-garmash Jun 2, 2023
0ba5813
Athena timeadd
artem-garmash Jun 2, 2023
2f77726
Athena hacky create_or_replace
artem-garmash Jun 2, 2023
fe7c02f
Athena replace_table_data
artem-garmash Jun 2, 2023
8ac690f
Athena escape_special_chars
artem-garmash Jun 2, 2023
9d71a72
Athena cli profile
artem-garmash Jun 2, 2023
b75c383
Fix timestamp type for athena iceberg
artem-garmash Jun 5, 2023
a0be46a
Athena target_database to fix log errors
artem-garmash Jun 5, 2023
c519d4f
EXPLICIT ICEBERG TYPE FOR NOW
artem-garmash Jun 5, 2023
8ca54bf
Fix Athena bucket cte
artem-garmash Jun 8, 2023
b3966c5
Athena current_timestamp_in_utc usable in plain tables
artem-garmash Jun 8, 2023
150abb1
Make descriptions work in Athena, cast all to strings
artem-garmash Jun 8, 2023
14a0bc1
Typo in Athena timeadd macro
artem-garmash Jun 8, 2023
bf4fc24
Athena dirty merge_sql
artem-garmash Jun 8, 2023
8851dd3
Athena delete and insert
artem-garmash Jun 8, 2023
4ba7f2d
Adapt anomaly queries for Athena iceberg, cast timestamps
artem-garmash Jun 8, 2023
1abc47b
Athena datediff
artem-garmash Jun 8, 2023
53ba6fe
Athena string type as varchar
artem-garmash Jun 8, 2023
37ea579
Athena iceberg specific timestamp cast
artem-garmash Jun 8, 2023
8cbdccf
Athen ts cast with ISO8601 format handling
artem-garmash Jun 8, 2023
f90633f
Update HACK for timestamp inserts
artem-garmash Jun 8, 2023
9987d93
Cast timestamps in alert tables for Athena
artem-garmash Jun 12, 2023
20 changes: 20 additions & 0 deletions macros/commands/generate_elementary_cli_profile.sql
@@ -76,6 +76,26 @@ elementary:
threads: {{ target.threads }}
{% endmacro %}

{% macro athena__generate_elementary_cli_profile(method, elementary_database, elementary_schema) %}
elementary:
outputs:
default:
type: "{{ target.type }}"
s3_staging_dir: "{{ target.s3_staging_dir }}"
region_name: "{{ target.region_name }}"
database: "{{ target.database }}"
aws_profile_name: "{{ target.aws_profile_name }}"
work_group: "{{ target.work_group }}"
aws_access_key_id: "<AWS_ACCESS_KEY_ID>"
aws_secret_access_key: "<AWS_SECRET_ACCESS_KEY>"
Comment on lines +89 to +90

what if I run the CLI from a setup that requires the usage of an AWS session token? e.g. I'm running the CLI from a setup where I use AWS SSO. That might be revisited to support these cases.

Owner Author:
What do you mean? It's just a copy of the dbt-athena config from https://github.com/dbt-athena/dbt-athena#configuring-your-profile, and dbt is used by the CLI to query elementary tables. The CLI is used the same way as dbt with dbt-athena. E.g. if I'm using AWS SSO, aws_profile_name is used and the login is outside the scope of dbt/elementary.

When working with AWS SSO setups, sometimes an extra variable needs to be passed: if we get the keys from something like aws-vault, the key is aws_session_token, and I was wondering if we need to include that. But you are right, in most cases if the user uses aws sso login, using the aws profile is enough.

{%- if elementary_database %}
catalog: "{{ elementary_database }}"
{%- endif %}
schema: "{{ elementary_schema }}"
threads: {{ target.threads }}
{% endmacro %}


{% macro default__generate_elementary_cli_profile(method, elementary_database, elementary_schema) %}
Adapter "{{ target.type }}" is not supported on Elementary.
{% endmacro %}
14 changes: 7 additions & 7 deletions macros/edr/alerts/anomaly_detection_description.sql
@@ -9,21 +9,21 @@
{% endmacro %}

{% macro freshness_description() %}
'Last update was at ' || anomalous_value || ', ' || abs(round({{ elementary.edr_cast_as_numeric('metric_value/3600') }}, 2)) || ' hours ago. Usually the table is updated within ' || abs(round({{ elementary.edr_cast_as_numeric('training_avg/3600') }}, 2)) || ' hours.'
'Last update was at ' || anomalous_value || ', ' || {{ elementary.edr_cast_as_string('abs(round(' ~ elementary.edr_cast_as_numeric('metric_value/3600') ~ ', 2))') }} || ' hours ago. Usually the table is updated within ' || {{ elementary.edr_cast_as_string('abs(round(' ~ elementary.edr_cast_as_numeric('training_avg/3600') ~ ', 2))') }} || ' hours.'
{% endmacro %}

{% macro table_metric_description() %}
'The last ' || metric_name || ' value is ' || round({{ elementary.edr_cast_as_numeric('metric_value') }}, 3) ||
'. The average for this metric is ' || round({{ elementary.edr_cast_as_numeric('training_avg') }}, 3) || '.'
'The last ' || metric_name || ' value is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('metric_value') ~ ', 3)') }} ||
'. The average for this metric is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('training_avg') ~ ', 3)') }} || '.'
{% endmacro %}

{% macro column_metric_description() %}
'In column ' || column_name || ', the last ' || metric_name || ' value is ' || round({{ elementary.edr_cast_as_numeric('metric_value') }}, 3) ||
'. The average for this metric is ' || round({{ elementary.edr_cast_as_numeric('training_avg') }}, 3) || '.'
'In column ' || column_name || ', the last ' || metric_name || ' value is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('metric_value') ~ ', 3)') }} ||
'. The average for this metric is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('training_avg') ~ ', 3)') }} || '.'
{% endmacro %}

{% macro dimension_metric_description() %}
'The last ' || metric_name || ' value for dimension ' || dimension || ' - ' ||
case when dimension_value is null then 'NULL' else dimension_value end || ' is ' || round({{ elementary.edr_cast_as_numeric('metric_value') }}, 3) ||
'. The average for this metric is ' || round({{ elementary.edr_cast_as_numeric('training_avg') }}, 3) || '.'
case when dimension_value is null then 'NULL' else dimension_value end || ' is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('metric_value') ~ ', 3)') }} ||
'. The average for this metric is ' || {{ elementary.edr_cast_as_string('round(' ~ elementary.edr_cast_as_numeric('training_avg') ~ ', 3)') }} || '.'
{% endmacro %}
@@ -143,23 +143,23 @@
end as anomaly_score,
{{ test_configuration.anomaly_sensitivity }} as anomaly_score_threshold,
source_value as anomalous_value,
bucket_start,
bucket_end,
{{ elementary.edr_cast_as_timestamp('bucket_start') }} as bucket_start,
{{ elementary.edr_cast_as_timestamp('bucket_end') }} as bucket_end,
bucket_seasonality,
metric_value,
case
when training_stddev is null then null
else (-1) * {{ test_configuration.anomaly_sensitivity }} * training_stddev + training_avg
end as min_metric_value,
case
when training_stddev is null then null
else {{ test_configuration.anomaly_sensitivity }} * training_stddev + training_avg
end as max_metric_value,
training_avg,
training_stddev,
training_set_size,
training_start,
training_end,
{{ elementary.edr_cast_as_timestamp('training_start') }} as training_start,
{{ elementary.edr_cast_as_timestamp('training_end') }} as training_end,
dimension,
dimension_value
from time_window_aggregation
@@ -172,4 +172,4 @@

{% endset %}
{{ return(anomaly_scores_query) }}
{% endmacro %}
{% endmacro %}
2 changes: 1 addition & 1 deletion macros/edr/materializations/tests/test.sql
@@ -178,7 +178,7 @@
'test_execution_id': test_execution_id,
'test_unique_id': elementary.insensitive_get_dict_value(flattened_test, 'unique_id'),
'model_unique_id': parent_model_unique_id,
'detected_at': elementary.insensitive_get_dict_value(flattened_test, 'generated_at'),
'detected_at': elementary.edr_cast_as_timestamp("'" ~ elementary.insensitive_get_dict_value(flattened_test, 'generated_at') ~ "'"),
'database_name': elementary.insensitive_get_dict_value(flattened_test, 'database_name'),
'schema_name': elementary.insensitive_get_dict_value(flattened_test, 'schema_name'),
'table_name': parent_model_name,
@@ -43,4 +43,17 @@
from information_schema.columns
where upper(table_schema) = upper('{{ schema_name }}')

{% endmacro %}
{% endmacro %}

{% macro athena__get_columns_from_information_schema(database_name, schema_name) %}
select
upper(table_catalog || '.' || table_schema || '.' || table_name) as full_table_name,
upper(table_catalog) as database_name,
upper(table_schema) as schema_name,
upper(table_name) as table_name,
upper(column_name) as column_name,
data_type
from information_schema.columns
FYI: this query is going to be super slow. Did you think about using a dbt-athena method to make that work? e.g. we can add another method in the adapter to leverage the Glue APIs; those would be a few orders of magnitude faster.

Owner Author:
Thanks for pointing this out. I noticed dbt-athena switched to the Glue APIs some time ago. It should definitely be checked once the port is working properly overall.

The issue is that in order to use the Glue API you have to do an adapter.dispatch call from your macro, as it's not possible to make Python invocations that are outside the scope of the adapter. As said, we can expose all that we need in the adapter if necessary.
where upper(table_schema) = upper('{{ schema_name }}')

{% endmacro %}
@@ -86,3 +86,22 @@
table_name
from information_schema_tables
{% endmacro %}

{% macro athena__get_tables_from_information_schema(schema_tuple) %}
{%- set database_name, schema_name = schema_tuple %}

select
{{ elementary.full_table_name() }} as full_table_name,
upper(database_name || '.' || schema_name) as full_schema_name,
database_name,
schema_name,
table_name
from (
select
upper(table_catalog) as database_name,
upper(table_schema) as schema_name,
upper(table_name) as table_name
from information_schema.tables
where upper(table_schema) = upper('{{ schema_name }}') and upper(table_catalog) = upper('{{ database_name }}')
)
{% endmacro %}
11 changes: 11 additions & 0 deletions macros/edr/system/system_utils/buckets_cte.sql
@@ -89,3 +89,14 @@
{%- endset %}
{{ return(complete_buckets_cte) }}
{% endmacro %}

{% macro athena__complete_buckets_cte(time_bucket, bucket_end_expr, min_bucket_start_expr, max_bucket_end_expr) %}
{%- set complete_buckets_cte %}
select
edr_bucket_start,
{{ bucket_end_expr }} as edr_bucket_end
from unnest(sequence({{ min_bucket_start_expr }}, {{ max_bucket_end_expr }}, interval '{{ time_bucket.count }}' {{ time_bucket.period }})) as t(edr_bucket_start)
where {{ bucket_end_expr }} <= {{ max_bucket_end_expr }}
{%- endset %}
{{ return(complete_buckets_cte) }}
{% endmacro %}
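The Athena variant leans on Trino/Presto's `sequence()` + `unnest` to generate one row per bucket start, then derives each bucket end and filters out the incomplete trailing bucket. A loose Python sketch of that semantics (illustrative only, not code from the PR; the function name and period handling are assumptions):

```python
from datetime import datetime, timedelta

def complete_buckets(min_start, max_end, count, period):
    """Mimic sequence()/unnest: emit (bucket_start, bucket_end) pairs,
    keeping only buckets that end on or before max_end (the WHERE clause)."""
    step = {"hour": timedelta(hours=count), "day": timedelta(days=count)}[period]
    buckets = []
    bucket_start = min_start
    while bucket_start <= max_end:  # sequence() includes the upper bound
        bucket_end = bucket_start + step
        if bucket_end <= max_end:   # drop the incomplete trailing bucket
            buckets.append((bucket_start, bucket_end))
        bucket_start += step
    return buckets

buckets = complete_buckets(datetime(2023, 6, 1), datetime(2023, 6, 1, 6), 2, "hour")
# three 2-hour buckets: 00:00-02:00, 02:00-04:00, 04:00-06:00
```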
10 changes: 9 additions & 1 deletion macros/utils/cross_db_utils/current_timestamp.sql
@@ -45,4 +45,12 @@

{% macro spark__edr_current_timestamp_in_utc() %}
unix_timestamp()
{% endmacro %}
{% endmacro %}

{% macro athena__edr_current_timestamp() -%}
CURRENT_TIMESTAMP
{%- endmacro -%}

{% macro athena__edr_current_timestamp_in_utc() -%}
cast(CURRENT_TIMESTAMP AT TIME ZONE 'utc' AS TIMESTAMP)
{%- endmacro -%}
10 changes: 9 additions & 1 deletion macros/utils/cross_db_utils/datediff.sql
@@ -24,4 +24,12 @@
{%- else %}
{{ exceptions.raise_compiler_error("Unsupported date_part in edr_datediff: {}".format(date_part)) }}
{%- endif %}
{% endmacro %}
{% endmacro %}

{% macro athena__edr_datediff(first_date, second_date, date_part) %}
{% set macro = dbt.datediff or dbt_utils.datediff %}
{% if not macro %}
{{ exceptions.raise_compiler_error("Did not find a `datediff` macro.") }}
{% endif %}
{{ return(macro(elementary.edr_cast_as_date(first_date), elementary.edr_cast_as_date(second_date), date_part)) }}
{% endmacro %}
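The Athena macro casts both operands to dates before delegating to dbt's `datediff`, so differences are taken in whole calendar units. A rough Python sketch of that behavior for a few date parts (illustrative; dbt's macro supports more parts than shown here):

```python
from datetime import date

def edr_datediff(first, second, date_part):
    """Mimic date-level datediff: whole-unit calendar differences."""
    if date_part == "day":
        return (second - first).days
    if date_part == "month":
        return (second.year - first.year) * 12 + (second.month - first.month)
    if date_part == "year":
        return second.year - first.year
    raise ValueError("Unsupported date_part: {}".format(date_part))

print(edr_datediff(date(2023, 6, 2), date(2023, 6, 8), "day"))  # 6
```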
4 changes: 4 additions & 0 deletions macros/utils/cross_db_utils/target_database.sql
@@ -18,3 +18,7 @@
{% macro bigquery__target_database() %}
{% do return(target.project) %}
{% endmacro %}

{% macro athena__target_database() %}
{% do return(target.database) %}
{% endmacro %}
4 changes: 4 additions & 0 deletions macros/utils/cross_db_utils/timeadd.sql
@@ -25,3 +25,7 @@
{% macro redshift__edr_timeadd(date_part, number, timestamp_expression) %}
dateadd({{ date_part }}, {{ elementary.edr_cast_as_int(number) }}, {{ elementary.edr_cast_as_timestamp(timestamp_expression) }})
{% endmacro %}

{% macro athena__edr_timeadd(date_part, number, timestamp_expression) %}
date_add('{{ date_part }}', {{ elementary.edr_cast_as_int(number) }}, {{ elementary.edr_cast_as_timestamp(timestamp_expression) }})
{% endmacro %}
21 changes: 21 additions & 0 deletions macros/utils/data_types/cast_column.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
{%- macro edr_cast_as_timestamp(timestamp_field) -%}
{{ return(adapter.dispatch('edr_cast_as_timestamp', 'elementary')(timestamp_field)) }}
{%- endmacro -%}

{%- macro default__edr_cast_as_timestamp(timestamp_field) -%}
cast({{ timestamp_field }} as {{ elementary.edr_type_timestamp() }})
{%- endmacro -%}

{# Athena needs explicit conversion for ISO8601 timestamps used in buckets_cte #}
Owner Author:
agate_to_dicts converts timestamps to ISO8601 format, and Athena needs explicit handling.

{%- macro athena__edr_cast_as_timestamp(timestamp_field) -%}
coalesce(
try_cast({{ timestamp_field }} as {{ elementary.edr_type_timestamp() }}),
cast(from_iso8601_timestamp(cast({{ timestamp_field }} AS {{ elementary.edr_type_string() }})) AS {{ elementary.edr_type_timestamp() }})
)
{%- endmacro -%}
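The macro's `coalesce(try_cast(...), from_iso8601_timestamp(...))` pattern first attempts a plain timestamp cast and only then falls back to parsing the ISO8601 form produced by agate_to_dicts. A Python sketch of the same fallback logic (illustrative; the formats handled are assumptions):

```python
from datetime import datetime

def athena_cast_as_timestamp(value):
    """Mimic coalesce(try_cast(...), from_iso8601_timestamp(...)):
    try the plain 'YYYY-MM-DD HH:MM:SS' form first, then fall back to
    ISO8601 (e.g. '2023-06-08T12:30:00'), the format agate_to_dicts emits."""
    if isinstance(value, datetime):
        return value  # already a timestamp; try_cast would succeed
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        # from_iso8601_timestamp path
        return datetime.fromisoformat(value)

print(athena_cast_as_timestamp("2023-06-08T12:30:00"))
```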

{%- macro edr_cast_as_float(column) -%}
cast({{ column }} as {{ elementary.edr_type_float() }})
{%- endmacro -%}
@@ -43,6 +55,15 @@
cast({{ elementary.edr_cast_as_timestamp(timestamp_field) }} as {{ elementary.edr_type_date() }})
{%- endmacro -%}

{# Athena needs explicit conversion for ISO8601 timestamps used in buckets_cte #}
{%- macro athena__edr_cast_as_date(timestamp_field) -%}
coalesce(
try_cast({{ timestamp_field }} as {{ elementary.edr_type_date() }}),
cast(from_iso8601_timestamp(cast({{ timestamp_field }} AS {{ elementary.edr_type_string() }})) AS {{ elementary.edr_type_date() }})
)
{%- endmacro -%}


{%- macro const_as_text(string) -%}
{{ return(adapter.dispatch('const_as_text', 'elementary')(string)) }}
{%- endmacro -%}
19 changes: 19 additions & 0 deletions macros/utils/data_types/data_type.sql
@@ -43,6 +43,11 @@
{% do return("string") %}
{% endmacro %}

{% macro athena__edr_type_string() %}
{% do return("varchar") %}
{% endmacro %}




{%- macro edr_type_long_string() -%}
@@ -93,6 +98,10 @@


{% macro edr_type_timestamp() %}
{{ return(adapter.dispatch('edr_type_timestamp', 'elementary')()) }}
{% endmacro %}

{% macro default__edr_type_timestamp() %}
{% set macro = dbt.type_timestamp or dbt_utils.type_timestamp %}
{% if not macro %}
{{ exceptions.raise_compiler_error("Did not find a `type_timestamp` macro.") }}
@@ -121,3 +130,13 @@
{% macro bigquery__edr_type_date() %}
date
{% endmacro %}

{% macro athena__edr_type_timestamp() %}
Owner Author:
based on cast_timestamp from dbt-athena
{%- set config = model.get('config', {}) -%}
{%- set table_type = config.get('table_type', 'glue') -%}
{%- if table_type == 'iceberg' -%}
timestamp(6)
{%- else -%}
timestamp
{%- endif -%}
{% endmacro %}
9 changes: 9 additions & 0 deletions macros/utils/table_operations/create_or_replace.sql
@@ -25,3 +25,12 @@
{% do run_query(dbt.create_table_as(temporary, relation, sql_query)) %}
{% do adapter.commit() %}
{% endmacro %}

{% macro athena__create_or_replace(temporary, relation, sql_query) %}
{% set drop_query %}
Owner Author:
TODO: relation doesn't have a type for some reason when this is called during the initial run to build the elementary tables; drop_relation fails as it checks for the type.
drop table if exists {{ relation.schema }}.{{ relation.identifier }}
{% endset %}
{% do elementary.run_query(drop_query) %}
{% do run_query(dbt.create_table_as(temporary, relation, sql_query)) %}
{% do adapter.commit() %}
{% endmacro %}
23 changes: 23 additions & 0 deletions macros/utils/table_operations/delete_and_insert.sql
@@ -76,3 +76,26 @@

{% do return(queries) %}
{% endmacro %}

{% macro athena__get_delete_and_insert_queries(relation, insert_relation, delete_relation, delete_column_key) %}
{% set queries = [] %}

{% if delete_relation %}
{% set delete_query %}
delete from {{ relation }}
where
{{ delete_column_key }} is null
or {{ delete_column_key }} in (select {{ delete_column_key }} from {{ delete_relation }});
{% endset %}
{% do queries.append(delete_query) %}
{% endif %}

{% if insert_relation %}
{% set insert_query %}
insert into {{ relation }} select * from {{ insert_relation }};
{% endset %}
{% do queries.append(insert_query) %}
{% endif %}

{% do return(queries) %}
{% endmacro %}
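The macro builds up to two statements: a delete that removes rows whose key is NULL or appears in the delete relation, and a plain insert-select. A Python sketch of the query generation (illustrative; relation names here are just strings, whereas the macro receives dbt relation objects):

```python
def athena_delete_and_insert_queries(relation, insert_relation=None,
                                     delete_relation=None, delete_column_key=None):
    """Mimic athena__get_delete_and_insert_queries: return the list of
    SQL statements to run, in order."""
    queries = []
    if delete_relation:
        queries.append(
            f"delete from {relation} "
            f"where {delete_column_key} is null "
            f"or {delete_column_key} in (select {delete_column_key} from {delete_relation});"
        )
    if insert_relation:
        queries.append(f"insert into {relation} select * from {insert_relation};")
    return queries
```

Keeping the delete and insert as separate statements (rather than a single merge) matches Athena Iceberg's support for standalone DML.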
12 changes: 11 additions & 1 deletion macros/utils/table_operations/insert_rows.sql
@@ -111,12 +111,22 @@
{{- return(string_value | replace("'", "''")) -}}
{%- endmacro -%}

{%- macro athena__escape_special_chars(string_value) -%}
{{- return(string_value | replace("'", "''")) -}}
{%- endmacro -%}
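Like the other adapters above, Athena escapes a single quote inside a string literal by doubling it. The same transformation in Python, for illustration:

```python
def escape_special_chars(string_value: str) -> str:
    """Athena/Presto string literals escape a single quote by doubling it."""
    return string_value.replace("'", "''")

value = "it's"
print("'" + escape_special_chars(value) + "'")  # 'it''s'
```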

{%- macro render_value(value) -%}
{%- if value is defined and value is not none -%}
{%- if value is number -%}
{{- value -}}
{%- elif value is string -%}
'{{- elementary.escape_special_chars(value) -}}'
Owner Author:
inserts with timestamp strings should be cast explicitly in Athena. How to handle it properly?

I think the clean thing would be to pass this function the actual type of the column we are rendering to from insert_rows:

  1. insert_rows already calls adapter.get_columns_in_relation, so we can pass column.type as another parameter to render_value.
  2. We can normalize the DB data type with elementary.normalize_data_type and then treat the value differently if it's a timestamp.

Owner Author:
thanks @haritamar, that looks really straightforward. Do you see any problem with using edr_cast_as_timestamp for timestamp literals? It's needed for Athena to add the timestamp type, but it will also change timestamp literals for other connectors. Or should I add a dedicated macro to handle timestamp literals that would be a no-op for all but Athena?
{%- if value.startswith('cast(') -%}
{{- value -}}
{%- elif value.startswith('coalesce(') -%}
{{- value -}}
{%- else -%}
'{{- elementary.escape_special_chars(value) -}}'
{%- endif -%}
{%- elif value is mapping or value is sequence -%}
'{{- elementary.escape_special_chars(tojson(value)) -}}'
{%- else -%}
26 changes: 26 additions & 0 deletions macros/utils/table_operations/merge_sql.sql
@@ -19,3 +19,29 @@
{% do return(macro(target_relation, tmp_relation, unique_key, dest_columns)) %}
{{ return(merge_sql) }}
{% endmacro %}

{% macro athena__merge_sql(target_relation, tmp_relation, unique_key, dest_columns, incremental_predicates) %}
Owner Author:
Ideally, this should be done in dbt-athena

@nicor88 (Aug 22, 2023):

Owner Author:
sure, I was using it for reference. But that iceberg_merge is running the merge query, and here we need the SQL only. https://github.com/artem-garmash/dbt-data-reliability/blob/master/macros/utils/table_operations/merge_sql.sql#L7 refers to dbt.merge_sql, which is part of dbt (https://github.com/dbt-labs/dbt-core/blob/7eedfcd2742fcf789200a4b829c8c2eb98369089/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql#L4). Is it somehow available from dbt-athena?

It doesn't seem that we override get_merge_sql; maybe in this case it is fine to leave athena__merge_sql here.
{% set query %}
merge into {{ target_relation }} as target using {{ tmp_relation }} as src
on (target.{{ unique_key }} = src.{{ unique_key }})
when matched
then update set
{%- for col in dest_columns %}
{{ col.column }} = src.{{ col.column }} {{ ", " if not loop.last }}
{%- endfor %}
when not matched
then insert (
{%- for col in dest_columns %}
{{ col.column }} {{ ", " if not loop.last }}
{%- endfor %}

)
values (
{%- for col in dest_columns %}
src.{{ col.column }} {{ ", " if not loop.last }}
{%- endfor %}
)
{% endset %}
{% do return(query) %}
{% endmacro %}
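The macro templates a standard MERGE: match on the unique key, update every destination column when matched, and insert the full column list otherwise. The string-building logic can be sketched in Python (illustrative; in the macro, dest_columns are dbt column objects with a .column attribute, simplified to plain names here):

```python
def athena_merge_sql(target, tmp, unique_key, dest_columns):
    """Mimic athena__merge_sql: build the MERGE statement text from the
    target/source relations, the join key, and the destination columns."""
    cols = ", ".join(dest_columns)
    set_clause = ", ".join(f"{c} = src.{c}" for c in dest_columns)
    values = ", ".join(f"src.{c}" for c in dest_columns)
    return (
        f"merge into {target} as target using {tmp} as src\n"
        f"on (target.{unique_key} = src.{unique_key})\n"
        f"when matched then update set {set_clause}\n"
        f"when not matched then insert ({cols}) values ({values})"
    )

print(athena_merge_sql("alerts", "alerts_tmp", "alert_id", ["alert_id", "status"]))
```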
6 changes: 6 additions & 0 deletions macros/utils/table_operations/replace_table_data.sql
@@ -30,3 +30,9 @@

{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}

{% macro athena__replace_table_data(relation, rows) %}
{% set intermediate_relation = elementary.create_intermediate_relation(relation, rows, temporary=True) %}
{% do elementary.create_or_replace(False, relation, 'select * from {}'.format(intermediate_relation)) %}
{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}
2 changes: 1 addition & 1 deletion models/edr/alerts/alerts_anomaly_detection.sql
@@ -15,7 +15,7 @@ alerts_anomaly_detection as (
test_execution_id,
test_unique_id,
model_unique_id,
detected_at,
{{ elementary.edr_cast_as_timestamp("detected_at") }} as detected_at,
database_name,
schema_name,
table_name,
2 changes: 1 addition & 1 deletion models/edr/alerts/alerts_dbt_models.sql
@@ -63,7 +63,7 @@ with error_models as (

select model_execution_id as alert_id,
unique_id,
generated_at as detected_at,
{{ elementary.edr_cast_as_timestamp("generated_at") }} as detected_at,
database_name,
materialization,
path,