Skip to content

Commit

Permalink
Merge pull request #302 from DataRecce/feature/drc-394-feature-recce-…
Browse files Browse the repository at this point in the history
…summary-support-preset-check-result

[Feature] DRC-394 Add impacted preset checks result into recce summary
  • Loading branch information
kentwelcome authored May 7, 2024
2 parents 3794c81 + 3093117 commit e2577cf
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 18 deletions.
5 changes: 4 additions & 1 deletion recce/adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ def find_node_by_name(self, node_name, base=False):
for key, node in manifest.nodes.items():
if node.name == node_name:
return node

return None

def get_node_by_name(self, node_name):
    """Find a node by name, trying the default lookup before the base one.

    ``find_node_by_name`` is called first with its default ``base=False``;
    only when that yields a falsy result is it called again with
    ``base=True``. The result of whichever call is used is returned as-is.
    """
    primary_match = self.find_node_by_name(node_name)
    if primary_match:
        return primary_match
    return self.find_node_by_name(node_name, base=True)

def get_node_name_by_id(self, unique_id):
if unique_id in self.curr_manifest.nodes:
return self.curr_manifest.nodes[unique_id].name
Expand Down
2 changes: 1 addition & 1 deletion recce/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def run(output, **kwargs):
@cli.command(cls=TrackCommand)
@click.argument('state_file', required=True)
@click.option('--format', '-f', help='Output format. Currently only markdown is supported.',
type=click.Choice(['markdown', 'mermaid'], case_sensitive=False),
type=click.Choice(['markdown', 'mermaid', 'check'], case_sensitive=False),
default='markdown', show_default=True)
def summary(state_file, **kwargs):
from rich.console import Console
Expand Down
6 changes: 4 additions & 2 deletions recce/models/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List

from .types import Run
from .types import Run, RunType

_runs: List[Run] = []

Expand All @@ -20,7 +20,9 @@ def find_run_by_id(self, run_id):

return None

def list(self):
def list(self, type_filter: RunType = None):
    """Return a new list of the stored runs.

    When ``type_filter`` is truthy, only runs whose ``type`` equals it are
    included; otherwise every stored run is returned. The module-level
    ``_runs`` list itself is never handed out.
    """
    if not type_filter:
        return [*_runs]
    return [run for run in _runs if run.type == type_filter]

def list_by_check_id(self, check_id):
Expand Down
218 changes: 207 additions & 11 deletions recce/summary.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
from typing import List, Dict, Set, Union
from typing import List, Dict, Set, Union, Type, Optional
from uuid import UUID

from pydantic import BaseModel

from recce.models import CheckDAO, RunDAO, RunType, Run
from recce.tasks.core import TaskResultDiffer
from recce.tasks.histogram import HistogramDiffTaskResultDiffer
from recce.tasks.profile import ProfileDiffResultDiffer
from recce.tasks.query import QueryDiffResultDiffer
from recce.tasks.rowcount import RowCountDiffResultDiffer
from recce.tasks.top_k import TopKDiffTaskResultDiffer
from recce.tasks.valuediff import ValueDiffTaskResultDiffer

ADD_COLOR = '#1dce00'
MODIFIED_COLOR = '#ffa502'
Expand Down Expand Up @@ -64,18 +76,78 @@ def append_child(self, child_id: str):
if child_id not in self.children:
self.children.append(child_id)

def __str__(self):
style = None
def _cal_row_count_delta_percentage(self):
    """Describe this node's row-count change as a short signed label.

    The change is measured relative to the base row count, so a decrease
    can never exceed 100% and a table emptied in the current environment
    no longer triggers a division by zero.

    Returns:
        ``None`` when no row-count diff is recorded for this node or the
        two counts are equal; otherwise a string such as ``'🔼 +12.5%'``
        or ``'🔽 -3.0%'``. When the base count is 0 (or missing) a
        percentage is undefined, so the absolute number of rows is
        reported instead, e.g. ``'🔼 +120 rows'``.
    """
    row_count_diff, run_result = _get_node_row_count_diff(self.id, self.name)
    if not row_count_diff:
        return None

    # A missing or None count is treated as 0 rows.
    base = int(run_result.get('base') or 0)
    current = int(run_result.get('curr') or 0)
    if current == base:
        return None

    arrow, sign = ('🔼', '+') if current > base else ('🔽', '-')
    if base == 0:
        # Growth from an empty baseline has no finite percentage.
        return f'{arrow} {sign}{abs(current)} rows'

    p = abs(current - base) / abs(base) * 100
    # Very small changes are shown as "<0.1" rather than rounding to 0.0.
    return f'{arrow} {sign}{round(p, 2) if p > 0.1 else "<0.1"}%'

