
feat: support python submissions #248

Merged
merged 102 commits into dbt-labs:main on Feb 6, 2024

Conversation

Avinash-1394
Contributor

@Avinash-1394 Avinash-1394 commented Apr 21, 2023

Description

Support dbt Python models by using Athena's Spark engine

Docs - https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html

Bugs currently identified:

  • mock_athena does not support these functions yet
  • The incremental model does not fully utilize Spark's capabilities, in the sense that I did not rewrite the entire materialization logic, which is still driven by SQL. It probably makes sense to gradually incorporate more extensive Spark logic, such as supplying a table-creation mode like 'append'; a rough sketch follows this list.
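For illustration only (not part of this PR's generated code), a rough sketch of what an append-style Spark write could look like, mirroring the materialize() helper shown in the build log below; the bucket, schema, and table names are placeholders:

# Hypothetical sketch: write the model DataFrame using Spark's native "append"
# mode instead of a SQL insert. The current table materialization uses "overwrite".
writer = (
    df.write
    .format("parquet")
    .mode("append")
    .option("path", "s3://<bucket>/<schema>/<table>/")
)
writer.saveAsTable(name="<schema>.<table>")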

Prerequisites

  1. A Spark-enabled workgroup created in Athena
  2. The Spark execution role granted access to Athena, Glue, and S3
  3. The Spark workgroup added to the ~/.dbt/profiles.yml file, with the profile referenced in dbt_project.yml, for example:
analytics_spark:
  outputs:
    dev:
      database: awsdatacatalog
      region_name: us-east-1
      s3_data_dir: s3://dbt-athena/
      s3_staging_dir: s3://<user>-athena-query-results/
      schema: analytics_dev
      threads: 4
      type: athena
      work_group: primary
      spark_work_group: spark
      spark_threads: 4
  target: dev
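
A minimal, illustrative dbt_project.yml entry pointing at the profile above (the project name dbt_athena_spark is taken from the build log below; adjust it to your own project):

name: dbt_athena_spark
profile: analytics_spark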

Models used to test (optional)

You can add the models below to your dbt project, or clone this repository, install it with poetry, and run them.

model
{{ config(materialized="table") }}
select 1 as column_1, 2 as column_2, '{{ run_started_at.strftime("%Y-%m-%d") }}' as run_date
python_table
import pandas as pd


def model(dbt, session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df
python_table_2
def model(dbt, spark_session):
    dbt.config(materialized="table")

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df
python_incremental
import pandas as pd


def model(dbt, session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = f"select max(run_date) from {dbt.this}"
        df = df.filter(df.run_date >= session.sql(max_from_this).collect()[0][0])

    return df
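Note: in the run shown below, the incremental filter was executed with dbt.this.schema and dbt.this.identifier rather than the full dbt.this name, because Spark's catalog cannot resolve the three-part awsdatacatalog.analytics_dev.<table> namespace (see the get_spark_df docstring in the generated code). The executed filter looks like this:

    if dbt.is_incremental:
        # spark_catalog requires a single-part namespace, so the catalog
        # component (awsdatacatalog) is dropped and schema.identifier is used.
        max_from_this = f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])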

Build

dbt output


============================== 2023-04-24 03:03:32.852469 | 789ff864-d9d0-4f03-832e-6ea11cc3b9d2 ==============================
03:03:32.852469 [info ] [MainThread]: Running with dbt=1.4.6
03:03:32.861048 [debug] [MainThread]: running dbt with arguments {'debug': True, 'write_json': True, 'use_colors': True, 'printer_width': 80, 'version_check': True, 'partial_parse': True, 'static_parser': True, 'profiles_dir': '/home/avinash1394/.dbt', 'send_anonymous_usage_stats': True, 'quiet': False, 'no_print': False, 'cache_selected_only': False, 'which': 'run', 'rpc_method': 'run', 'indirect_selection': 'eager'}
03:03:32.885458 [debug] [MainThread]: Tracking: tracking
03:03:32.893004 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b8873c70>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b8872230>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b8873dc0>]}
03:03:32.988287 [debug] [MainThread]: checksum: 0325e47f1211ebbdb24627f81d8289d705fdb573380364d07ab35982cb3d57cd, vars: {}, profile: None, target: None, version: 1.4.6
03:03:33.082263 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 1 files changed.
03:03:33.083845 [debug] [MainThread]: Partial parsing: updated file: dbt_athena_spark://models/example/my_incremental_dbt_model.py
03:03:33.186668 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b83980d0>]}
03:03:33.200212 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b9408580>]}
03:03:33.201871 [info ] [MainThread]: Found 4 models, 0 tests, 0 snapshots, 0 analyses, 317 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
03:03:33.203041 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b88c37c0>]}
03:03:33.206647 [info ] [MainThread]:
03:03:33.211984 [debug] [MainThread]: Acquiring new athena connection 'master'
03:03:33.216454 [debug] [ThreadPool]: Acquiring new athena connection 'list_awsdatacatalog'
03:03:33.218693 [debug] [ThreadPool]: Opening a new connection, currently in state init
03:03:34.134575 [debug] [ThreadPool]: On list_awsdatacatalog: Close
03:03:34.143360 [debug] [ThreadPool]: Acquiring new athena connection 'list_awsdatacatalog_analytics_dev'
03:03:34.147709 [debug] [ThreadPool]: Opening a new connection, currently in state closed
03:03:35.064616 [debug] [ThreadPool]: On list_awsdatacatalog_analytics_dev: Close
03:03:35.085927 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b85d86a0>]}
03:03:35.092783 [info ] [MainThread]: Concurrency: 4 threads (target='dev')
03:03:35.094955 [info ] [MainThread]:
03:03:35.116562 [debug] [Thread-1 (]: Began running node model.dbt_athena_spark.model
03:03:35.118349 [debug] [Thread-2 (]: Began running node model.dbt_athena_spark.my_first_dbt_model
03:03:35.120612 [debug] [Thread-3 (]: Began running node model.dbt_athena_spark.my_second_dbt_model
03:03:35.123239 [info ] [Thread-1 (]: 1 of 4 START sql table model analytics_dev.model ............................... [RUN]
03:03:35.125298 [info ] [Thread-2 (]: 2 of 4 START python table model analytics_dev.my_first_dbt_model ............... [RUN]
03:03:35.128442 [info ] [Thread-3 (]: 3 of 4 START python table model analytics_dev.my_second_dbt_model .............. [RUN]
03:03:35.130792 [debug] [Thread-1 (]: Acquiring new athena connection 'model.dbt_athena_spark.model'
03:03:35.132943 [debug] [Thread-2 (]: Acquiring new athena connection 'model.dbt_athena_spark.my_first_dbt_model'
03:03:35.135974 [debug] [Thread-3 (]: Acquiring new athena connection 'model.dbt_athena_spark.my_second_dbt_model'
03:03:35.138529 [debug] [Thread-1 (]: Began compiling node model.dbt_athena_spark.model
03:03:35.141000 [debug] [Thread-2 (]: Began compiling node model.dbt_athena_spark.my_first_dbt_model
03:03:35.144665 [debug] [Thread-3 (]: Began compiling node model.dbt_athena_spark.my_second_dbt_model
03:03:35.161360 [debug] [Thread-1 (]: Writing injected SQL for node "model.dbt_athena_spark.model"
03:03:35.251605 [debug] [Thread-1 (]: Timing info for model.dbt_athena_spark.model (compile): 2023-04-24 03:03:35.147820 => 2023-04-24 03:03:35.251222
03:03:35.302277 [debug] [Thread-1 (]: Began executing node model.dbt_athena_spark.model
03:03:35.322788 [debug] [Thread-2 (]: Writing injected SQL for node "model.dbt_athena_spark.my_first_dbt_model"
03:03:35.346870 [debug] [Thread-3 (]: Writing injected SQL for node "model.dbt_athena_spark.my_second_dbt_model"
03:03:35.380844 [debug] [Thread-2 (]: Timing info for model.dbt_athena_spark.my_first_dbt_model (compile): 2023-04-24 03:03:35.162585 => 2023-04-24 03:03:35.380484
03:03:35.399452 [debug] [Thread-2 (]: Began executing node model.dbt_athena_spark.my_first_dbt_model
03:03:35.440431 [debug] [Thread-3 (]: Timing info for model.dbt_athena_spark.my_second_dbt_model (compile): 2023-04-24 03:03:35.178590 => 2023-04-24 03:03:35.440220
03:03:35.457461 [debug] [Thread-3 (]: Began executing node model.dbt_athena_spark.my_second_dbt_model
03:03:35.473425 [debug] [Thread-2 (]: Opening a new connection, currently in state init
03:03:35.470513 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
03:03:35.603618 [debug] [Thread-3 (]: Opening a new connection, currently in state init
03:03:36.767588 [debug] [Thread-2 (]: Athena adapter: table_name : my_first_dbt_model
03:03:36.771058 [debug] [Thread-2 (]: Athena adapter: table type : TableType.TABLE
03:03:36.776069 [debug] [Thread-2 (]: Using athena connection "model.dbt_athena_spark.my_first_dbt_model"
03:03:36.778928 [debug] [Thread-2 (]: On model.dbt_athena_spark.my_first_dbt_model: drop table if exists `awsdatacatalog`.`analytics_dev`.`my_first_dbt_model`

03:03:36.845589 [debug] [Thread-1 (]: Athena adapter: table_name : model
03:03:36.847637 [debug] [Thread-1 (]: Athena adapter: table type : TableType.TABLE
03:03:36.851712 [debug] [Thread-1 (]: Using athena connection "model.dbt_athena_spark.model"
03:03:36.853669 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: drop table if exists `awsdatacatalog`.`analytics_dev`.`model`

03:03:37.389227 [debug] [Thread-3 (]: Athena adapter: S3 path does not exist
03:03:37.412152 [debug] [Thread-3 (]: Writing runtime python for node "model.dbt_athena_spark.my_second_dbt_model"
03:03:37.414169 [debug] [Thread-3 (]: On model.dbt_athena_spark.my_second_dbt_model:

import pyspark


import pandas as pd


def model(dbt, spark_session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args,dbt_load_df_function):
    refs = {}
    key = '.'.join(args)
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "awsdatacatalog"
    schema = "analytics_dev"
    identifier = "my_second_dbt_model"
    
    def __repr__(self):
        return 'awsdatacatalog.analytics_dev.my_second_dbt_model'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------



def materialize(spark_session, df, target_relation):
    import pandas
    try:
        if isinstance(df, pyspark.sql.dataframe.DataFrame):
            pass
        elif isinstance(df, pandas.core.frame.DataFrame):
            df = spark_session.createDataFrame(df)
        else:
            msg = f"{type(df)} is not a supported type for dbt Python materialization"
            raise Exception(msg)
        writer = df.write \
        .format("parquet") \
        .mode("overwrite") \
        .option("path", "s3://dbt-athena/analytics_dev/my_second_dbt_model/dbb32732-5233-450c-8426-2d413857be41") \
        .option("compression", "None") \
        .option("mergeSchema", "True") \
        .option("delimiter", "None")
        if None is not None:
            writer = writer.partitionBy(None)
        if None is not None:
            writer = writer.bucketBy(None,None)
        if None is not None:
            writer = writer.sortBy(None)
        writer.saveAsTable(
            name="analytics_dev.my_second_dbt_model",
        )
        return "OK"
    except Exception:
        raise

def get_spark_df(identifier):
    """
    Override the arguments to ref and source dynamically

    spark.table('awsdatacatalog.analytics_dev.model')
    Raises pyspark.sql.utils.AnalysisException:
    spark_catalog requires a single-part namespace,
    but got [awsdatacatalog, analytics_dev]

    So the override removes the catalog component and only
    provides the schema and identifer to spark.table()
    """
    return spark.table(identifier.split(".", 1)[1])

class SparkdbtObj(dbtObj):
    def __init__(self):
        super().__init__(load_df_function=get_spark_df)
        self.source = lambda *args: source(*args, dbt_load_df_function=get_spark_df)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=get_spark_df)

dbt = SparkdbtObj()
df = model(dbt, spark)
materialize(spark, df, dbt.this)
  
03:03:38.328278 [info ] [Thread-3 (]: Athena adapter: Setting polling_interval: 5
03:03:40.110211 [debug] [Thread-1 (]: SQL status: OK -1 in 3 seconds
03:03:40.115494 [debug] [Thread-2 (]: SQL status: OK -1 in 3 seconds
03:03:41.308563 [debug] [Thread-2 (]: Athena adapter: S3 path does not exist
03:03:41.310848 [debug] [Thread-2 (]: Writing runtime python for node "model.dbt_athena_spark.my_first_dbt_model"
03:03:41.312189 [debug] [Thread-2 (]: On model.dbt_athena_spark.my_first_dbt_model:

import pyspark


def model(dbt, spark_session):
    dbt.config(materialized="table")

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args,dbt_load_df_function):
    refs = {}
    key = '.'.join(args)
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "awsdatacatalog"
    schema = "analytics_dev"
    identifier = "my_first_dbt_model"
    
    def __repr__(self):
        return 'awsdatacatalog.analytics_dev.my_first_dbt_model'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------



def materialize(spark_session, df, target_relation):
    import pandas
    try:
        if isinstance(df, pyspark.sql.dataframe.DataFrame):
            pass
        elif isinstance(df, pandas.core.frame.DataFrame):
            df = spark_session.createDataFrame(df)
        else:
            msg = f"{type(df)} is not a supported type for dbt Python materialization"
            raise Exception(msg)
        writer = df.write \
        .format("parquet") \
        .mode("overwrite") \
        .option("path", "s3://dbt-athena/analytics_dev/my_first_dbt_model/15e69a57-b7a1-4c30-811c-c3100372c2e5") \
        .option("compression", "None") \
        .option("mergeSchema", "True") \
        .option("delimiter", "None")
        if None is not None:
            writer = writer.partitionBy(None)
        if None is not None:
            writer = writer.bucketBy(None,None)
        if None is not None:
            writer = writer.sortBy(None)
        writer.saveAsTable(
            name="analytics_dev.my_first_dbt_model",
        )
        return "OK"
    except Exception:
        raise

def get_spark_df(identifier):
    """
    Override the arguments to ref and source dynamically

    spark.table('awsdatacatalog.analytics_dev.model')
    Raises pyspark.sql.utils.AnalysisException:
    spark_catalog requires a single-part namespace,
    but got [awsdatacatalog, analytics_dev]

    So the override removes the catalog component and only
    provides the schema and identifer to spark.table()
    """
    return spark.table(identifier.split(".", 1)[1])

class SparkdbtObj(dbtObj):
    def __init__(self):
        super().__init__(load_df_function=get_spark_df)
        self.source = lambda *args: source(*args, dbt_load_df_function=get_spark_df)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=get_spark_df)

dbt = SparkdbtObj()
df = model(dbt, spark)
materialize(spark, df, dbt.this)
  
03:03:41.332940 [debug] [Thread-1 (]: Athena adapter: S3 path does not exist
03:03:41.337672 [debug] [Thread-1 (]: Writing runtime sql for node "model.dbt_athena_spark.model"
03:03:41.341795 [debug] [Thread-1 (]: Using athena connection "model.dbt_athena_spark.model"
03:03:41.345560 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: -- /* {"app": "dbt", "dbt_version": "1.4.6", "profile_name": "analytics_spark", "target_name": "dev", "node_id": "model.dbt_athena_spark.model"} */

  
    

  create table "awsdatacatalog"."analytics_dev"."model"
    with (
      table_type='hive',
      is_external=true,external_location='s3://dbt-athena/analytics_dev/model/0eac81fb-6ca9-4cdc-ad93-1958d910b455',
      format='parquet'
    )
    as
      
select 1 as column_1, 2 as column_2, '2023-04-24' as run_date
  
03:03:42.416395 [info ] [Thread-2 (]: Athena adapter: Setting polling_interval: 5
03:03:43.473079 [info ] [Thread-3 (]: Athena adapter: Setting timeout: 7200
03:03:44.045645 [debug] [Thread-1 (]: SQL status: OK -1 in 3 seconds
03:03:44.077064 [debug] [Thread-1 (]: Using athena connection "model.dbt_athena_spark.model"
03:03:44.080093 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: alter table `awsdatacatalog`.`analytics_dev`.`model` set tblproperties ('classification' = 'parquet')
03:03:45.632106 [debug] [Thread-1 (]: SQL status: OK -1 in 2 seconds
03:03:45.672557 [debug] [Thread-1 (]: Timing info for model.dbt_athena_spark.model (execute): 2023-04-24 03:03:35.330331 => 2023-04-24 03:03:45.672108
03:03:45.676715 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: Close
03:03:45.682913 [debug] [Thread-1 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b86fdf90>]}
03:03:45.687510 [info ] [Thread-1 (]: 1 of 4 OK created sql table model analytics_dev.model .......................... [OK -1 in 10.55s]
03:03:45.699372 [debug] [Thread-1 (]: Finished running node model.dbt_athena_spark.model
03:03:45.706811 [debug] [Thread-4 (]: Began running node model.dbt_athena_spark.my_incremental_dbt_model
03:03:45.712304 [info ] [Thread-4 (]: 4 of 4 START python incremental model analytics_dev.my_incremental_dbt_model ... [RUN]
03:03:45.718559 [debug] [Thread-4 (]: Acquiring new athena connection 'model.dbt_athena_spark.my_incremental_dbt_model'
03:03:45.722657 [debug] [Thread-4 (]: Began compiling node model.dbt_athena_spark.my_incremental_dbt_model
03:03:45.769420 [debug] [Thread-4 (]: Writing injected SQL for node "model.dbt_athena_spark.my_incremental_dbt_model"
03:03:45.774403 [debug] [Thread-4 (]: Timing info for model.dbt_athena_spark.my_incremental_dbt_model (compile): 2023-04-24 03:03:45.726009 => 2023-04-24 03:03:45.773894
03:03:45.778968 [debug] [Thread-4 (]: Began executing node model.dbt_athena_spark.my_incremental_dbt_model
03:03:46.106963 [debug] [Thread-4 (]: temporary relation isanalytics_devmy_incremental_dbt_model__dbt_tmp
03:03:46.117859 [debug] [Thread-4 (]: Opening a new connection, currently in state init
03:03:46.982432 [debug] [Thread-4 (]: Athena adapter: Error calling Glue get_table: An error occurred (EntityNotFoundException) when calling the GetTable operation: Table my_incremental_dbt_model__dbt_tmp not found.
03:03:47.511342 [info ] [Thread-2 (]: Athena adapter: Setting timeout: 7200
03:03:47.974021 [debug] [Thread-4 (]: Athena adapter: S3 path does not exist
03:03:47.976629 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model:

import pyspark


import pandas as pd


def model(dbt, spark_session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])

    return df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args,dbt_load_df_function):
    refs = {"model": "awsdatacatalog.analytics_dev.model"}
    key = '.'.join(args)
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "awsdatacatalog"
    schema = "analytics_dev"
    identifier = "my_incremental_dbt_model"
    
    def __repr__(self):
        return 'awsdatacatalog.analytics_dev.my_incremental_dbt_model'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = True

# COMMAND ----------



def materialize(spark_session, df, target_relation):
    import pandas
    try:
        if isinstance(df, pyspark.sql.dataframe.DataFrame):
            pass
        elif isinstance(df, pandas.core.frame.DataFrame):
            df = spark_session.createDataFrame(df)
        else:
            msg = f"{type(df)} is not a supported type for dbt Python materialization"
            raise Exception(msg)
        writer = df.write \
        .format("parquet") \
        .mode("overwrite") \
        .option("path", "s3://dbt-athena/analytics_dev/my_incremental_dbt_model__dbt_tmp/455c239f-bbb2-4f40-8421-0f952ba335f1") \
        .option("compression", "None") \
        .option("mergeSchema", "True") \
        .option("delimiter", "None")
        if None is not None:
            writer = writer.partitionBy(None)
        if None is not None:
            writer = writer.bucketBy(None,None)
        if None is not None:
            writer = writer.sortBy(None)
        writer.saveAsTable(
            name="analytics_dev.my_incremental_dbt_model__dbt_tmp",
        )
        return "OK"
    except Exception:
        raise

def get_spark_df(identifier):
    """
    Override the arguments to ref and source dynamically

    spark.table('awsdatacatalog.analytics_dev.model')
    Raises pyspark.sql.utils.AnalysisException:
    spark_catalog requires a single-part namespace,
    but got [awsdatacatalog, analytics_dev]

    So the override removes the catalog component and only
    provides the schema and identifer to spark.table()
    """
    return spark.table(identifier.split(".", 1)[1])

class SparkdbtObj(dbtObj):
    def __init__(self):
        super().__init__(load_df_function=get_spark_df)
        self.source = lambda *args: source(*args, dbt_load_df_function=get_spark_df)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=get_spark_df)

dbt = SparkdbtObj()
df = model(dbt, spark)
materialize(spark, df, dbt.this)
  
03:03:48.108662 [info ] [Thread-2 (]: Athena adapter: Setting polling_interval: 5
03:03:49.072742 [info ] [Thread-4 (]: Athena adapter: Setting polling_interval: 5
03:03:53.636158 [info ] [Thread-3 (]: Athena adapter: Setting timeout: 7200
03:03:53.713150 [info ] [Thread-2 (]: Athena adapter: Setting timeout: 7200
03:03:53.850003 [info ] [Thread-3 (]: Athena adapter: Setting timeout: 7200
03:03:53.853534 [debug] [Thread-3 (]: Athena adapter: Received execution status COMPLETED
03:03:53.938440 [debug] [Thread-3 (]: Execution status: OK in 16.52 seconds
03:03:53.948239 [debug] [Thread-3 (]: Timing info for model.dbt_athena_spark.my_second_dbt_model (execute): 2023-04-24 03:03:35.474440 => 2023-04-24 03:03:53.948024
03:03:53.949774 [debug] [Thread-3 (]: On model.dbt_athena_spark.my_second_dbt_model: Close
03:03:53.952000 [debug] [Thread-3 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b890fb20>]}
03:03:53.955149 [info ] [Thread-3 (]: 3 of 4 OK created python table model analytics_dev.my_second_dbt_model ......... [OK in 18.82s]
03:03:53.959070 [debug] [Thread-3 (]: Finished running node model.dbt_athena_spark.my_second_dbt_model
03:03:54.196468 [info ] [Thread-4 (]: Athena adapter: Setting timeout: 7200
03:04:03.896132 [info ] [Thread-2 (]: Athena adapter: Setting timeout: 7200
03:04:04.163218 [info ] [Thread-2 (]: Athena adapter: Setting timeout: 7200
03:04:04.166217 [debug] [Thread-2 (]: Athena adapter: Received execution status COMPLETED
03:04:04.255882 [debug] [Thread-2 (]: Execution status: OK in 22.94 seconds
03:04:04.261209 [debug] [Thread-2 (]: Timing info for model.dbt_athena_spark.my_first_dbt_model (execute): 2023-04-24 03:03:35.409212 => 2023-04-24 03:04:04.261005
03:04:04.262891 [debug] [Thread-2 (]: On model.dbt_athena_spark.my_first_dbt_model: Close
03:04:04.265174 [debug] [Thread-2 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b890dfc0>]}
03:04:04.266638 [info ] [Thread-2 (]: 2 of 4 OK created python table model analytics_dev.my_first_dbt_model .......... [OK in 29.13s]
03:04:04.267765 [debug] [Thread-2 (]: Finished running node model.dbt_athena_spark.my_first_dbt_model
03:04:04.316244 [info ] [Thread-4 (]: Athena adapter: Setting timeout: 7200
03:04:04.481018 [info ] [Thread-4 (]: Athena adapter: Setting timeout: 7200
03:04:04.482884 [debug] [Thread-4 (]: Athena adapter: Received execution status COMPLETED
03:04:04.560360 [debug] [Thread-4 (]: Execution status: OK in 16.58 seconds
03:04:05.778342 [debug] [Thread-4 (]: Athena adapter: table_name : my_incremental_dbt_model
03:04:05.781371 [debug] [Thread-4 (]: Athena adapter: table type : TableType.TABLE
03:04:05.787111 [debug] [Thread-4 (]: Athena adapter: Columns in relation my_incremental_dbt_model: [{'Name': 'column_1', 'Type': 'int', 'Comment': ''}, {'Name': 'column_2', 'Type': 'int', 'Comment': ''}, {'Name': 'run_date', 'Type': 'varchar(10)', 'Comment': ''}]
03:04:05.792233 [debug] [Thread-4 (]:
    
    insert into awsdatacatalog.analytics_dev.my_incremental_dbt_model ("column_1", "column_2", "run_date")
    (
       select "column_1", "column_2", "run_date"
       from awsdatacatalog.analytics_dev.my_incremental_dbt_model__dbt_tmp
    );
03:04:05.804964 [debug] [Thread-4 (]: Writing runtime python for node "model.dbt_athena_spark.my_incremental_dbt_model"
03:04:05.818700 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model:
    
      
          
          
      
    
  
03:04:07.096735 [info ] [Thread-4 (]: Athena adapter: Setting polling_interval: 5
03:04:07.294370 [info ] [Thread-4 (]: Athena adapter: Setting timeout: 7200
03:04:07.296004 [debug] [Thread-4 (]: Athena adapter: Received execution status COMPLETED
03:04:07.372022 [debug] [Thread-4 (]: Execution status: OK in 1.55 seconds
03:04:07.898391 [debug] [Thread-4 (]: Athena adapter: table_name : my_incremental_dbt_model__dbt_tmp
03:04:07.899513 [debug] [Thread-4 (]: Athena adapter: table type : TableType.TABLE
03:04:07.902083 [debug] [Thread-4 (]: Using athena connection "model.dbt_athena_spark.my_incremental_dbt_model"
03:04:07.903065 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model: drop table if exists awsdatacatalog.analytics_dev.my_incremental_dbt_model__dbt_tmp

03:04:10.773757 [debug] [Thread-4 (]: SQL status: OK -1 in 3 seconds
03:04:10.789147 [debug] [Thread-4 (]: Timing info for model.dbt_athena_spark.my_incremental_dbt_model (execute): 2023-04-24 03:03:45.782682 => 2023-04-24 03:04:10.788269
03:04:10.793213 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model: Close
03:04:10.800660 [debug] [Thread-4 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '789ff864-d9d0-4f03-832e-6ea11cc3b9d2', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b46fc0d0>]}
03:04:10.806328 [info ] [Thread-4 (]: 4 of 4 OK created python incremental model analytics_dev.my_incremental_dbt_model  [OK in 25.08s]
03:04:10.810147 [debug] [Thread-4 (]: Finished running node model.dbt_athena_spark.my_incremental_dbt_model
03:04:10.819259 [debug] [MainThread]: Acquiring new athena connection 'master'
03:04:10.824475 [debug] [MainThread]: Connection 'master' was properly closed.
03:04:10.827098 [debug] [MainThread]: Connection 'model.dbt_athena_spark.model' was properly closed.
03:04:10.829695 [debug] [MainThread]: Connection 'model.dbt_athena_spark.my_first_dbt_model' was properly closed.
03:04:10.832262 [debug] [MainThread]: Connection 'model.dbt_athena_spark.my_second_dbt_model' was properly closed.
03:04:10.834723 [debug] [MainThread]: Connection 'model.dbt_athena_spark.my_incremental_dbt_model' was properly closed.
03:04:10.842942 [info ] [MainThread]:
03:04:10.847754 [info ] [MainThread]: Finished running 3 table models, 1 incremental model in 0 hours 0 minutes and 37.63 seconds (37.63s).
03:04:10.852165 [debug] [MainThread]: Command end result
03:04:10.893058 [info ] [MainThread]:
03:04:10.896102 [info ] [MainThread]: Completed successfully
03:04:10.898209 [info ] [MainThread]:
03:04:10.899827 [info ] [MainThread]: Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4
03:04:10.901772 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b93fb400>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b93fb940>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f90b8f6b820>]}
03:04:10.904243 [debug] [MainThread]: Flushing usage events

Tests added

  1. Starting a spark session
  2. Polling session status
  3. Setting engine configuration
  4. Setting timeout
  5. Setting polling interval
  6. Listing sessions
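
For reference, a minimal sketch of the style of these unit tests: because mock_athena (moto) does not support the Spark session APIs yet, the boto3 Athena client is mocked directly. The poll_until_finished helper below is illustrative, not the adapter's actual implementation.

from unittest.mock import MagicMock


def poll_until_finished(client, session_id):
    # Poll get_session_status until the session reaches an idle or terminal state.
    while True:
        state = client.get_session_status(SessionId=session_id)["Status"]["State"]
        if state in ("IDLE", "FAILED", "TERMINATED"):
            return state


def test_polling_session_status():
    client = MagicMock()
    client.get_session_status.side_effect = [
        {"Status": {"State": "CREATING"}},
        {"Status": {"State": "IDLE"}},
    ]
    assert poll_until_finished(client, "test-session-id") == "IDLE"
    assert client.get_session_status.call_count == 2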

TODO:

  1. Check if calculation is submitted
  2. Check if execution is polled

Checklist

  • You followed the contributing section
  • You added unit tests where necessary
  • You added functional tests where necessary

Avinash-1394 and others added 30 commits March 27, 2023 16:22
@Jrmyy Jrmyy self-requested a review December 14, 2023 09:51
Avinash-1394 and others added 4 commits January 2, 2024 14:52
I am seeing an empty calculation submitted alongside the main Python model code calculation for almost every model.
Also, if the result JSON is not returned, we get green ERROR messages instead of OK messages.
And with this handling, I am not seeing the run model code in the target folder; every model under the run folder seems to be empty.
This workaround needs to be addressed so that the target folder shows the run model content.
Copy link
Contributor

@svdimchenko svdimchenko left a comment


@Avinash-1394 solid job done. There are some minor points to improve, but we can do those later. As this PR has been waiting for so long, we can merge and try it out, as @nicor88 suggested.

Copy link
Contributor

@nicor88 nicor88 left a comment


Nice work @Avinash-1394
this will be included in the next release 💯
Sorry for making you wait so long.

@nicor88 nicor88 merged commit 8d7dea4 into dbt-labs:main Feb 6, 2024
12 checks passed