From a5fdc0951b3453608a945b394a4501a80c3abb21 Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Thu, 2 May 2024 16:32:29 +0800 Subject: [PATCH 1/8] [Feature] DRC-394 Add impacted preset checks result into recce summary Signed-off-by: Kent Huang --- recce/cli.py | 2 +- recce/summary.py | 112 ++++++++++++++++++++++++++++++++++++++- recce/tasks/core.py | 20 +++++++ recce/tasks/histogram.py | 6 +++ recce/tasks/query.py | 10 +++- recce/tasks/rowcount.py | 13 +++++ recce/tasks/top_k.py | 9 ++++ recce/tasks/valuediff.py | 36 ++++++++++++- setup.py | 1 + 9 files changed, 204 insertions(+), 5 deletions(-) diff --git a/recce/cli.py b/recce/cli.py index 025c556d..a43988f0 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -201,7 +201,7 @@ def run(output, **kwargs): @cli.command(cls=TrackCommand) @click.argument('state_file', required=True) @click.option('--format', '-f', help='Output format. Currently only markdown is supported.', - type=click.Choice(['markdown', 'mermaid'], case_sensitive=False), + type=click.Choice(['markdown', 'mermaid', 'preset'], case_sensitive=False), default='markdown', show_default=True) def summary(state_file, **kwargs): from rich.console import Console diff --git a/recce/summary.py b/recce/summary.py index 8d6d08d8..bfb83354 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -1,4 +1,15 @@ -from typing import List, Dict, Set, Union +from typing import List, Dict, Set, Union, Type +from uuid import UUID + +from pydantic import BaseModel + +from recce.models import CheckDAO, RunDAO, RunType +from recce.tasks.core import TaskResultDiffer +from recce.tasks.histogram import HistogramDiffTaskResultDiffer +from recce.tasks.query import QueryDiffResultDiffer +from recce.tasks.rowcount import RowCountDiffResultDiffer +from recce.tasks.top_k import TopKDiffTaskResultDiffer +from recce.tasks.valuediff import ValueDiffTaskResultDiffer ADD_COLOR = '#1dce00' MODIFIED_COLOR = '#ffa502' @@ -96,6 +107,30 @@ def update_edge_from(self, edge_from: str): self.edge_from = 'both' +class CheckSummary(BaseModel): + id: UUID + type: RunType + name: str + description: str + changes: dict + + +check_result_differ_registry: Dict[RunType, Type[TaskResultDiffer]] = { + RunType.VALUE_DIFF: ValueDiffTaskResultDiffer, + RunType.ROW_COUNT_DIFF: RowCountDiffResultDiffer, + RunType.QUERY_DIFF: QueryDiffResultDiffer, + RunType.TOP_K_DIFF: TopKDiffTaskResultDiffer, + RunType.HISTOGRAM_DIFF: HistogramDiffTaskResultDiffer, +} + + +def differ_factory(run_type: RunType, result): + differ_clz = check_result_differ_registry.get(run_type) + if not differ_clz: + raise NotImplementedError() + return differ_clz(result) + + class LineageGraph: nodes: Dict[str, Node] = {} edges: Dict[str, Edge] = {} @@ -162,6 +197,54 @@ def _build_lineage_graph(base, current) -> LineageGraph: return graph +def _build_node_schema(lineage, node_id): + return lineage.get('nodes', {}).get(node_id, {}).get('columns', {}) + + +def generate_preset_check_summary(base_lineage, curr_lineage) -> List[CheckSummary]: + runs = RunDAO().list() + preset_checks = [check for check in CheckDAO().list() if check.is_preset is True] + preset_checks_summary: List[CheckSummary] = [] + + def _find_run_by_check_id(check_id): + for r in runs: + if str(check_id) == str(r.check_id): + return r + return None + + for check in preset_checks: + run = _find_run_by_check_id(check.check_id) + if check.type == RunType.SCHEMA_DIFF: + # TODO: Check schema diff of the selected node + node_id = check.params.get('node_id') + base = _build_node_schema(base_lineage, node_id) + current = _build_node_schema(curr_lineage, node_id) + changes = TaskResultDiffer.diff(base, current) + if changes: + preset_checks_summary.append( + CheckSummary( + id=check.check_id, + type=check.type, + name=check.name, + description=check.description, + changes=changes) + ) + elif str(check.type).endswith('_diff') and run is not None: + # Check the result is changed or not + differ = differ_factory(check.type, run.result) + if differ.changes is not None: + preset_checks_summary.append( + CheckSummary( + id=check.check_id, + type=check.type, + name=check.name, + description=check.description, + changes=differ.changes) + ) + + return preset_checks_summary + + def generate_mermaid_lineage_graph(graph: LineageGraph): content = 'graph LR\n' # Only show the modified nodes and there children @@ -191,14 +274,33 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): curr_lineage = ctx.get_lineage(base=False) base_lineage = ctx.get_lineage(base=True) graph = _build_lineage_graph(base_lineage, curr_lineage) + preset_checks = generate_preset_check_summary(base_lineage, curr_lineage) + preset_content = None mermaid_content = generate_mermaid_lineage_graph(graph) + def _formate_changes(changes): + return ",".join([k.replace('_', ' ').title() for k in list(changes.keys())]) + + if len(preset_checks) > 0: + from py_markdown_table.markdown_table import markdown_table + data = [] + for check in preset_checks: + data.append({ + 'Name': check.name, + 'Type': str(check.type).replace('_', ' ').title(), + 'Description': check.description or 'N/A', + 'Type of Changes': _formate_changes(check.changes) + }) + preset_content = markdown_table(data).set_params(quote=False, row_sep='markdown').get_markdown() + if summary_format == 'mermaid': return mermaid_content + elif summary_format == 'preset': + return preset_content elif summary_format == 'markdown': # TODO: Check the markdown output content is longer than 65535 characters. # If it is, we should reduce the content length. - return f''' + content = f''' # Recce Summary ## Lineage Graph @@ -206,3 +308,9 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): {mermaid_content} ``` ''' + if preset_content: + content += f''' +## Impacted Preset Checks +{preset_content} +''' + return content diff --git a/recce/tasks/core.py b/recce/tasks/core.py index 7f26ca12..974dbb89 100644 --- a/recce/tasks/core.py +++ b/recce/tasks/core.py @@ -40,3 +40,23 @@ def check_cancel(self): def update_progress(self, message=None, percentage=None): if self.progress_listener is not None: self.progress_listener(message=message, percentage=percentage) + + +class TaskResultDiffer(ABC): + def __init__(self, result): + self.result = result + self.changes = self._check_result_changed_fn(result) + + @staticmethod + def diff(base, current): + from deepdiff import DeepDiff + diff = DeepDiff(base, current, ignore_order=True) + return diff if diff else None + + @abstractmethod + def _check_result_changed_fn(self, result): + """ + Check if the result is changed. + Should be implemented by subclass. + """ + raise NotImplementedError() diff --git a/recce/tasks/histogram.py b/recce/tasks/histogram.py index cc2bbb61..766c8c82 100644 --- a/recce/tasks/histogram.py +++ b/recce/tasks/histogram.py @@ -7,6 +7,7 @@ from recce.adapter.dbt_adapter import DbtAdapter from recce.core import default_context from recce.tasks import Task +from recce.tasks.core import TaskResultDiffer from recce.tasks.query import QueryMixin sql_datetime_types = [ @@ -359,3 +360,8 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class HistogramDiffTaskResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + return TaskResultDiffer.diff(result['base'], result['current']) diff --git a/recce/tasks/query.py b/recce/tasks/query.py index 0ec8a42f..b47d907b 100644 --- a/recce/tasks/query.py +++ b/recce/tasks/query.py @@ -3,7 +3,7 @@ import agate from pydantic import BaseModel -from .core import Task +from .core import Task, TaskResultDiffer from .dataframe import DataFrame from ..core import default_context from ..exceptions import RecceException @@ -166,3 +166,11 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class QueryDiffResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + base = result.get('base') + current = result.get('current') + + return TaskResultDiffer.diff(base, current) diff --git a/recce/tasks/rowcount.py b/recce/tasks/rowcount.py index 75bec2f1..fc1c73ff 100644 --- a/recce/tasks/rowcount.py +++ b/recce/tasks/rowcount.py @@ -3,6 +3,7 @@ from recce.adapter.dbt_adapter import DbtAdapter from recce.core import default_context from recce.tasks import Task +from recce.tasks.core import TaskResultDiffer from recce.tasks.query import QueryMixin @@ -113,3 +114,15 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class RowCountDiffResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + base = {} + current = {} + + for node, row_counts in result.items(): + base[node] = row_counts['base'] + current[node] = row_counts['curr'] + + return TaskResultDiffer.diff(base, current) diff --git a/recce/tasks/top_k.py b/recce/tasks/top_k.py index ba38b38e..a7970e9e 100644 --- a/recce/tasks/top_k.py +++ b/recce/tasks/top_k.py @@ -3,6 +3,7 @@ from recce.adapter.dbt_adapter import DbtAdapter from recce.core import default_context from recce.tasks import Task +from recce.tasks.core import TaskResultDiffer from recce.tasks.query import QueryMixin @@ -150,3 +151,11 @@ def cancel(self): super().cancel() if self.connection: self.close_connection(self.connection) + + +class TopKDiffTaskResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + base = result.get('base') + current = result.get('current') + + return TaskResultDiffer.diff(base, current) diff --git a/recce/tasks/valuediff.py b/recce/tasks/valuediff.py index 76e0dffc..c01ec34e 100644 --- a/recce/tasks/valuediff.py +++ b/recce/tasks/valuediff.py @@ -3,7 +3,7 @@ import agate from pydantic import BaseModel -from .core import Task +from .core import Task, TaskResultDiffer from .dataframe import DataFrame from ..adapter.dbt_adapter import DbtAdapter from ..core import default_context @@ -238,6 +238,40 @@ def cancel(self): adapter.cancel(self.connection) +class ValueDiffTaskResultDiffer(TaskResultDiffer): + + def _check_result_changed_fn(self, result): + is_changed = False + summary = result.get('summary', {}) + added = summary.get('added', 0) + removed = summary.get('removed', 0) + changes = { + 'column_changed': [] + } + + if added > 0: + is_changed = True + changes['row_added'] = added + + if removed > 0: + is_changed = True + changes['row_removed'] = removed + + row_data = result.get('data', {}).get('data', []) + for row in row_data: + column, matched, matched_p = row + if float(matched_p) < 1.0: + # if there is any mismatched, we consider it as changed + is_changed = True + changes['column_changed'].append({ + 'column': column, + 'matched': matched, + 'matched_p': matched_p, + }) + + return changes if is_changed else None + + class ValueDiffDetailParams(TypedDict): primary_key: str model: str diff --git a/setup.py b/setup.py index 6eca77a7..273815aa 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ def _get_version(): 'sentry-sdk', 'watchdog', 'websockets', + 'py-markdown-table', 'python-multipart', 'GitPython', ], From b1221d16c668e5836338d90837080b65d30e69b1 Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Fri, 3 May 2024 03:14:53 +0800 Subject: [PATCH 2/8] [Feature] DRC-394 Add Row Count and Schema changed info - refine Lineage Graph to add row count and schema change info Signed-off-by: Kent Huang --- recce/models/run.py | 6 +++-- recce/summary.py | 59 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/recce/models/run.py b/recce/models/run.py index cc784604..a60a4959 100644 --- a/recce/models/run.py +++ b/recce/models/run.py @@ -1,6 +1,6 @@ from typing import List -from .types import Run +from .types import Run, RunType _runs: List[Run] = [] @@ -20,7 +20,9 @@ def find_run_by_id(self, run_id): return None - def list(self): + def list(self, type_filter: RunType = None): + if type_filter: + return list(filter(lambda run: run.type == type_filter, _runs)) return list(_runs) def list_by_check_id(self, check_id): diff --git a/recce/summary.py b/recce/summary.py index bfb83354..d2763273 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -75,8 +75,44 @@ def append_child(self, child_id: str): if child_id not in self.children: self.children.append(child_id) + def _cal_row_count_delta_percentage(self): + row_count_diff, run_result = _get_node_row_count_diff(self.id) + if row_count_diff: + base = run_result.get('base', 0) + current = run_result.get('current', 0) + if int(current) > int(base): + return f'+{(int(current) - int(base)) / int(current) * 100}%' + else: + return f'-{(int(base) - int(current)) / int(base) * 100}%' + return None + + def _get_schema_diff(self): + base_schema = self.base_data.get('columns', {}) + current_schema = self.current_data.get('columns', {}) + schema_diff = TaskResultDiffer.diff(base_schema, current_schema) + return schema_diff + + def _what_changed(self): + changes = [] + if self.change_status == 'added': + return ['Added Node'] + elif self.change_status == 'removed': + return ['Removed Node'] + elif self.change_status == 'modified': + changes.append('Code') + row_count_delta_percentage = self._cal_row_count_delta_percentage() + if row_count_delta_percentage: + changes.append(f'Row Count {row_count_delta_percentage}') + schema_diff = self._get_schema_diff() + if schema_diff: + changes.append('Schema') + return changes + def __str__(self): style = None + row_count_delta_percentage = self._cal_row_count_delta_percentage() + schema_diff = self._get_schema_diff() + if self.change_status == 'added': style = f'style {self.id} stroke:{ADD_COLOR}' elif self.change_status == 'modified': @@ -85,7 +121,9 @@ def __str__(self): style = f'style {self.id} stroke:{REMOVE_COLOR}' if style: - return f'{self.id}["{self.name} [{self.change_status.upper()}]"]\n{style}\n' + output = f'{self.id}["{self.name}\n\n[What\'s Changed]\n' + changes = self._what_changed() + return output + ', '.join(changes) + f'"]\n{style}\n' return f'{self.id}["{self.name}"]\n' @@ -201,6 +239,19 @@ def _build_node_schema(lineage, node_id): return lineage.get('nodes', {}).get(node_id, {}).get('columns', {}) +def _get_node_row_count_diff(node_id): + row_count_runs = RunDAO().list(type_filter=RunType.ROW_COUNT_DIFF) + for run in row_count_runs: + if run.params.get('node_id') == node_id: + result = run.result.items() + diff = TaskResultDiffer.diff(result.get('base'), result.get('curr')) + return diff, result + return None, None + + +# def _get_node_schema_diff(node_id): + + def generate_preset_check_summary(base_lineage, curr_lineage) -> List[CheckSummary]: runs = RunDAO().list() preset_checks = [check for check in CheckDAO().list() if check.is_preset is True] @@ -245,7 +296,7 @@ def _find_run_by_check_id(check_id): return preset_checks_summary -def generate_mermaid_lineage_graph(graph: LineageGraph): +def generate_mermaid_lineage_graph(graph: LineageGraph, preset_checks=None): content = 'graph LR\n' # Only show the modified nodes and there children queue = list(graph.modified_set) @@ -257,6 +308,7 @@ def generate_mermaid_lineage_graph(graph: LineageGraph): if node_id in display_nodes: # Skip if already displayed continue + display_nodes.add(node_id) node = graph.nodes[node_id] content += f'{node}' @@ -275,12 +327,13 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): base_lineage = ctx.get_lineage(base=True) graph = _build_lineage_graph(base_lineage, curr_lineage) preset_checks = generate_preset_check_summary(base_lineage, curr_lineage) + mermaid_content = generate_mermaid_lineage_graph(graph, preset_checks) preset_content = None - mermaid_content = generate_mermaid_lineage_graph(graph) def _formate_changes(changes): return ",".join([k.replace('_', ' ').title() for k in list(changes.keys())]) + # Generate preset check summary if we found any changes if len(preset_checks) > 0: from py_markdown_table.markdown_table import markdown_table data = [] From 22c9c0bbc61a67e749bf33aeed9b2c247222c45b Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Fri, 3 May 2024 03:20:13 +0800 Subject: [PATCH 3/8] [Fix] flake8 issues Signed-off-by: Kent Huang --- recce/summary.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/recce/summary.py b/recce/summary.py index d2763273..fcc062ff 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -110,8 +110,6 @@ def _what_changed(self): def __str__(self): style = None - row_count_delta_percentage = self._cal_row_count_delta_percentage() - schema_diff = self._get_schema_diff() if self.change_status == 'added': style = f'style {self.id} stroke:{ADD_COLOR}' From 8d556b87971f4d7afdd01479ae0985ce74e30073 Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Fri, 3 May 2024 12:33:06 +0800 Subject: [PATCH 4/8] [Feature] DRC-394 Add preset checks changes on lineage graph node Signed-off-by: Kent Huang --- recce/adapter/base.py | 5 ++- recce/summary.py | 86 ++++++++++++++++++++++++++-------------- recce/tasks/core.py | 25 ++++++++++-- recce/tasks/histogram.py | 3 ++ recce/tasks/profile.py | 10 ++++- recce/tasks/query.py | 4 ++ recce/tasks/rowcount.py | 5 +++ recce/tasks/top_k.py | 3 ++ recce/tasks/valuediff.py | 3 ++ 9 files changed, 110 insertions(+), 34 deletions(-) diff --git a/recce/adapter/base.py b/recce/adapter/base.py index bb856957..d9269ac9 100644 --- a/recce/adapter/base.py +++ b/recce/adapter/base.py @@ -39,9 +39,12 @@ def find_node_by_name(self, node_name, base=False): for key, node in manifest.nodes.items(): if node.name == node_name: return node - return None + def get_node_by_name(self, node_name): + node = self.find_node_by_name(node_name) or self.find_node_by_name(node_name, base=True) + return node + def get_node_name_by_id(self, unique_id): if unique_id in self.curr_manifest.nodes: return self.curr_manifest.nodes[unique_id].name diff --git a/recce/summary.py b/recce/summary.py index fcc062ff..9a96cc11 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -1,11 +1,12 @@ -from typing import List, Dict, Set, Union, Type +from typing import List, Dict, Set, Union, Type, Optional from uuid import UUID from pydantic import BaseModel -from recce.models import CheckDAO, RunDAO, RunType +from recce.models import CheckDAO, RunDAO, RunType, Run from recce.tasks.core import TaskResultDiffer from recce.tasks.histogram import HistogramDiffTaskResultDiffer +from recce.tasks.profile import ProfileDiffResultDiffer from recce.tasks.query import QueryDiffResultDiffer from recce.tasks.rowcount import RowCountDiffResultDiffer from recce.tasks.top_k import TopKDiffTaskResultDiffer @@ -92,7 +93,7 @@ def _get_schema_diff(self): schema_diff = TaskResultDiffer.diff(base_schema, current_schema) return schema_diff - def _what_changed(self): + def _what_changed(self, checks=None): changes = [] if self.change_status == 'added': return ['Added Node'] @@ -106,23 +107,41 @@ def _what_changed(self): schema_diff = self._get_schema_diff() if schema_diff: changes.append('Schema') + + if checks: + for check in checks: + if check.node_ids and self.id in check.node_ids: + changes.append(str(check.type).replace('_', ' ').title()) return changes - def __str__(self): + def get_node_str(self, checks=None): + is_changed = False style = None - if self.change_status == 'added': - style = f'style {self.id} stroke:{ADD_COLOR}' - elif self.change_status == 'modified': - style = f'style {self.id} stroke:{MODIFIED_COLOR}' - elif self.change_status == 'removed': - style = f'style {self.id} stroke:{REMOVE_COLOR}' - + if self.change_status is not None: + is_changed = True + if self.change_status == 'added': + style = f'style {self.id} stroke:{ADD_COLOR}' + elif self.change_status == 'modified': + style = f'style {self.id} stroke:{MODIFIED_COLOR}' + elif self.change_status == 'removed': + style = f'style {self.id} stroke:{REMOVE_COLOR}' + + if checks: + for check in checks: + if check.node_ids and self.id in check.node_ids: + is_changed = True + + content_output = f'{self.id}["{self.name}' + if is_changed: + content_output += '\n\n[What\'s Changed]\n' + changes = self._what_changed(checks) + content_output += ', '.join(changes) + + content_output += '"]\n' if style: - output = f'{self.id}["{self.name}\n\n[What\'s Changed]\n' - changes = self._what_changed() - return output + ', '.join(changes) + f'"]\n{style}\n' - return f'{self.id}["{self.name}"]\n' + content_output += f'{style}\n' + return content_output class Edge: @@ -149,6 +168,7 @@ class CheckSummary(BaseModel): name: str description: str changes: dict + node_ids: Optional[List[str]] check_result_differ_registry: Dict[RunType, Type[TaskResultDiffer]] = { @@ -157,19 +177,21 @@ class CheckSummary(BaseModel): RunType.QUERY_DIFF: QueryDiffResultDiffer, RunType.TOP_K_DIFF: TopKDiffTaskResultDiffer, RunType.HISTOGRAM_DIFF: HistogramDiffTaskResultDiffer, + RunType.PROFILE_DIFF: ProfileDiffResultDiffer, } -def differ_factory(run_type: RunType, result): - differ_clz = check_result_differ_registry.get(run_type) +def differ_factory(run: Run): + differ_clz = check_result_differ_registry.get(run.type) if not differ_clz: raise NotImplementedError() - return differ_clz(result) + return differ_clz(run) class LineageGraph: nodes: Dict[str, Node] = {} edges: Dict[str, Edge] = {} + checks: List[CheckSummary] = None def create_node(self, node_id: str, node_data: dict, data_from: str = 'base'): if node_id not in self.nodes: @@ -276,11 +298,16 @@ def _find_run_by_check_id(check_id): type=check.type, name=check.name, description=check.description, - changes=changes) + changes=changes, + node_ids=[node_id] + ) ) - elif str(check.type).endswith('_diff') and run is not None: + elif (check.type in [RunType.ROW_COUNT_DIFF, RunType.QUERY_DIFF, + RunType.VALUE_DIFF, RunType.PROFILE_DIFF, + RunType.TOP_K_DIFF, RunType.HISTOGRAM_DIFF] + and run is not None): # Check the result is changed or not - differ = differ_factory(check.type, run.result) + differ = differ_factory(run) if differ.changes is not None: preset_checks_summary.append( CheckSummary( @@ -288,13 +315,14 @@ def _find_run_by_check_id(check_id): type=check.type, name=check.name, description=check.description, - changes=differ.changes) + changes=differ.changes, + node_ids=differ.related_node_ids) ) return preset_checks_summary -def generate_mermaid_lineage_graph(graph: LineageGraph, preset_checks=None): +def generate_mermaid_lineage_graph(graph: LineageGraph): content = 'graph LR\n' # Only show the modified nodes and there children queue = list(graph.modified_set) @@ -309,7 +337,7 @@ def generate_mermaid_lineage_graph(graph: LineageGraph, preset_checks=None): display_nodes.add(node_id) node = graph.nodes[node_id] - content += f'{node}' + content += node.get_node_str(graph.checks) for child_id in node.children: queue.append(child_id) edge_id = f'{node_id}-->{child_id}' @@ -324,22 +352,22 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): curr_lineage = ctx.get_lineage(base=False) base_lineage = ctx.get_lineage(base=True) graph = _build_lineage_graph(base_lineage, curr_lineage) - preset_checks = generate_preset_check_summary(base_lineage, curr_lineage) - mermaid_content = generate_mermaid_lineage_graph(graph, preset_checks) + graph.checks = generate_preset_check_summary(base_lineage, curr_lineage) + mermaid_content = generate_mermaid_lineage_graph(graph) preset_content = None def _formate_changes(changes): return ",".join([k.replace('_', ' ').title() for k in list(changes.keys())]) # Generate preset check summary if we found any changes - if len(preset_checks) > 0: + if len(graph.checks) > 0: from py_markdown_table.markdown_table import markdown_table data = [] - for check in preset_checks: + for check in graph.checks: data.append({ 'Name': check.name, 'Type': str(check.type).replace('_', ' ').title(), - 'Description': check.description or 'N/A', + 'Description': check.description.replace('\n', ' ') or 'N/A', 'Type of Changes': _formate_changes(check.changes) }) preset_content = markdown_table(data).set_params(quote=False, row_sep='markdown').get_markdown() diff --git a/recce/tasks/core.py b/recce/tasks/core.py index 974dbb89..94b7ac38 100644 --- a/recce/tasks/core.py +++ b/recce/tasks/core.py @@ -1,6 +1,9 @@ from abc import ABC, abstractmethod +from typing import List +from recce.core import default_context from recce.exceptions import RecceCancelException +from recce.models import Run class Task(ABC): @@ -43,9 +46,12 @@ def update_progress(self, message=None, percentage=None): class TaskResultDiffer(ABC): - def __init__(self, result): - self.result = result - self.changes = self._check_result_changed_fn(result) + related_node_ids: List[str] = None + + def __init__(self, run: Run): + self.run = run + self.changes = self._check_result_changed_fn(run.result) + self.related_node_ids = self._get_related_node_ids(run.params) @staticmethod def diff(base, current): @@ -53,6 +59,11 @@ def diff(base, current): diff = DeepDiff(base, current, ignore_order=True) return diff if diff else None + @staticmethod + def get_node_ids_by_name(name): + node = default_context().adapter.get_node_by_name(name) + return [node.unique_id] if node else None + @abstractmethod def _check_result_changed_fn(self, result): """ @@ -60,3 +71,11 @@ def _check_result_changed_fn(self, result): Should be implemented by subclass. """ raise NotImplementedError() + + @abstractmethod + def _get_related_node_ids(self, params): + """ + Get the related node ids. + Should be implemented by subclass. + """ + raise NotImplementedError() diff --git a/recce/tasks/histogram.py b/recce/tasks/histogram.py index 766c8c82..a70c3bc1 100644 --- a/recce/tasks/histogram.py +++ b/recce/tasks/histogram.py @@ -365,3 +365,6 @@ def cancel(self): class HistogramDiffTaskResultDiffer(TaskResultDiffer): def _check_result_changed_fn(self, result): return TaskResultDiffer.diff(result['base'], result['current']) + + def _get_related_node_ids(self, params): + return TaskResultDiffer.get_node_ids_by_name(params.get('model')) diff --git a/recce/tasks/profile.py b/recce/tasks/profile.py index 6f792617..d4b6a632 100644 --- a/recce/tasks/profile.py +++ b/recce/tasks/profile.py @@ -4,7 +4,7 @@ from dbt.clients.agate_helper import merge_tables from pydantic import BaseModel -from .core import Task +from .core import Task, TaskResultDiffer from .dataframe import DataFrame from ..adapter.dbt_adapter import DbtAdapter from ..core import default_context @@ -125,3 +125,11 @@ def cancel(self): dbt_adapter: DbtAdapter = default_context().adapter with dbt_adapter.connection_named("cancel"): dbt_adapter.cancel(self.connection) + + +class ProfileDiffResultDiffer(TaskResultDiffer): + def _check_result_changed_fn(self, result): + return self.diff(result['base'], result['current']) + + def _get_related_node_ids(self, params): + return TaskResultDiffer.get_node_ids_by_name(params.get('model')) diff --git a/recce/tasks/query.py b/recce/tasks/query.py index b47d907b..6a169915 100644 --- a/recce/tasks/query.py +++ b/recce/tasks/query.py @@ -174,3 +174,7 @@ def _check_result_changed_fn(self, result): current = result.get('current') return TaskResultDiffer.diff(base, current) + + def _get_related_node_ids(self, params): + # QueryDiffTask does not have related node ids + return None diff --git a/recce/tasks/rowcount.py b/recce/tasks/rowcount.py index fc1c73ff..2bef3737 100644 --- a/recce/tasks/rowcount.py +++ b/recce/tasks/rowcount.py @@ -126,3 +126,8 @@ def _check_result_changed_fn(self, result): current[node] = row_counts['curr'] return TaskResultDiffer.diff(base, current) + + def _get_related_node_ids(self, params): + node_names = params.get('node_names', []) + nodes = [default_context().adapter.get_node_by_name(node_name) for node_name in node_names] + return [node.unique_id for node in nodes] diff --git a/recce/tasks/top_k.py b/recce/tasks/top_k.py index a7970e9e..5a94e216 100644 --- a/recce/tasks/top_k.py +++ b/recce/tasks/top_k.py @@ -159,3 +159,6 @@ def _check_result_changed_fn(self, result): current = result.get('current') return TaskResultDiffer.diff(base, current) + + def _get_related_node_ids(self, params): + return TaskResultDiffer.get_node_ids_by_name(params.get('model')) diff --git a/recce/tasks/valuediff.py b/recce/tasks/valuediff.py index c01ec34e..e255140d 100644 --- a/recce/tasks/valuediff.py +++ b/recce/tasks/valuediff.py @@ -271,6 +271,9 @@ def _check_result_changed_fn(self, result): return changes if is_changed else None + def _get_related_node_ids(self, params): + return TaskResultDiffer.get_node_ids_by_name(params.get('model')) + class ValueDiffDetailParams(TypedDict): primary_key: str From 5185055874c6f398b67a14e3319e6d862c990fa3 Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Fri, 3 May 2024 12:34:52 +0800 Subject: [PATCH 5/8] [Fix] flake8 issue Signed-off-by: Kent Huang --- recce/summary.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/recce/summary.py b/recce/summary.py index 9a96cc11..4be31ec8 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -304,8 +304,7 @@ def _find_run_by_check_id(check_id): ) elif (check.type in [RunType.ROW_COUNT_DIFF, RunType.QUERY_DIFF, RunType.VALUE_DIFF, RunType.PROFILE_DIFF, - RunType.TOP_K_DIFF, RunType.HISTOGRAM_DIFF] - and run is not None): + RunType.TOP_K_DIFF, RunType.HISTOGRAM_DIFF] and run is not None): # Check the result is changed or not differ = differ_factory(run) if differ.changes is not None: From 3219bc7fad7378f248d356eb4a5fa51cc7ddd2ce Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Fri, 3 May 2024 17:18:42 +0800 Subject: [PATCH 6/8] [Fix] Show the correct row count increase/decrease persistent Signed-off-by: Kent Huang --- recce/summary.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/recce/summary.py b/recce/summary.py index 4be31ec8..686d93cd 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -77,14 +77,16 @@ def append_child(self, child_id: str): self.children.append(child_id) def _cal_row_count_delta_percentage(self): - row_count_diff, run_result = _get_node_row_count_diff(self.id) + row_count_diff, run_result = _get_node_row_count_diff(self.id, self.name) if row_count_diff: base = run_result.get('base', 0) - current = run_result.get('current', 0) + current = run_result.get('curr', 0) if int(current) > int(base): - return f'+{(int(current) - int(base)) / int(current) * 100}%' + p = (int(current) - int(base)) / int(current) * 100 + return f'🔼 +{round(p, 2) if p > 0.1 else "<0.1"}%' else: - return f'-{(int(base) - int(current)) / int(base) * 100}%' + p = (int(base) - int(current)) / int(current) * 100 + return f'🔽 -{round(p, 2) if p > 0.1 else "<0.1"}%' return None def _get_schema_diff(self): @@ -259,11 +261,15 @@ def _build_node_schema(lineage, node_id): return lineage.get('nodes', {}).get(node_id, {}).get('columns', {}) -def _get_node_row_count_diff(node_id): +def _get_node_row_count_diff(node_id, node_name): row_count_runs = RunDAO().list(type_filter=RunType.ROW_COUNT_DIFF) for run in row_count_runs: - if run.params.get('node_id') == node_id: - result = run.result.items() + if node_id in run.params.get('node_ids', []): + result = run.result.get(node_name, {}) + diff = TaskResultDiffer.diff(result.get('base'), result.get('curr')) + return diff, result + elif run.params.get('node_id') == node_id: + result = run.result.get(node_name, {}) diff = TaskResultDiffer.diff(result.get('base'), result.get('curr')) return diff, result return None, None From cb0c5b39c9636ca6d31231f2636fb342ef398d8c Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Fri, 3 May 2024 17:41:33 +0800 Subject: [PATCH 7/8] [Refine] code of generate lineage graph Signed-off-by: Kent Huang --- recce/tasks/core.py | 19 +++++++++++++------ recce/tasks/histogram.py | 3 --- recce/tasks/profile.py | 3 --- recce/tasks/query.py | 4 ---- recce/tasks/rowcount.py | 5 ----- recce/tasks/top_k.py | 3 --- recce/tasks/valuediff.py | 3 --- 7 files changed, 13 insertions(+), 27 deletions(-) diff --git a/recce/tasks/core.py b/recce/tasks/core.py index 94b7ac38..03262577 100644 --- a/recce/tasks/core.py +++ b/recce/tasks/core.py @@ -51,7 +51,7 @@ class TaskResultDiffer(ABC): def __init__(self, run: Run): self.run = run self.changes = self._check_result_changed_fn(run.result) - self.related_node_ids = self._get_related_node_ids(run.params) + self.related_node_ids = self._get_related_node_ids() @staticmethod def diff(base, current): @@ -60,9 +60,9 @@ def diff(base, current): return diff if diff else None @staticmethod - def get_node_ids_by_name(name): + def get_node_id_by_name(name): node = default_context().adapter.get_node_by_name(name) - return [node.unique_id] if node else None + return node.unique_id if node else None @abstractmethod def _check_result_changed_fn(self, result): @@ -72,10 +72,17 @@ def _check_result_changed_fn(self, result): """ raise NotImplementedError() - @abstractmethod - def _get_related_node_ids(self, params): + def _get_related_node_ids(self) -> List[str] | None: """ Get the related node ids. Should be implemented by subclass. """ - raise NotImplementedError() + params = self.run.params + if params.get('model'): + return [TaskResultDiffer.get_node_id_by_name(params.get('model'))] + elif params.get('node_names'): + names = params.get('node_names', []) + return [TaskResultDiffer.get_node_id_by_name(name) for name in names] + else: + # No related node ids in the params + return None diff --git a/recce/tasks/histogram.py b/recce/tasks/histogram.py index a70c3bc1..766c8c82 100644 --- a/recce/tasks/histogram.py +++ b/recce/tasks/histogram.py @@ -365,6 +365,3 @@ def cancel(self): class HistogramDiffTaskResultDiffer(TaskResultDiffer): def _check_result_changed_fn(self, result): return TaskResultDiffer.diff(result['base'], result['current']) - - def _get_related_node_ids(self, params): - return TaskResultDiffer.get_node_ids_by_name(params.get('model')) diff --git a/recce/tasks/profile.py b/recce/tasks/profile.py index d4b6a632..e71584ad 100644 --- a/recce/tasks/profile.py +++ b/recce/tasks/profile.py @@ -130,6 +130,3 @@ def cancel(self): class ProfileDiffResultDiffer(TaskResultDiffer): def _check_result_changed_fn(self, result): return self.diff(result['base'], result['current']) - - def _get_related_node_ids(self, params): - return TaskResultDiffer.get_node_ids_by_name(params.get('model')) diff --git a/recce/tasks/query.py b/recce/tasks/query.py index 6a169915..b47d907b 100644 --- a/recce/tasks/query.py +++ b/recce/tasks/query.py @@ -174,7 +174,3 @@ def _check_result_changed_fn(self, result): current = result.get('current') return TaskResultDiffer.diff(base, current) - - def _get_related_node_ids(self, params): - # QueryDiffTask does not have related node ids - return None diff --git a/recce/tasks/rowcount.py b/recce/tasks/rowcount.py index 2bef3737..fc1c73ff 100644 --- a/recce/tasks/rowcount.py +++ b/recce/tasks/rowcount.py @@ -126,8 +126,3 @@ def _check_result_changed_fn(self, result): current[node] = row_counts['curr'] return TaskResultDiffer.diff(base, current) - - def _get_related_node_ids(self, params): - node_names = params.get('node_names', []) - nodes = [default_context().adapter.get_node_by_name(node_name) for node_name in node_names] - return [node.unique_id for node in nodes] diff --git a/recce/tasks/top_k.py b/recce/tasks/top_k.py index 5a94e216..a7970e9e 100644 --- a/recce/tasks/top_k.py +++ b/recce/tasks/top_k.py @@ -159,6 +159,3 @@ def _check_result_changed_fn(self, result): current = result.get('current') return TaskResultDiffer.diff(base, current) - - def _get_related_node_ids(self, params): - return TaskResultDiffer.get_node_ids_by_name(params.get('model')) diff --git a/recce/tasks/valuediff.py b/recce/tasks/valuediff.py index e255140d..c01ec34e 100644 --- a/recce/tasks/valuediff.py +++ b/recce/tasks/valuediff.py @@ -271,9 +271,6 @@ def _check_result_changed_fn(self, result): return changes if is_changed else None - def _get_related_node_ids(self, params): - return TaskResultDiffer.get_node_ids_by_name(params.get('model')) - class ValueDiffDetailParams(TypedDict): primary_key: str From 3093117dfc94a36fe202bb87fe5e21819d738d05 Mon Sep 17 00:00:00 2001 From: Kent Huang Date: Mon, 6 May 2024 15:58:18 +0800 Subject: [PATCH 8/8] [Fix] Fix some feedback issues Signed-off-by: Kent Huang --- recce/cli.py | 2 +- recce/summary.py | 36 ++++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/recce/cli.py b/recce/cli.py index a43988f0..1eee245a 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -201,7 +201,7 @@ def run(output, **kwargs): @cli.command(cls=TrackCommand) @click.argument('state_file', required=True) @click.option('--format', '-f', help='Output format. Currently only markdown is supported.', - type=click.Choice(['markdown', 'mermaid', 'preset'], case_sensitive=False), + type=click.Choice(['markdown', 'mermaid', 'check'], case_sensitive=False), default='markdown', show_default=True) def summary(state_file, **kwargs): from rich.console import Console diff --git a/recce/summary.py b/recce/summary.py index 686d93cd..d2903632 100644 --- a/recce/summary.py +++ b/recce/summary.py @@ -112,6 +112,10 @@ def _what_changed(self, checks=None): if checks: for check in checks: + check_type = check.type + if check_type == RunType.ROW_COUNT_DIFF or check_type == RunType.SCHEMA_DIFF: + # Skip the row count and schema diff check, since we already have it. + continue if check.node_ids and self.id in check.node_ids: changes.append(str(check.type).replace('_', ' ').title()) return changes @@ -278,10 +282,10 @@ def _get_node_row_count_diff(node_id, node_name): # def _get_node_schema_diff(node_id): -def generate_preset_check_summary(base_lineage, curr_lineage) -> List[CheckSummary]: +def generate_check_summary(base_lineage, curr_lineage) -> List[CheckSummary]: runs = RunDAO().list() - preset_checks = [check for check in CheckDAO().list() if check.is_preset is True] - preset_checks_summary: List[CheckSummary] = [] + checks = CheckDAO().list() + checks_summary: List[CheckSummary] = [] def _find_run_by_check_id(check_id): for r in runs: @@ -289,7 +293,7 @@ def _find_run_by_check_id(check_id): return r return None - for check in preset_checks: + for check in checks: run = _find_run_by_check_id(check.check_id) if check.type == RunType.SCHEMA_DIFF: # TODO: Check schema diff of the selected node @@ -298,7 +302,7 @@ def _find_run_by_check_id(check_id): current = _build_node_schema(curr_lineage, node_id) changes = TaskResultDiffer.diff(base, current) if changes: - preset_checks_summary.append( + checks_summary.append( CheckSummary( id=check.check_id, type=check.type, @@ -314,7 +318,7 @@ def _find_run_by_check_id(check_id): # Check the result is changed or not differ = differ_factory(run) if differ.changes is not None: - preset_checks_summary.append( + checks_summary.append( CheckSummary( id=check.check_id, type=check.type, @@ -324,7 +328,7 @@ def _find_run_by_check_id(check_id): node_ids=differ.related_node_ids) ) - return preset_checks_summary + return checks_summary def generate_mermaid_lineage_graph(graph: LineageGraph): @@ -357,14 +361,14 @@ def generate_markdown_summary(ctx, summary_format: str = 'markdown'): curr_lineage = ctx.get_lineage(base=False) base_lineage = ctx.get_lineage(base=True) graph = _build_lineage_graph(base_lineage, curr_lineage) - graph.checks = generate_preset_check_summary(base_lineage, curr_lineage) + graph.checks = generate_check_summary(base_lineage, curr_lineage) mermaid_content = generate_mermaid_lineage_graph(graph) - preset_content = None + check_content = None def _formate_changes(changes): return ",".join([k.replace('_', ' ').title() for k in list(changes.keys())]) - # Generate preset check summary if we found any changes + # Generate the check summary if we found any changes if len(graph.checks) > 0: from py_markdown_table.markdown_table import markdown_table data = [] @@ -375,12 +379,12 @@ def _formate_changes(changes): 'Description': check.description.replace('\n', ' ') or 'N/A', 'Type of Changes': _formate_changes(check.changes) }) - preset_content = markdown_table(data).set_params(quote=False, row_sep='markdown').get_markdown() + check_content = markdown_table(data).set_params(quote=False, row_sep='markdown').get_markdown() if summary_format == 'mermaid': return mermaid_content - elif summary_format == 'preset': - return preset_content + elif summary_format == 'check': + return check_content elif summary_format == 'markdown': # TODO: Check the markdown output content is longer than 65535 characters. # If it is, we should reduce the content length. @@ -392,9 +396,9 @@ def _formate_changes(changes): {mermaid_content} ``` ''' - if preset_content: + if check_content: content += f''' -## Impacted Preset Checks -{preset_content} +## Impacted Checks +{check_content} ''' return content