def _get_schema_diff(self):
    """Diff the ``columns`` entries of this node's base and current data.

    A missing ``columns`` key on either side is treated as an empty dict.
    The value returned is whatever ``TaskResultDiffer.diff`` produces for
    the two column mappings.
    """
    columns_before = self.base_data.get('columns', {})
    columns_after = self.current_data.get('columns', {})
    return TaskResultDiffer.diff(columns_before, columns_after)

# Build the list of human-readable change labels shown for this node.
#
# checks: optional iterable of check summaries; each item is read for its
#         `.type` and `.node_ids` attributes.
# Returns a list of label strings.
#
# NOTE(review): this text was captured from a diff view with indentation
# stripped. The three `style = ...` assignments and the first
# `elif ... == 'modified':` below look like the removed side of that diff
# (they duplicate the styling logic in get_node_str) - confirm against the
# real file before relying on the statement order shown here.
def _what_changed(self, checks=None):
changes = []
if self.change_status == 'added':
style = f'style {self.id} stroke:{ADD_COLOR}'
elif self.change_status == 'modified':
style = f'style {self.id} stroke:{MODIFIED_COLOR}'
# Added and removed nodes are described by a single fixed label.
return ['Added Node']
elif self.change_status == 'removed':
style = f'style {self.id} stroke:{REMOVE_COLOR}'
return ['Removed Node']
elif self.change_status == 'modified':
changes.append('Code')
# Row count is only listed when a delta label could be computed.
row_count_delta_percentage = self._cal_row_count_delta_percentage()
if row_count_delta_percentage:
changes.append(f'Row Count {row_count_delta_percentage}')
# Schema is only listed when the base/current columns differ.
schema_diff = self._get_schema_diff()
if schema_diff:
changes.append('Schema')

if checks:
for check in checks:
check_type = check.type
if check_type == RunType.ROW_COUNT_DIFF or check_type == RunType.SCHEMA_DIFF:
# Skip row count and schema diff checks: both are already reported above.
continue
# Label is the check type with underscores replaced by spaces, title-cased.
if check.node_ids and self.id in check.node_ids:
changes.append(str(check.type).replace('_', ' ').title())
return changes

# Render this node as a Mermaid node definition, optionally followed by a
# `style` line that colours its stroke.
#
# checks: optional iterable of check summaries; each item is read for its
#         `.node_ids` attribute.
# Returns the Mermaid text for the node as a string ending in a newline.
#
# NOTE(review): this text was captured from a diff view with indentation
# stripped. The two `return f'...'` lines that follow `if style:` look like
# the removed side of that diff (the old __str__ output) - confirm against
# the real file before relying on the statement order shown here.
def get_node_str(self, checks=None):
is_changed = False
style = None

# Any non-None change status marks the node as changed and selects a stroke colour.
if self.change_status is not None:
is_changed = True
if self.change_status == 'added':
style = f'style {self.id} stroke:{ADD_COLOR}'
elif self.change_status == 'modified':
style = f'style {self.id} stroke:{MODIFIED_COLOR}'
elif self.change_status == 'removed':
style = f'style {self.id} stroke:{REMOVE_COLOR}'

# A node listed in any check's node_ids also counts as changed.
if checks:
for check in checks:
if check.node_ids and self.id in check.node_ids:
is_changed = True

# The node label starts with the node name; the change list is appended inside the same quoted label.
content_output = f'{self.id}["{self.name}'
if is_changed:
content_output += '\n\n[What\'s Changed]\n'
changes = self._what_changed(checks)
content_output += ', '.join(changes)

content_output += '"]\n'
if style:
return f'{self.id}["{self.name} [{self.change_status.upper()}]"]\n{style}\n'
return f'{self.id}["{self.name}"]\n'
content_output += f'{style}\n'
return content_output


class Edge:
Expand All @@ -96,9 +168,36 @@ def update_edge_from(self, edge_from: str):
self.edge_from = 'both'


class CheckSummary(BaseModel):
    """Summary of one check whose result shows changes.

    ``node_ids`` now has an explicit ``None`` default so the field is
    optional under both pydantic v1 and v2 (v2 treats an ``Optional``
    annotation without a default as required).
    """

    # Identifier of the check being summarised.
    id: UUID
    # Run/check type of the check.
    type: RunType
    # Name and description as stored on the check.
    name: str
    description: str
    # Detected changes for the check.
    changes: dict
    # Ids of the nodes the check relates to; None when not known.
    node_ids: Optional[List[str]] = None


