Skip to content

Commit

Permalink
Refactor run_operation: use result_callback
Browse files Browse the repository at this point in the history
Closes #17
  • Loading branch information
kokorin committed Jul 3, 2024
1 parent 777d706 commit f04cd56
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 25 deletions.
48 changes: 27 additions & 21 deletions dbt_pumpkin/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import tempfile
from collections import Counter
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable

from ruamel.yaml import YAML

Expand Down Expand Up @@ -225,53 +225,59 @@ def _create_pumpkin_project(self, project_vars: dict[str, any]) -> Path:

return pumpkin_dir

def _run_operation(self, operation_name: str, project_vars: dict[str, any]) -> dict:
def _run_operation(
self, operation_name: str, project_vars: dict[str, any], result_callback: Callable[[dict], None]
):
pumpkin_dir = self._create_pumpkin_project(project_vars)

project_params = self._project_params.with_project_dir(str(pumpkin_dir))

args = ["run-operation", operation_name, *project_params.to_args()]
logger.debug("Command line: %s", args)

jinja_log_messages: list[str] = []

def event_callback(event: EventMsg):
if event.info.name in {"JinjaLogInfo", "JinjaLogDebug"}:
jinja_log_messages.append(event.info.msg)
try:
potential_result = json.loads(str(event.info.msg))
if operation_name in potential_result:
result_callback(potential_result[operation_name])
else:
logger.debug("Ignoring potential result: no '%s' key: %s", operation_name, potential_result)
except ValueError:
logger.exception("Failed to parse potential result %s", event.info.msg)

res: dbtRunnerResult = dbtRunner(callbacks=[event_callback]).invoke(args)

if not res.success:
logger.exception("Failed to load resources", exc_info=res.exception)

if not jinja_log_messages:
msg = f"No response retrieved from operation {operation_name}"
msg = f"Run operation failure: {operation_name}. Exception: {res.exception}"
raise PumpkinError(msg)

return json.loads(jinja_log_messages[0])

def _do_lookup_tables(self) -> list[Table]:
logger.info("Looking up tables")

raw_resources = self._raw_resources
project_vars = {
"get_column_types_args": {
str(resource.unique_id): [resource.database, resource.schema, resource.identifier]
for resource in self._raw_resources
for resource in raw_resources
},
}

column_types_response = self._run_operation("get_column_types", project_vars)
result = [
Table(
resource_id=ResourceID(res_id),
columns=[Column(name=c["name"], data_type=c["data_type"], description=None) for c in columns],
tables: list[Table] = []

def on_result(result: dict):
table = Table(
resource_id=ResourceID(result["resource_id"]),
columns=[Column(name=c["name"], data_type=c["data_type"], description=None) for c in result["columns"]],
)
for res_id, columns in column_types_response.items()
]
tables.append(table)
logger.info("Looked up %s / %s: %s", len(tables), len(raw_resources), table.resource_id)

logger.info("Found %s tables", len(result))
self._run_operation("get_column_types", project_vars, on_result)

return result
logger.info("Found %s tables", len(tables))

return tables

def lookup_tables(self):
if self._tables is None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
{% macro get_column_types() %}
{% set result = {} %}
{% for id, database_schema_identifier in var('get_column_types_args').items() %}
{% for resource_id, database_schema_identifier in var('get_column_types_args').items() %}
{% set database, schema, identifier = database_schema_identifier %}
{% set relation = adapter.get_relation(database, schema, identifier) %}

{% set result = {
'resource_id': resource_id,
'columns': none
} %}

{% if relation is not none %}
{% set columns = [] %}
{% for column in adapter.get_columns_in_relation(relation) %}
Expand All @@ -14,8 +19,9 @@
'data_type': column.data_type,
}) %}
{% endfor %}
{% do result.update({id: columns}) %}
{% do result.update({'columns': columns}) %}
{% endif %}

{{ log(tojson( {'get_column_types': result} )) }}
{% endfor %}
{{ log(tojson(result)) }}
{% endmacro %}

0 comments on commit f04cd56

Please sign in to comment.