Skip to content

Commit

Permalink
Remove dbt-core dependency
Browse files Browse the repository at this point in the history
Signed-off-by: popcorny <[email protected]>
  • Loading branch information
popcornylu committed May 8, 2024
1 parent c2902be commit 038d508
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 43 deletions.
13 changes: 1 addition & 12 deletions recce/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, Optional, Tuple

import agate
from typing import Callable, Dict, Optional

from recce.adapter.base import BaseAdapter
from recce.models import RunDAO, CheckDAO, Check
Expand Down Expand Up @@ -65,15 +63,6 @@ def get_node_name_by_id(self, unique_id: object) -> object:
def generate_sql(self, sql_template: str, base: bool = False, context: Dict = {}):
self.adapter.generate_sql(sql_template, base=base, context=context)

def execute(
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None
) -> Tuple[any, agate.Table]:
return self.adapter.execute(sql, auto_begin=auto_begin, fetch=fetch, limit=limit)

def get_lineage(self, base: Optional[bool] = False):
return self.adapter.get_lineage(base=base)

Expand Down
2 changes: 1 addition & 1 deletion recce/tasks/histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from dateutil.relativedelta import relativedelta

from recce.adapter.dbt_adapter import DbtAdapter
from recce.core import default_context
from recce.tasks import Task
from recce.tasks.core import TaskResultDiffer
Expand Down Expand Up @@ -293,6 +292,7 @@ def __init__(self, params: HistogramDiffParams):
self.connection = None

def execute(self):
from recce.adapter.dbt_adapter import DbtAdapter
result = {}

dbt_adapter: DbtAdapter = default_context().adapter
Expand Down
21 changes: 4 additions & 17 deletions recce/tasks/profile.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from typing import TypedDict, List

import agate
from pydantic import BaseModel

from recce.adapter.dbt_adapter import DbtAdapter, merge_tables
from .core import Task, TaskResultDiffer
from .dataframe import DataFrame
from ..core import default_context
Expand All @@ -27,6 +25,8 @@ def __init__(self, params: ProfileParams):
self.connection = None

def execute(self):
import agate
from recce.adapter.dbt_adapter import DbtAdapter, merge_tables
dbt_adapter: DbtAdapter = default_context().adapter

model: str = self.params['model']
Expand Down Expand Up @@ -72,7 +72,7 @@ def _verify_dbt_profiler(self, dbt_adapter):
raise RecceException(
r"Package 'dbt_profiler' not found. Please refer to the link to install: https://hub.getdbt.com/data-mie/dbt_profiler/")

def _profile_column(self, dbt_adapter: DbtAdapter, relation, column):
def _profile_column(self, dbt_adapter, relation, column):