# Maps a run type to the TaskResultDiffer subclass that knows how to detect
# changes in that run's result; consumed by differ_factory below.
# RunType.SCHEMA_DIFF has no entry here.
check_result_differ_registry: Dict[RunType, Type[TaskResultDiffer]] = {
RunType.VALUE_DIFF: ValueDiffTaskResultDiffer,
RunType.ROW_COUNT_DIFF: RowCountDiffResultDiffer,
RunType.QUERY_DIFF: QueryDiffResultDiffer,
RunType.TOP_K_DIFF: TopKDiffTaskResultDiffer,
RunType.HISTOGRAM_DIFF: HistogramDiffTaskResultDiffer,
RunType.PROFILE_DIFF: ProfileDiffResultDiffer,
}


def differ_factory(run: Run):
    """Instantiate the result differ registered for ``run.type``.

    Raises:
        NotImplementedError: when ``check_result_differ_registry`` has no
            entry for the run's type.
    """
    registered_differ = check_result_differ_registry.get(run.type)
    if registered_differ:
        return registered_differ(run)
    raise NotImplementedError()


class LineageGraph:
nodes: Dict[str, Node] = {}
edges: Dict[str, Edge] = {}
checks: List[CheckSummary] = None

def create_node(self, node_id: str, node_data: dict, data_from: str = 'base'):
if node_id not in self.nodes:
Expand Down Expand Up @@ -162,6 +261,76 @@ def _build_lineage_graph(base, current) -> LineageGraph:
return graph


def _build_node_schema(lineage, node_id):
return lineage.get('nodes', {}).get(node_id, {}).get('columns', {})


def _get_node_row_count_diff(node_id, node_name):
    """Find the row-count diff recorded for a node.

    Scans the ROW_COUNT_DIFF runs in order and uses the first one whose
    params reference ``node_id``, either inside ``node_ids`` or as
    ``node_id``. The run's result entry is looked up by ``node_name``.

    Returns:
        ``(diff, result)`` where ``result`` is the run's entry for the node
        (``{}`` if absent) and ``diff`` is ``TaskResultDiffer.diff`` of its
        ``base`` and ``curr`` values; ``(None, None)`` when no run matches.
    """
    for row_count_run in RunDAO().list(type_filter=RunType.ROW_COUNT_DIFF):
        targets_node = (
            node_id in row_count_run.params.get('node_ids', [])
            or row_count_run.params.get('node_id') == node_id
        )
        if not targets_node:
            continue
        counts = row_count_run.result.get(node_name, {})
        delta = TaskResultDiffer.diff(counts.get('base'), counts.get('curr'))
        return delta, counts
    return None, None


# def _get_node_schema_diff(node_id):


def generate_check_summary(base_lineage, curr_lineage) -> List[CheckSummary]:
    """Collect a ``CheckSummary`` for every check whose result shows changes.

    Schema-diff checks are evaluated directly from the two lineages; the
    other supported check types are evaluated from the first run linked to
    the check, through the differ returned by ``differ_factory``. Checks
    without detected changes, of other types, or without a linked run are
    left out.
    """
    all_runs = RunDAO().list()
    all_checks = CheckDAO().list()
    summaries: List[CheckSummary] = []
    run_backed_types = (
        RunType.ROW_COUNT_DIFF, RunType.QUERY_DIFF,
        RunType.VALUE_DIFF, RunType.PROFILE_DIFF,
        RunType.TOP_K_DIFF, RunType.HISTOGRAM_DIFF,
    )

    def _first_run_for(check_id):
        # Ids are compared as strings, first match wins.
        return next((r for r in all_runs if str(check_id) == str(r.check_id)), None)

    for check in all_checks:
        linked_run = _first_run_for(check.check_id)

        if check.type == RunType.SCHEMA_DIFF:
            # TODO: Check schema diff of the selected node
            node_id = check.params.get('node_id')
            changes = TaskResultDiffer.diff(
                _build_node_schema(base_lineage, node_id),
                _build_node_schema(curr_lineage, node_id),
            )
            if not changes:
                continue
            node_ids = [node_id]
        elif check.type in run_backed_types and linked_run is not None:
            # Only report the check when its run result actually changed.
            differ = differ_factory(linked_run)
            if differ.changes is None:
                continue
            changes = differ.changes
            node_ids = differ.related_node_ids
        else:
            continue

        summaries.append(
            CheckSummary(
                id=check.check_id,
                type=check.type,
                name=check.name,
                description=check.description,
                changes=changes,
                node_ids=node_ids,
            )
        )

    return summaries


def generate_mermaid_lineage_graph(graph: LineageGraph):
content = 'graph LR\n'
# Only show the modified nodes and their children
Expand All @@ -174,9 +343,10 @@ def generate_mermaid_lineage_graph(graph: LineageGraph):
if node_id in display_nodes:
# Skip if already displayed
continue

