diff --git a/recce/adapter/base.py b/recce/adapter/base.py index bb856957..d9269ac9 100644 --- a/recce/adapter/base.py +++ b/recce/adapter/base.py @@ -39,9 +39,12 @@ def find_node_by_name(self, node_name, base=False): for key, node in manifest.nodes.items(): if node.name == node_name: return node - return None + def get_node_by_name(self, node_name): + node = self.find_node_by_name(node_name) or self.find_node_by_name(node_name, base=True) + return node + def get_node_name_by_id(self, unique_id): if unique_id in self.curr_manifest.nodes: return self.curr_manifest.nodes[unique_id].name diff --git a/recce/cli.py b/recce/cli.py index 025c556d..1eee245a 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -201,7 +201,7 @@ def run(output, **kwargs): @cli.command(cls=TrackCommand) @click.argument('state_file', required=True) @click.option('--format', '-f', help='Output format. Currently only markdown is supported.', - type=click.Choice(['markdown', 'mermaid'], case_sensitive=False), + type=click.Choice(['markdown', 'mermaid', 'check'], case_sensitive=False), default='markdown', show_default=True) def summary(state_file, **kwargs): from rich.console import Console diff --git a/recce/models/run.py b/recce/models/run.py index cc784604..a60a4959 100644 --- a/recce/models/run.py +++ b/recce/models/run.py @@ -1,6 +1,6 @@ from typing import List -from .types import Run +from .types import Run, RunType _runs: List[Run] = [] @@ -20,7 +20,9 @@ def find_run_by_id(self, run_id): return None - def list(self): + def list(self, type_filter: RunType = None): + if type_filter: + return list(filter(lambda run: run.type == type_filter, _runs)) return list(_runs) def list_by_check_id(self, check_id): diff --git a/recce/summary.py b/recce/summary.py index 8d6d08d8..d2903632 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -1,4 +1,16 @@ -from typing import List, Dict, Set, Union +from typing import List, Dict, Set, Union, Type, Optional +from uuid import UUID + +from pydantic import BaseModel + +from recce.models import CheckDAO, RunDAO, RunType, Run +from recce.tasks.core import TaskResultDiffer +from recce.tasks.histogram import HistogramDiffTaskResultDiffer +from recce.tasks.profile import ProfileDiffResultDiffer +from recce.tasks.query import QueryDiffResultDiffer +from recce.tasks.rowcount import RowCountDiffResultDiffer +from recce.tasks.top_k import TopKDiffTaskResultDiffer +from recce.tasks.valuediff import ValueDiffTaskResultDiffer ADD_COLOR = '#1dce00' MODIFIED_COLOR = '#ffa502' @@ -64,18 +76,78 @@ def append_child(self, child_id: str): if child_id not in self.children: self.children.append(child_id) - def __str__(self): - style = None + def _cal_row_count_delta_percentage(self): + row_count_diff, run_result = _get_node_row_count_diff(self.id, self.name) + if row_count_diff: + base = run_result.get('base', 0) + current = run_result.get('curr', 0) + if int(current) > int(base): + p = (int(current) - int(base)) / int(current) * 100 + return f'🔼 +{round(p, 2) if p > 0.1 else "<0.1"}%' + else: + p = (int(base) - int(current)) / int(current) * 100 + return f'🔽 -{round(p, 2) if p > 0.1 else "<0.1"}%' + return None + + def _get_schema_diff(self): + base_schema = self.base_data.get('columns', {}) + current_schema = self.current_data.get('columns', {}) + schema_diff = TaskResultDiffer.diff(base_schema, current_schema) + return schema_diff + + def _what_changed(self, checks=None): + changes = [] if self.change_status == 'added': - style = f'style {self.id} stroke:{ADD_COLOR}' - elif self.change_status == 'modified': - style = f'style {self.id} stroke:{MODIFIED_COLOR}' + return ['Added Node'] elif self.change_status == 'removed': - style = f'style {self.id} stroke:{REMOVE_COLOR}' + return ['Removed Node'] + elif self.change_status == 'modified': + changes.append('Code') + row_count_delta_percentage = self._cal_row_count_delta_percentage() + if row_count_delta_percentage: + changes.append(f'Row Count {row_count_delta_percentage}') + schema_diff = self._get_schema_diff() + if schema_diff: + changes.append('Schema') + + if checks: + for check in checks: + check_type = check.type + if check_type == RunType.ROW_COUNT_DIFF or check_type == RunType.SCHEMA_DIFF: + # Skip the row count and schema diff check, since we already have it. + continue + if check.node_ids and self.id in check.node_ids: + changes.append(str(check.type).replace('_', ' ').title()) + return changes + + def get_node_str(self, checks=None): + is_changed = False + style = None + if self.change_status is not None: + is_changed = True + if self.change_status == 'added': + style = f'style {self.id} stroke:{ADD_COLOR}' + elif self.change_status == 'modified': + style = f'style {self.id} stroke:{MODIFIED_COLOR}' + elif self.change_status == 'removed': + style = f'style {self.id} stroke:{REMOVE_COLOR}' + + if checks: + for check in checks: + if check.node_ids and self.id in check.node_ids: + is_changed = True + + content_output = f'{self.id}["{self.name}' + if is_changed: + content_output += '\n\n[What\'s Changed]\n' + changes = self._what_changed(checks) + content_output += ', '.join(changes) + + content_output += '"]\n' if style: - return f'{self.id}["{self.name} [{self.change_status.upper()}]"]\n{style}\n' - return f'{self.id}["{self.name}"]\n' + content_output += f'{style}\n' + return content_output class Edge: @@ -96,9 +168,36 @@ def update_edge_from(self, edge_from: str): self.edge_from = 'both' +class CheckSummary(BaseModel): + id: UUID + type: RunType + name: str + description: str + changes: dict + node_ids: Optional[List[str]] + + +check_result_differ_registry: Dict[RunType, Type[TaskResultDiffer]] = { + RunType.VALUE_DIFF: ValueDiffTaskResultDiffer, + RunType.ROW_COUNT_DIFF: RowCountDiffResultDiffer, + RunType.QUERY_DIFF: QueryDiffResultDiffer, + RunType.TOP_K_DIFF: TopKDiffTaskResultDiffer, + RunType.HISTOGRAM_DIFF: HistogramDiffTaskResultDiffer, + RunType.PROFILE_DIFF: ProfileDiffResultDiffer, +} + + +def differ_factory(run: Run): + differ_clz = check_result_differ_registry.get(run.type) + if not differ_clz: + raise NotImplementedError() + return differ_clz(run) + + class LineageGraph: nodes: Dict[str, Node] = {} edges: Dict[str, Edge] = {} + checks: List[CheckSummary] = None def create_node(self, node_id: str, node_data: dict, data_from: str = 'base'): if node_id not in self.nodes: @@ -162,6 +261,76 @@ def _build_lineage_graph(base, current) -> LineageGraph: return graph +def _build_node_schema(lineage, node_id): + return lineage.get('nodes', {}).get(node_id, {}).get('columns', {}) + + +def _get_node_row_count_diff(node_id, node_name): + row_count_runs = RunDAO().list(type_filter=RunType.ROW_COUNT_DIFF) + for run in row_count_runs: + if node_id in run.params.get('node_ids', []): + result = run.result.get(node_name, {}) + diff = TaskResultDiffer.diff(result.get('base'), result.get('curr')) + return diff, result + elif run.params.get('node_id') == node_id: + result = run.result.get(node_name, {}) + diff = TaskResultDiffer.diff(result.get('base'), result.get('curr')) + return diff, result + return None, None + + +# def _get_node_schema_diff(node_id): + + +def generate_check_summary(base_lineage, curr_lineage) -> List[CheckSummary]: + runs = RunDAO().list() + checks = CheckDAO().list() + checks_summary: List[CheckSummary] = [] + + def _find_run_by_check_id(check_id): + for r in runs: + if str(check_id) == str(r.check_id): + return r + return None + + for check in checks: + run = _find_run_by_check_id(check.check_id) + if check.type == RunType.SCHEMA_DIFF: + # TODO: Check schema diff of the selected node + node_id = check.params.get('node_id') + base = _build_node_schema(base_lineage, node_id) + current = _build_node_schema(curr_lineage, node_id) + changes = TaskResultDiffer.diff(base, current) + if changes: + checks_summary.append( + CheckSummary( + id=check.check_id, + type=check.type, + name=check.name, + description=check.description, + changes=changes, + node_ids=[node_id] + ) + ) + elif (check.type in [RunType.ROW_COUNT_DIFF, RunType.QUERY_DIFF, + RunType.VALUE_DIFF, RunType.PROFILE_DIFF, + RunType.TOP_K_DIFF, RunType.HISTOGRAM_DIFF] and run is not None): + # Check the result is changed or not + differ = differ_factory(run) + if differ.changes is not None: + checks_summary.append( + CheckSummary( + id=check.check_id, + type=check.type, + name=check.name, + description=check.description, + changes=differ.changes, + node_ids=differ.related_node_ids) + ) + + return checks_summary + + def generate_mermaid_lineage_graph(graph: LineageGraph): content = 'graph LR\n' # Only show the modified nodes and there children @@ -174,9 +343,10 @@ def generate_mermaid_lineage_graph(graph: LineageGraph): if node_id in display_nodes: # Skip if already displayed continue + display_nodes.add(node_id) node = graph.nodes[node_id] - content += f'{node}' + content += node.get_node_str(graph.checks) for child_id in node.children: queue.append(child_id) edge_id = f'{node_id}-->{child_id}' @@ -191,14 +361,34 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): curr_lineage = ctx.get_lineage(base=False) base_lineage = ctx.get_lineage(base=True) graph = _build_lineage_graph(base_lineage, curr_lineage) + graph.checks = generate_check_summary(base_lineage, curr_lineage) mermaid_content = generate_mermaid_lineage_graph(graph) + check_content = None + + def _formate_changes(changes): + return ",".join([k.replace('_', ' ').title() for k in list(changes.keys())]) + + # Generate the check summary if we found any changes + if len(graph.checks) > 0: + from py_markdown_table.markdown_table import markdown_table + data = [] + for check in graph.checks: + data.append({ + 'Name': check.name, + 'Type': str(check.type).replace('_', ' ').title(), + 'Description': check.description.replace('\n', ' ') or 'N/A', + 'Type of Changes': _formate_changes(check.changes) + }) + check_content = markdown_table(data).set_params(quote=False, row_sep='markdown').get_markdown() if summary_format == 'mermaid': return mermaid_content + elif summary_format == 'check': + return check_content elif summary_format == 'markdown': # TODO: Check the markdown output content is longer than 65535 characters. # If it is, we should reduce the content length. - return f''' + content = f''' # Recce Summary ## Lineage Graph @@ -206,3 +396,9 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): {mermaid_content} ``` ''' + if check_content: + content += f''' +## Impacted Checks +{check_content} +''' + return content diff --git a/recce/tasks/core.py b/recce/tasks/core.py index 7f26ca12..03262577 100644 --- a/recce/tasks/core.py +++ b/recce/tasks/core.py @@ -1,6 +1,9 @@ from abc import ABC, abstractmethod +from typing import List +from recce.core import default_context from recce.exceptions import RecceCancelException +from recce.models import Run class Task(ABC): @@ -40,3 +43,46 @@ def check_cancel(self): def update_progress(self, message=None, percentage=None): if self.progress_listener is not None: self.progress_listener(message=message, percentage=percentage) + + +class TaskResultDiffer(ABC): + related_node_ids: List[str] = None + + def __init__(self, run: Run): + self.run = run + self.changes = self._check_result_changed_fn(run.result) + self.related_node_ids = self._get_related_node_ids() + + @staticmethod + def diff(base, current): + from deepdiff import DeepDiff + diff = DeepDiff(base, current, ignore_order=True) + return diff if diff else None + + @staticmethod + def get_node_id_by_name(name): + node = default_context().adapter.get_node_by_name(name) + return node.unique_id if node else None + + @abstractmethod + def _check_result_changed_fn(self, result): + """ + Check if the result is changed. + Should be implemented by subclass. + """ + raise NotImplementedError() + + def _get_related_node_ids(self) -> List[str] | None: + """ + Get the related node ids. + Should be implemented by subclass. + """ + params = self.run.params + if params.get('model'): + return [TaskResultDiffer.get_node_id_by_name(params.get('model'))] + elif params.get('node_names'): + names = params.get('node_names', []) + return [TaskResultDiffer.get_node_id_by_name(name) for name in names] + else: + # No related node ids in the params + return None diff --git a/recce/tasks/histogram.py b/recce/tasks/histogram.py index cc2bbb61..766c8c82 100644 --- a/recce/tasks/histogram.py +++ b/recce/tasks/histogram.py @@ -7,6 +7,7 @@ from recce.adapter.dbt_adapter import DbtAdapter from recce.core import default_context from recce.tasks import Task +from recce.tasks.core import TaskResultDiffer from recce.tasks.query import QueryMixin sql_datetime_types = [ @@ -359,3 +360,8 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class HistogramDiffTaskResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + return TaskResultDiffer.diff(result['base'], result['current']) diff --git a/recce/tasks/profile.py b/recce/tasks/profile.py index 6f792617..e71584ad 100644 --- a/recce/tasks/profile.py +++ b/recce/tasks/profile.py @@ -4,7 +4,7 @@ from dbt.clients.agate_helper import merge_tables from pydantic import BaseModel -from .core import Task +from .core import Task, TaskResultDiffer from .dataframe import DataFrame from ..adapter.dbt_adapter import DbtAdapter from ..core import default_context @@ -125,3 +125,8 @@ def cancel(self): dbt_adapter: DbtAdapter = default_context().adapter with dbt_adapter.connection_named("cancel"): dbt_adapter.cancel(self.connection) + + +class ProfileDiffResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + return self.diff(result['base'], result['current']) diff --git a/recce/tasks/query.py b/recce/tasks/query.py index 0ec8a42f..b47d907b 100644 --- a/recce/tasks/query.py +++ b/recce/tasks/query.py @@ -3,7 +3,7 @@ import agate from pydantic import BaseModel -from .core import Task +from .core import Task, TaskResultDiffer from .dataframe import DataFrame from ..core import default_context from ..exceptions import RecceException @@ -166,3 +166,11 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class QueryDiffResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + base = result.get('base') + current = result.get('current') + + return TaskResultDiffer.diff(base, current) diff --git a/recce/tasks/rowcount.py b/recce/tasks/rowcount.py index 75bec2f1..fc1c73ff 100644 --- a/recce/tasks/rowcount.py +++ b/recce/tasks/rowcount.py @@ -3,6 +3,7 @@ from recce.adapter.dbt_adapter import DbtAdapter from recce.core import default_context from recce.tasks import Task +from recce.tasks.core import TaskResultDiffer from recce.tasks.query import QueryMixin @@ -113,3 +114,15 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class RowCountDiffResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + base = {} + current = {} + + for node, row_counts in result.items(): + base[node] = row_counts['base'] + current[node] = row_counts['curr'] + + return TaskResultDiffer.diff(base, current) diff --git a/recce/tasks/top_k.py b/recce/tasks/top_k.py index ba38b38e..a7970e9e 100644 --- a/recce/tasks/top_k.py +++ b/recce/tasks/top_k.py @@ -3,6 +3,7 @@ from recce.adapter.dbt_adapter import DbtAdapter from recce.core import default_context from recce.tasks import Task +from recce.tasks.core import TaskResultDiffer from recce.tasks.query import QueryMixin @@ -150,3 +151,11 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class TopKDiffTaskResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + base = result.get('base') + current = result.get('current') + + return TaskResultDiffer.diff(base, current) diff --git a/recce/tasks/valuediff.py b/recce/tasks/valuediff.py index 76e0dffc..c01ec34e 100644 --- a/recce/tasks/valuediff.py +++ b/recce/tasks/valuediff.py @@ -3,7 +3,7 @@ import agate from pydantic import BaseModel -from .core import Task +from .core import Task, TaskResultDiffer from .dataframe import DataFrame from ..adapter.dbt_adapter import DbtAdapter from ..core import default_context @@ -238,6 +238,40 @@ def cancel(self): adapter.cancel(self.connection) +class ValueDiffTaskResultDiffer(TaskResultDiffer): + + def _check_result_changed_fn(self, result): + is_changed = False + summary = result.get('summary', {}) + added = summary.get('added', 0) + removed = summary.get('removed', 0) + changes = { + 'column_changed': [] + } + + if added > 0: + is_changed = True + changes['row_added'] = added + + if removed > 0: + is_changed = True + changes['row_removed'] = removed + + row_data = result.get('data', {}).get('data', []) + for row in row_data: + column, matched, matched_p = row + if float(matched_p) < 1.0: + # if there is any mismatched, we consider it as changed + is_changed = True + changes['column_changed'].append({ + 'column': column, + 'matched': matched, + 'matched_p': matched_p, + }) + + return changes if is_changed else None + + class ValueDiffDetailParams(TypedDict): primary_key: str model: str diff --git a/setup.py b/setup.py index 6eca77a7..273815aa 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ def _get_version(): 'sentry-sdk', 'watchdog', 'websockets', + 'py-markdown-table', 'python-multipart', 'GitPython', ],