sql_template = r"""
select
Expand Down Expand Up @@ -116,24 +116,11 @@ def _profile_column(self, dbt_adapter: DbtAdapter, relation, column):
e = RecceException('No profile diff result due to the model is empty.', False)
raise e

def _to_dataframe(self, table: agate.Table):
import pandas as pd
import json

df = pd.DataFrame([row.values() for row in table.rows], columns=table.column_names)

for column_name, column_type in zip(table.column_names, table.column_types):
if column_name.lower() == 'not_null_proportion':
df[column_name] = df[column_name].astype('float')
if column_name.lower() == 'distinct_proportion':
df[column_name] = df[column_name].astype('float')
result_json = df.to_json(orient='table')
return json.loads(result_json)

def cancel(self):
super().cancel()

if self.connection:
from recce.adapter.dbt_adapter import DbtAdapter
dbt_adapter: DbtAdapter = default_context().adapter
with dbt_adapter.connection_named("cancel"):
dbt_adapter.cancel(self.connection)
Expand Down
9 changes: 6 additions & 3 deletions recce/tasks/query.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing
from typing import TypedDict, Optional, Tuple, List

import agate
from pydantic import BaseModel

from .core import Task, TaskResultDiffer
Expand All @@ -11,6 +11,9 @@

QUERY_LIMIT = 2000

if typing.TYPE_CHECKING:
import agate


class QueryMixin:
@classmethod
Expand All @@ -19,7 +22,7 @@ def execute_sql_with_limit(
sql_template,
base: bool = False,
limit: Optional[int] = None
) -> Tuple[agate.Table, bool]:
) -> Tuple['agate.Table', bool]:
"""
Execute a SQL template and return the result as an agate table.
:param sql_template: SQL template to execute
Expand All @@ -45,7 +48,7 @@ def execute_sql_with_limit(
raise RecceException(f"Jinja template error: line {e.lineno}: {str(e)}")

@classmethod
def execute_sql(cls, sql_template, base: bool = False) -> agate.Table:
def execute_sql(cls, sql_template, base: bool = False) -> 'agate.Table':
result, _ = cls.execute_sql_with_limit(sql_template, base)
return result

Expand Down
3 changes: 1 addition & 2 deletions recce/tasks/rowcount.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import TypedDict, Optional

from recce.adapter.dbt_adapter import DbtAdapter
from recce.core import default_context
from recce.tasks import Task
from recce.tasks.core import TaskResultDiffer
Expand All @@ -18,7 +17,7 @@ def __init__(self, params: RowCountDiffParams):
self.params = params
self.connection = None

def _query_row_count(self, dbt_adapter: DbtAdapter, model_name, base=False):
def _query_row_count(self, dbt_adapter, model_name, base=False):
node = dbt_adapter.find_node_by_name(model_name, base=base)
if node is None:
return None
Expand Down
4 changes: 2 additions & 2 deletions recce/tasks/top_k.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import TypedDict

from recce.adapter.dbt_adapter import DbtAdapter
from recce.core import default_context
from recce.tasks import Task
from recce.tasks.core import TaskResultDiffer
Expand All @@ -20,7 +19,7 @@ def __init__(self, params: TopKDiffParams):
self.params = params
self.connection = None

def _query_row_count_diff(self, dbt_adapter: DbtAdapter, base_relation, curr_relation, column):
def _query_row_count_diff(self, dbt_adapter, base_relation, curr_relation, column):
"""
Query the row count of the base and current relations
Expand Down Expand Up @@ -98,6 +97,7 @@ def _query_top_k(self, dbt_adapter, base_relation, curr_relation, column, k):

def execute(self):

from recce.adapter.dbt_adapter import DbtAdapter
dbt_adapter: DbtAdapter = default_context().adapter

with dbt_adapter.connection_named("query"):
Expand Down
18 changes: 12 additions & 6 deletions recce/tasks/valuediff.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import typing
from typing import TypedDict, Optional, List, Union

import agate
if typing.TYPE_CHECKING:
import agate

from pydantic import BaseModel

from .core import Task, TaskResultDiffer
from .dataframe import DataFrame
from ..adapter.dbt_adapter import DbtAdapter

from ..core import default_context
from ..exceptions import RecceException

Expand Down Expand Up @@ -40,7 +43,7 @@ def _verify_dbt_packages_deps(self, dbt_adapter):
self.legacy_surrogate_key = False
break

def _verify_primary_key(self, dbt_adapter: DbtAdapter, primary_key: Union[str, List[str]], model: str):
def _verify_primary_key(self, dbt_adapter, primary_key: Union[str, List[str]], model: str):
self.update_progress(message=f"Verify primary key: {primary_key}")
composite = True if isinstance(primary_key, List) else False

Expand Down Expand Up @@ -85,7 +88,7 @@ def __init__(self, params: ValueDiffParams):
self.connection = None
self.legacy_surrogate_key = True

def _query_value_diff(self, dbt_adpter: DbtAdapter, primary_key: Union[str, List[str]], model: str,
def _query_value_diff(self, dbt_adpter, primary_key: Union[str, List[str]], model: str,
columns: List[str] = None):
column_groups = {}
composite = True if isinstance(primary_key, List) else False
Expand Down Expand Up @@ -232,6 +235,8 @@ def execute(self):
return self._query_value_diff(dbt_adapter, primary_key, model, columns=columns)

def cancel(self):
from recce.adapter.dbt_adapter import DbtAdapter

if self.connection:
adapter: DbtAdapter = default_context().adapter
with adapter.connection_named("cancel"):
Expand Down Expand Up @@ -290,8 +295,7 @@ def __init__(self, params: ValueDiffParams):
self.connection = None
self.legacy_surrogate_key = True

def _query_value_diff(self, dbt_adapter: DbtAdapter, primary_key: Union[str, List[str]], model: str,
columns: List[str] = None):
def _query_value_diff(self, dbt_adapter, primary_key: Union[str, List[str]], model: str, columns: List[str] = None):

composite = True if isinstance(primary_key, List) else False

Expand Down Expand Up @@ -356,6 +360,7 @@ def _query_value_diff(self, dbt_adapter: DbtAdapter, primary_key: Union[str, Lis

def execute(self):

from recce.adapter.dbt_adapter import DbtAdapter
dbt_adapter: DbtAdapter = default_context().adapter

with dbt_adapter.connection_named("value diff"):
Expand All @@ -374,6 +379,7 @@ def execute(self):
return self._query_value_diff(dbt_adapter, primary_key, model, columns)

def cancel(self):
from recce.adapter.dbt_adapter import DbtAdapter
if self.connection:
adapter: DbtAdapter = default_context().adapter
with adapter.connection_named("cancel"):
Expand Down

0 comments on commit 038d508

Please sign in to comment.