Merge pull request #57 from fishtown-analytics/reorganize-pkg
reorganize package
jtcohen6 authored Nov 30, 2020
2 parents 0a29a30 + 100c32f commit b7e8c28
Showing 55 changed files with 719 additions and 1,425 deletions.
162 changes: 71 additions & 91 deletions README.md
# External sources in dbt

dbt v0.15.0 added support for an `external` property within `sources` that can include information about `location`, `partitions`, and other database-specific properties.

This package provides:
* Macros to create/replace external tables and refresh their partitions, using the metadata provided in your `.yml` file source definitions
* Snowflake-specific macros to create, backfill, and refresh snowpipes, using the same metadata

## Supported databases

* Redshift (Spectrum)
* Snowflake
* BigQuery
* Spark
* Synapse

![sample docs](etc/sample_docs.png)

## Syntax

The `stage_external_sources` macro is the primary point of entry when using this package. It has two operational modes: standard and "full refresh."

```bash
# iterate through all source nodes, create if missing, refresh metadata
$ dbt run-operation stage_external_sources

# iterate through all source nodes, create or replace (+ refresh if necessary)
$ dbt run-operation stage_external_sources --vars 'ext_full_refresh: true'
```

The `stage_external_sources` macro accepts a limited node selection syntax similar to
[snapshotting source freshness](https://docs.getdbt.com/docs/running-a-dbt-project/command-line-interface/source/#specifying-sources-to-snapshot):

```bash
# stage all Snowplow and Logs external sources:
$ dbt run-operation stage_external_sources --args 'select: snowplow logs'

# stage a particular external source table:
$ dbt run-operation stage_external_sources --args 'select: snowplow.event'
```

## Setup

The macros assume that you:
1. Have already created your database's required scaffolding for external resources:
    - an external stage (Snowflake)
    - an external schema + S3 bucket (Redshift Spectrum)
    - an external data source and file format (Synapse)
    - a Google Cloud Storage bucket (BigQuery)
    - an accessible set of files (Spark)
2. Have the appropriate permissions to create tables using that scaffolding
3. Have already created the database/project and/or schema/dataset in which dbt will create external tables (or snowpiped tables)
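
For example, with a Snowflake external stage already in place, a minimal source spec might look like the sketch below. The database, schema, stage, and file format shown here are hypothetical placeholders, not part of the package:

```yml
version: 2

sources:
  - name: snowplow
    database: raw                 # hypothetical database
    schema: snowplow_external     # hypothetical schema
    tables:
      - name: event
        external:
          # hypothetical stage, created ahead of time outside of dbt
          location: "@raw.snowplow_external.snowplow_stage"
          file_format: "( type = json )"
```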

## Spec

```yml
version: 2

sources:
  - name: snowplow
    tables:
      - name: event
        description: >
          This source table is actually a set of files in external storage.
          The dbt-external-tables package provides handy macros for getting
          those files queryable, just in time for modeling.
        external:
          location: # required: S3 file path, GCS file path, Snowflake stage, Synapse data source
          ... # database-specific properties of external table

          partitions: # optional
            - name: collector_date
              data_type: date

          # Specify ALL column names + datatypes.
          # Column order must match for CSVs, column names must match for other formats.
          # Some databases support schema inference.

        columns:
          - name: app_id
            ...
          - name: platform
            data_type: varchar(255)
            description: "Platform"
      ...
  ...
```

The `stage_external_sources` macro will use this YAML config to compile and
execute the appropriate `create`, `refresh`, and/or `drop` commands:

```
19:01:48 + 1 of 1 START external source spectrum.my_partitioned_tbl
19:01:48 + 1 of 1 (1) drop table if exists "db"."spectrum"."my_partitioned_tbl"
19:01:48 + 1 of 1 (1) DROP TABLE
19:01:48 + 1 of 1 (2) create external table "db"."spectrum"."my_partitioned_tbl"...
19:01:48 + 1 of 1 (2) CREATE EXTERNAL TABLE
19:01:48 + 1 of 1 (3) alter table "db"."spectrum"."my_partitioned_tbl"...
19:01:49 + 1 of 1 (3) ALTER EXTERNAL TABLE
```

## Resources

* [`sample_sources`](sample_sources): detailed example source specs, with annotations, for each database's implementation
* [`sample_analysis`](sample_analysis): a "dry run" version of the compiled DDL/DML that
`stage_external_sources` runs as an operation
* [`tested specs`](integration_tests/models/plugins): source spec variations that are confirmed to work on each database, via integration tests

If you encounter issues using this package or have questions, please check the [open issues](https://github.com/fishtown-analytics/dbt-external-tables/issues), as there's a chance it's a known limitation or work in progress. If not, you can:
- open a new issue to report a bug or suggest an enhancement
- post a technical question to [StackOverflow](https://stackoverflow.com/questions/tagged/dbt)
- post a conceptual question to the relevant database channel (#db-redshift, #dbt-snowflake, etc) in the [dbt Slack community](https://community.getdbt.com/)

Additional contributions to this package are very welcome! Please create issues or open PRs against `master`. Check out [this post](https://discourse.getdbt.com/t/contributing-to-an-external-dbt-package/657) on the best workflow for contributing to a package.
21 changes: 11 additions & 10 deletions integration_tests/dbt_project.yml

sources:
  dbt_external_tables_integration_tests:
    plugins:
      redshift_external:
        +enabled: "{{ target.type == 'redshift' }}"
      snowflake_external:
        +enabled: "{{ target.type == 'snowflake' }}"
      bigquery_external:
        +enabled: "{{ target.type == 'bigquery' }}"
      spark_external:
        +enabled: "{{ target.type == 'spark' }}"
      sqlserver_external:
        +enabled: "{{ target.type == 'sqlserver' }}"

seeds:
  quote_columns: false
8 changes: 8 additions & 0 deletions integration_tests/macros/common/prep_external.sql
{% macro prep_external() %}
{{ return(adapter.dispatch('prep_external', dbt_external_tables._get_dbt_external_tables_namespaces())()) }}
{% endmacro %}

{% macro default__prep_external() %}
{% do log('No prep necessary, skipping', info = true) %}
{# noop #}
{% endmacro %}
19 changes: 19 additions & 0 deletions integration_tests/macros/plugins/redshift/prep_external.sql
{% macro redshift__prep_external() %}

{% set external_schema = target.schema ~ '_spectrum' %}

{% set create_external_schema %}

create external schema if not exists
{{ external_schema }}
from data catalog
database '{{ external_schema }}'
iam_role '{{ env_var("SPECTRUM_IAM_ROLE", "") }}'
create external database if not exists;

{% endset %}

{% do log('Creating external schema ' ~ external_schema, info = true) %}
{% do run_query(create_external_schema) %}

{% endmacro %}
16 changes: 16 additions & 0 deletions integration_tests/macros/plugins/snowflake/prep_external.sql
{% macro snowflake__prep_external() %}

{% set external_stage = target.schema ~ '.dbt_external_tables_testing' %}

{% set create_external_stage %}

create or replace stage
{{ external_stage }}
url = 's3://dbt-external-tables-testing';

{% endset %}

{% do log('Creating external stage ' ~ external_stage, info = true) %}
{% do run_query(create_external_stage) %}

{% endmacro %}
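
For context, a source spec in the integration tests could then point at this stage along the lines sketched below. The table name, subpath, and file format here are hypothetical; the actual specs live under [`integration_tests/models/plugins`](integration_tests/models/plugins).

```yml
version: 2

sources:
  - name: snowflake_external
    schema: "{{ target.schema }}"
    tables:
      - name: people_csv
        external:
          # the stage created by snowflake__prep_external above
          location: "@{{ target.schema }}.dbt_external_tables_testing/csv"
          file_format: "( type = csv skip_header = 1 )"
```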
{% macro test_tsql_equal_rowcount(model) %}

{% set compare_model = kwargs.get('compare_model', kwargs.get('arg')) %}

select diff_count from final
{% endmacro %}


{% macro test_tsql_equality(model) %}


{#-- Prevent querying of db in parsing mode. This works because this macro does not create any new refs. #}
35 changes: 35 additions & 0 deletions integration_tests/macros/plugins/sqlserver/prep_external.sql
{% macro sqlserver__prep_external() %}

{% set external_data_source = target.schema ~ '.dbt_external_tables_testing' %}

{% set create_external_data_source %}
IF NOT EXISTS ( SELECT * FROM sys.external_data_sources WHERE name = '{{external_data_source}}' )

CREATE EXTERNAL DATA SOURCE [{{external_data_source}}] WITH (
TYPE = HADOOP,
LOCATION = 'wasbs://[email protected]'
)
{% endset %}

{% set external_file_format = target.schema ~ '.dbt_external_ff_testing' %}

{% set create_external_file_format %}
IF NOT EXISTS ( SELECT * FROM sys.external_file_formats WHERE name = '{{external_file_format}}' )

CREATE EXTERNAL FILE FORMAT [{{external_file_format}}]
WITH (
FORMAT_TYPE = DELIMITEDTEXT,
FORMAT_OPTIONS (
FIELD_TERMINATOR = N',',
FIRST_ROW = 2,
USE_TYPE_DEFAULT = True
)
)
{% endset %}

{% do log('Creating external data source ' ~ external_data_source, info = true) %}
{% do run_query(create_external_data_source) %}
{% do log('Creating external file format ' ~ external_file_format, info = true) %}
{% do run_query(create_external_file_format) %}

{% endmacro %}