forked from blockchain-etl/ethereum-etl-airflow
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path: ethereum_sessions_dag.py
34 lines (28 loc) · 1.24 KB
/
ethereum_sessions_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from __future__ import print_function
from airflow.models import Variable
from datetime import datetime
from ethereumetl_airflow.build_sessions_dag import build_sessions_dag
import logging
import os
# Root logger at DEBUG so DAG parsing/execution emits full detail.
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# Folder holding DAG resources; default is the GCS-mounted path used by
# Cloud Composer. Overridable via the DAGS_FOLDER environment variable.
DAGS_FOLDER = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')
# Directory of SQL templates for the sessions build stage.
sql_dir = os.path.join(DAGS_FOLDER, 'resources/stages/sessions/sqls')
# Deployment environment from the Airflow Variable store; defaults to 'prod'.
environment = Variable.get('environment', 'prod')
# airflow DAG
# Module-level assignment is what registers the DAG: Airflow discovers DAGs
# by scanning module globals for DAG objects.
DAG = build_sessions_dag(
dag_id='ethereum_sessions_dag',
# Required Airflow Variable (no default): raises at parse time if unset.
output_bucket=Variable.get('ethereum_output_bucket'),
sql_dir=sql_dir,
# Source tables are read from the public BigQuery Ethereum dataset.
source_project_id='bigquery-public-data',
source_dataset_name='crypto_ethereum',
# Required Airflow Variable (no default): raises at parse time if unset.
destination_project_id=Variable.get('ethereum_destination_dataset_project_id'),
# Variables default to the prod values. Override for dev environment.
destination_dataset_name=Variable.get('ethereum_destination_dataset_name', 'crypto_ethereum'),
temp_dataset_name=Variable.get('ethereum_temp_dataset_name', 'crypto_ethereum_temp'),
# Load DAG should complete by 14:00.
schedule_interval='0 14 * * *',
start_date=datetime(2021, 12, 12),
# Optional; None when unset. NOTE(review): presumably failure-alert
# recipients -- confirm against build_sessions_dag.
notification_emails=Variable.get('notification_emails', None),
environment=environment
)