Skip to content

Commit

Permalink
Bug/postgres performance (#126)
Browse files Browse the repository at this point in the history
* patch/update-macro-readme

* bug/postgres-performance

* remove int model

* remove int model

* switch to jsonb

* add limit for test

* update changelog and regen docs

* Update README.md

fixed links

* Update CHANGELOG.md

* Update CHANGELOG.md

* Update CHANGELOG.md

* Update CHANGELOG.md

* Update CHANGELOG.md

---------

Co-authored-by: Alex Ilyichov <[email protected]>
  • Loading branch information
fivetran-catfritz and 5tran-alexil authored May 14, 2024
1 parent d355614 commit 8b325b8
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 48 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# dbt_fivetran_log v1.7.3
[PR #126](https://github.com/fivetran/dbt_fivetran_log/pull/126) includes the following updates:

## Performance Improvements
- Updated the sequence of JSON parsing for model `fivetran_platform__audit_table` to reduce runtime.

## Bug Fixes
- Updated model `fivetran_platform__audit_user_activity` to correct the JSON parsing used to determine column `email`. This fixes an issue introduced in v1.5.0 where `fivetran_platform__audit_user_activity` could potentially have 0 rows.

## Under the hood
- Updated logic for macro `fivetran_log_lookback` to align with logic used in similar macros in other packages.
- Updated logic for the postgres dispatch of macro `fivetran_log_json_parse` to utilize `jsonb` instead of `json` for performance.

# dbt_fivetran_log v1.7.2
[PR #123](https://github.com/fivetran/dbt_fivetran_log/pull/123) includes the following updates:

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# Fivetran Platform dbt Package ([Docs](https://fivetran.github.io/dbt_fivetran_log/))
# 📣 What does this dbt package do?
- Generates a comprehensive data dictionary of your Fivetran Platform connector (previously called Fivetran Log) data via the [dbt docs site](https://fivetran.github.io/dbt_fivetran_log/)
- Produces staging models in the format described by [this ERD](https://fivetran.com/docs/logs/fivetran-log#schemainformation) which clean, test, and prepare your Fivetran data from [Fivetran's free connector](https://fivetran.com/docs/applications/fivetran-log) and generates analysis ready end models.
- Produces staging models in the format described by [this ERD](https://fivetran.com/docs/logs/fivetran-platform#schemainformation) which clean, test, and prepare your Fivetran data from [Fivetran's free connector](https://fivetran.com/docs/logs/fivetran-platform)) and generates analysis ready end models.
- The above mentioned models enable you to better understand how you are spending money in Fivetran according to our [consumption-based pricing model](https://fivetran.com/docs/getting-started/consumption-based-pricing) as well as providing details about the performance and status of your Fivetran connectors. This is achieved by:
- Displaying consumption data at the table, connector, destination, and account levels
- Providing a history of measured free and paid monthly active rows (MAR), credit consumption, and the relationship between the two
Expand Down
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
config-version: 2
name: 'fivetran_log'
version: '1.7.2'
version: '1.7.3'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
Expand Down
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/run_results.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions integration_tests/ci/sample.profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ integration_tests:
host: "{{ env_var('CI_DATABRICKS_DBT_HOST') }}"
http_path: "{{ env_var('CI_DATABRICKS_DBT_HTTP_PATH') }}"
schema: fivetran_platform_integration_tests
threads: 2
threads: 8
token: "{{ env_var('CI_DATABRICKS_DBT_TOKEN') }}"
type: databricks
databricks-sql:
catalog: "{{ env_var('CI_DATABRICKS_DBT_CATALOG') }}"
host: "{{ env_var('CI_DATABRICKS_DBT_HOST') }}"
http_path: "{{ env_var('CI_DATABRICKS_SQL_DBT_HTTP_PATH') }}"
schema: sqlw_tests
threads: 2
threads: 8
token: "{{ env_var('CI_DATABRICKS_SQL_DBT_TOKEN') }}"
type: databricks
sqlserver:
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '1.7.2'
version: '1.7.3'

config-version: 2
profile: 'integration_tests'
Expand Down
2 changes: 1 addition & 1 deletion macros/fivetran_log_json_parse.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
{% macro postgres__fivetran_log_json_parse(string, string_path) %}

case when {{ string }} ~ '^\s*[\{].*[\}]?\s*$' -- Postgres has no native json check, so this will check the string for indicators of a JSON object
then {{ string }}::json #>> '{ {%- for s in string_path -%}{{ s }}{%- if not loop.last -%},{%- endif -%}{%- endfor -%} }'
then {{ string }}::jsonb #>> '{ {%- for s in string_path -%}{{ s }}{%- if not loop.last -%},{%- endif -%}{%- endfor -%} }'
else null end

{% endmacro %}
Expand Down
35 changes: 9 additions & 26 deletions macros/fivetran_log_lookback.sql
Original file line number Diff line number Diff line change
@@ -1,35 +1,18 @@
{% macro fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}
{% macro fivetran_log_lookback(from_date, datepart='day', interval=7, safety_date='2010-01-01') %}

{{ adapter.dispatch('fivetran_log_lookback', 'fivetran_log') (from_date, datepart='day', interval=7, default_start_date='2010-01-01') }}
{{ adapter.dispatch('fivetran_log_lookback', 'fivetran_log') (from_date, datepart='day', interval=7, safety_date='2010-01-01') }}

{%- endmacro %}

{% macro default__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}
{% macro default__fivetran_log_lookback(from_date, datepart='day', interval=7, safety_date='2010-01-01') %}

coalesce(
(select {{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp=from_date) }}
from {{ this }}),
{{ "'" ~ default_start_date ~ "'" }}
)
{% set sql_statement %}
select coalesce({{ from_date }}, {{ "'" ~ safety_date ~ "'" }})
from {{ this }}
{%- endset -%}

{% endmacro %}
{%- set result = dbt_utils.get_single_value(sql_statement) %}

{% macro bigquery__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}

-- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs
{%- call statement('date_agg', fetch_result=True) -%}
select {{ from_date }} from {{ this }}
{%- endcall -%}

-- load the result from the above query into a new variable
{%- set query_result = load_result('date_agg') -%}

-- the query_result is stored as a dataframe. Therefore, we want to now store it as a singular value.
{%- set date_agg = query_result['data'][0][0] %}

coalesce(
{{ dbt.dateadd(datepart='day', interval=-7, from_date_or_timestamp="'" ~ date_agg ~ "'") }},
{{ "'" ~ default_start_date ~ "'" }}
)
{{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp="cast('" ~ result ~ "' as date)") }}

{% endmacro %}
39 changes: 28 additions & 11 deletions models/fivetran_platform__audit_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,39 @@
file_format='delta' if is_databricks_sql_warehouse(target) else 'parquet'
) }}

with sync_log as (
with base as (

select
*,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name
select *
from {{ ref('stg_fivetran_platform__log') }}
where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end', 'records_modified')

{% if is_incremental() %}

and cast(created_at as date) > {{ fivetran_log.fivetran_log_lookback(from_date='max(sync_start_day)', interval=7) }}

{% endif %}
),

sync_log as (
select
*,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
cast(null as {{ dbt.type_string() }}) as schema_name,
cast(null as {{ dbt.type_string() }}) as operation_type,
cast(null as {{ dbt.type_bigint() }}) as row_count
from base
where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end')

union all

select
*,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_bigint() }}) as row_count
from base
where event_subtype = 'records_modified'
),

connector as (

select *
Expand Down Expand Up @@ -80,13 +98,12 @@ records_modified_log as (
select
connector_id,
created_at,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_bigint() }}) as row_count
table_name,
schema_name,
operation_type,
row_count
from sync_log
where event_subtype = 'records_modified'

),

sum_records_modified as (
Expand Down
2 changes: 1 addition & 1 deletion models/fivetran_platform__audit_user_activity.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ with logs as (

select
*,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path='actor') }} as actor_email
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as actor_email
from {{ ref('stg_fivetran_platform__log') }}
where lower(message_data) like '%actor%'
),
Expand Down
2 changes: 1 addition & 1 deletion models/staging/stg_fivetran_platform__log.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ final as (
)

select *
from final
from final

0 comments on commit 8b325b8

Please sign in to comment.