From 3f8d3015f126e837ca1c59fbb06c574f52dee3f4 Mon Sep 17 00:00:00 2001
From: Andrew Fleischer
Date: Tue, 6 Feb 2024 11:53:42 -0600
Subject: [PATCH 1/3] td assessment init

---
 .../dags/teradata/teradata_assessment_dag.py | 44 +++++++++++++++++++
 1 file changed, 44 insertions(+)
 create mode 100644 src/datamigration/dags/teradata/teradata_assessment_dag.py

diff --git a/src/datamigration/dags/teradata/teradata_assessment_dag.py b/src/datamigration/dags/teradata/teradata_assessment_dag.py
new file mode 100644
index 0000000..d7e34b0
--- /dev/null
+++ b/src/datamigration/dags/teradata/teradata_assessment_dag.py
@@ -0,0 +1,44 @@
+"""
+Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
+"""
+from __future__ import annotations
+
+from datetime import datetime
+from google.cloud import bigquery
+from airflow import models
+from airflow.models.param import Param
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+
+TD_ASSESSMENT_TABLE="TableInfo"
+DMT_CONFIG_TABLE_NAME="table_list.csv"
+
+default_dag_args = {"start_date": datetime(2022, 1, 1)}
+
+def upload_tables_to_gcs(project_id, dataset, bucket):
+    """ Queries the assessment dataset and uploads the table list as a CSV to GCS """
+    client = bigquery.Client(project=project_id)
+    sql = f"SELECT TableNameOriginal FROM {project_id}.{dataset}.{TD_ASSESSMENT_TABLE}`"
+    tables = client.query(sql).to_dataframe().to_csv()
+    GCSHook().upload(
+        bucket_name=bucket,
+        object_name=DMT_CONFIG_TABLE_NAME,
+        data=tables
+    )
+
+with models.DAG(
+"teradata_assessment",
+schedule_interval=None,
+default_args=default_dag_args,
+params={
+    "project_id": Param("default", type="string"),
+    "dataset": Param("default", type="string"),
+    "bucket": Param("default", type="string")
+    },
+catchup=False,
+) as dag:
+    PythonOperator(
+        task_id="test",
+        python_callable=upload_tables_to_gcs,
+        op_kwargs={"project_id": "{{ params.project_id }}", "dataset": "{{ params.dataset }}", "bucket": "{{ params.bucket }}"},
+)
\ No newline at end of file

From e912a5c1f5c801965f6ab86e86fdd950c0450220 Mon Sep 17 00:00:00 2001
From: Andrew Fleischer
Date: Tue, 6 Feb 2024 12:59:14 -0600
Subject: [PATCH 2/3] linting error

---
 src/datamigration/dags/teradata/teradata_assessment_dag.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/datamigration/dags/teradata/teradata_assessment_dag.py b/src/datamigration/dags/teradata/teradata_assessment_dag.py
index d7e34b0..7c6b845 100644
--- a/src/datamigration/dags/teradata/teradata_assessment_dag.py
+++ b/src/datamigration/dags/teradata/teradata_assessment_dag.py
@@ -1,5 +1,5 @@
 """
-Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
+Airflow DAG for creating the table_list.csv file from BQ Assessment.
""" from __future__ import annotations @@ -18,7 +18,7 @@ def upload_tables_to_gcs(project_id, dataset, bucket): """ Queries the assessment dataset and uploads the table list as a CSV to GCS """ client = bigquery.Client(project=project_id) - sql = f"SELECT TableNameOriginal FROM {project_id}.{dataset}.{TD_ASSESSMENT_TABLE}`" + sql = f"SELECT TableNameOriginal FROM `{project_id}.{dataset}.{TD_ASSESSMENT_TABLE}`" tables = client.query(sql).to_dataframe().to_csv() GCSHook().upload( bucket_name=bucket, From 136ca80b0f60a893d0593a05fc3cb2ee123b1e4e Mon Sep 17 00:00:00 2001 From: Andrew Fleischer Date: Fri, 9 Feb 2024 13:55:06 -0600 Subject: [PATCH 3/3] add dag to generate DDLs --- .../dags/teradata/teradata_assessment_dag.py | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/datamigration/dags/teradata/teradata_assessment_dag.py b/src/datamigration/dags/teradata/teradata_assessment_dag.py index 7c6b845..149a8e4 100644 --- a/src/datamigration/dags/teradata/teradata_assessment_dag.py +++ b/src/datamigration/dags/teradata/teradata_assessment_dag.py @@ -11,10 +11,18 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook TD_ASSESSMENT_TABLE="TableInfo" +TD_ASSESSMENT_QUERY_LOGS_TABLE = "QueryLogs" DMT_CONFIG_TABLE_NAME="table_list.csv" default_dag_args = {"start_date": datetime(2022, 1, 1)} +# The the params in the UI to trigger the dag +# { +# "project_id": "assessment-dmt-integration", +# "dataset": "assessment_dmt", +# "bucket": "dvt-afleisc" +# } + def upload_tables_to_gcs(project_id, dataset, bucket): """ Queries the assessment dataset and uploads the table list as a CSV to GCS """ client = bigquery.Client(project=project_id) @@ -26,6 +34,18 @@ def upload_tables_to_gcs(project_id, dataset, bucket): data=tables ) +def upload_ddls(project_id, dataset, bucket): + """ Queries the assessment dataset for source DDLs and uploads SQLs to GCS """ + client = bigquery.Client(project=project_id) + sql = f"SELECT QueryId, QueryText FROM `{project_id}.{dataset}.{TD_ASSESSMENT_QUERY_LOGS_TABLE}` WHERE QueryType = 'DDL'" + ddls = client.query(sql).to_dataframe() + for index, row in ddls.iterrows(): + GCSHook().upload( + bucket_name=bucket, + object_name="ddl/"+row['QueryId']+'.sql', + data=row['QueryText'] + ) + with models.DAG( "teradata_asessment", schedule_interval=None, @@ -37,8 +57,16 @@ def upload_tables_to_gcs(project_id, dataset, bucket): }, catchup=False, ) as dag: - PythonOperator( - task_id="test", + generate_table_list = PythonOperator( + task_id="generate_table_list", python_callable=upload_tables_to_gcs, op_kwargs={"project_id": "{{ params.project_id }}", "dataset": "{{ params.dataset }}", "bucket": "{{ params.bucket }}"}, -) \ No newline at end of file + ) + generate_ddl_files = PythonOperator( + task_id="generate_ddl_files", + python_callable=upload_ddls, + op_kwargs={"project_id": "{{ params.project_id }}", "dataset": "{{ params.dataset }}", "bucket": "{{ params.bucket }}"} + ) + generate_table_list << generate_ddl_files + +