display_nodes.add(node_id)
node = graph.nodes[node_id]
content += f'{node}'
content += node.get_node_str(graph.checks)
for child_id in node.children:
queue.append(child_id)
edge_id = f'{node_id}-->{child_id}'
Expand All @@ -191,18 +361,44 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'):
curr_lineage = ctx.get_lineage(base=False)
base_lineage = ctx.get_lineage(base=True)
graph = _build_lineage_graph(base_lineage, curr_lineage)
graph.checks = generate_check_summary(base_lineage, curr_lineage)
mermaid_content = generate_mermaid_lineage_graph(graph)
check_content = None

def _formate_changes(changes):
return ",".join([k.replace('_', ' ').title() for k in list(changes.keys())])

# Generate the check summary if we found any changes
if len(graph.checks) > 0:
from py_markdown_table.markdown_table import markdown_table
data = []
for check in graph.checks:
data.append({
'Name': check.name,
'Type': str(check.type).replace('_', ' ').title(),
'Description': check.description.replace('\n', ' ') or 'N/A',
'Type of Changes': _formate_changes(check.changes)
})
check_content = markdown_table(data).set_params(quote=False, row_sep='markdown').get_markdown()

if summary_format == 'mermaid':
return mermaid_content
elif summary_format == 'check':
return check_content
elif summary_format == 'markdown':
# TODO: Check the markdown output content is longer than 65535 characters.
# If it is, we should reduce the content length.
return f'''
content = f'''
# Recce Summary
## Lineage Graph
```mermaid
{mermaid_content}
```
'''
if check_content:
content += f'''
## Impacted Checks
{check_content}
'''
return content
46 changes: 46 additions & 0 deletions recce/tasks/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from abc import ABC, abstractmethod
from typing import List, Optional

from recce.core import default_context
from recce.exceptions import RecceCancelException
from recce.models import Run


class Task(ABC):
Expand Down Expand Up @@ -40,3 +43,46 @@ def check_cancel(self):
def update_progress(self, message=None, percentage=None):
if self.progress_listener is not None:
self.progress_listener(message=message, percentage=percentage)


class TaskResultDiffer(ABC):
    """Base class for detecting whether a run's result contains changes.

    Constructing an instance stores the run, computes ``changes`` through
    the subclass hook ``_check_result_changed_fn`` and resolves
    ``related_node_ids`` from the run params.
    """

    # Ids of the nodes the run relates to; None when the params name no node.
    related_node_ids: Optional[List[str]] = None

    def __init__(self, run: 'Run'):
        self.run = run
        self.changes = self._check_result_changed_fn(run.result)
        self.related_node_ids = self._get_related_node_ids()

    @staticmethod
    def diff(base, current):
        """Return the order-insensitive DeepDiff of the two values, or None if it is empty."""
        from deepdiff import DeepDiff
        delta = DeepDiff(base, current, ignore_order=True)
        return delta or None

    @staticmethod
    def get_node_id_by_name(name):
        """Return the ``unique_id`` of the node called ``name``, or None if no node is found."""
        node = default_context().adapter.get_node_by_name(name)
        return node.unique_id if node else None

    @abstractmethod
    def _check_result_changed_fn(self, result):
        """
        Check if the result is changed.
        Should be implemented by subclass; the return value is stored as
        ``self.changes``.
        """
        raise NotImplementedError()

    def _get_related_node_ids(self) -> Optional[List[str]]:
        """
        Get the related node ids from the run params.

        Uses ``params['model']`` when set, otherwise ``params['node_names']``.
        Names that cannot be resolved to a node are dropped, so the returned
        list only ever contains real ids (it may be empty). Returns None when
        the params name no node at all. Subclasses may override this.
        """
        params = self.run.params
        if params.get('model'):
            names = [params.get('model')]
        elif params.get('node_names'):
            names = params.get('node_names', [])
        else:
            # No related node ids in the params
            return None

        # Resolved through ``self`` so subclasses can supply their own lookup.
        resolved = (self.get_node_id_by_name(name) for name in names)
        return [node_id for node_id in resolved if node_id is not None]
6 changes: 6 additions & 0 deletions recce/tasks/histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from recce.adapter.dbt_adapter import DbtAdapter
from recce.core import default_context
from recce.tasks import Task
from recce.tasks.core import TaskResultDiffer
from recce.tasks.query import QueryMixin

sql_datetime_types = [
Expand Down Expand Up @@ -359,3 +360,8 @@ def cancel(self):
super().cancel()
if self.connection:
self.close_connection(self.connection)


class HistogramDiffTaskResultDiffer(TaskResultDiffer):
    """Result differ for histogram-diff runs."""

    def _check_result_changed_fn(self, result):
        """Diff the ``base`` entry of the result against its ``current`` entry."""
        base_histogram = result['base']
        current_histogram = result['current']
        return TaskResultDiffer.diff(base_histogram, current_histogram)
Loading

0 comments on commit e2577cf

Please sign in to comment.