Skip to content

Commit

Permalink
Fix issues, synchronize CLI command
Browse files Browse the repository at this point in the history
  • Loading branch information
kokorin committed Jul 6, 2024
1 parent 196b7e6 commit 3048e04
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
20 changes: 20 additions & 0 deletions dbt_pumpkin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ def relocate(project_dir, profiles_dir, target, profile, select, exclude, dry_ru
pumpkin = Pumpkin(project_params, resource_params)
pumpkin.relocate(dry_run=dry_run)

@cli.command
@P.project_dir
@P.profiles_dir
@P.target
@P.profile
@P.select
@P.exclude
@P.dry_run
@P.debug
def synchronize(project_dir, profiles_dir, target, profile, select, exclude, dry_run, debug):
"""
Synchronizes YAML definitions with actual tables in DB
"""
set_up_logging(debug)

project_params = ProjectParams(project_dir=project_dir, profiles_dir=profiles_dir, target=target, profile=profile)
resource_params = ResourceParams(select=select, exclude=exclude)
pumpkin = Pumpkin(project_params, resource_params)
pumpkin.synchronize(dry_run=dry_run)


def main():
cli()
Expand Down
2 changes: 2 additions & 0 deletions dbt_pumpkin/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def __post_init__(self):
raise PropertyRequiredError("source_name", self.resource_name) # noqa: EM101
if self.resource_type != ResourceType.SOURCE and self.source_name is not None:
raise PropertyNotAllowedError("source_name", self.resource_name) # noqa: EM101
if not self.path:
raise PropertyRequiredError("path", self.resource_name) # noqa: EM101

def affected_files(self) -> set[Path]:
return {self.path}
Expand Down
4 changes: 4 additions & 0 deletions dbt_pumpkin/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ def plan(self) -> Plan:
if not table:
logger.warning("Table not found for resource: %s", resource.unique_id)
continue
if not resource.yaml_path:
logger.warning("Resource %s %s has no YAML path defined, consider using bootstrap command",
resource.type, resource.unique_id)
continue

actions += self._resource_plan(resource, table)

Expand Down
40 changes: 32 additions & 8 deletions dbt_pumpkin/pumpkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dbt_pumpkin.loader import ResourceLoader
from dbt_pumpkin.params import ProjectParams, ResourceParams
from dbt_pumpkin.planner import ActionPlanner, BootstrapPlanner, RelocationPlanner
from dbt_pumpkin.planner import BootstrapPlanner, RelocationPlanner, SynchronizationPlanner
from dbt_pumpkin.storage import DiskStorage

logger = logging.getLogger(__name__)
Expand All @@ -13,20 +13,44 @@ def __init__(self, project_params: ProjectParams, resource_params: ResourceParam
self.project_params = project_params
self.resource_params = resource_params

def _plan_and_execute(self, planner: ActionPlanner, *, dry_run: bool):
def bootstrap(self, *, dry_run: bool):
loader = ResourceLoader(self.project_params, self.resource_params)

logger.info("Loading resource")
resources = loader.select_resources()

planner = BootstrapPlanner(resources)
plan = planner.plan()

storage = DiskStorage(loader.locate_project_dir(), read_only=dry_run)
logger.info("Executing actions")
plan.execute(storage)

def relocate(self, *, dry_run: bool):
loader = ResourceLoader(self.project_params, self.resource_params)

logger.info("Loading resource")
resources = loader.select_resources()

logger.info("Planning actions")
plan = planner.plan(resources)
planner = RelocationPlanner(resources)
plan = planner.plan()

storage = DiskStorage(loader.locate_project_dir(), read_only=dry_run)
logger.info("Executing actions")
plan.execute(storage)

def bootstrap(self, *, dry_run: bool):
self._plan_and_execute(BootstrapPlanner(), dry_run=dry_run)
def synchronize(self, *, dry_run: bool):
loader = ResourceLoader(self.project_params, self.resource_params)

def relocate(self, *, dry_run: bool):
self._plan_and_execute(RelocationPlanner(), dry_run=dry_run)
logger.info("Loading resource")
resources = loader.select_resources()

logger.info("Looking up tables")
tables = loader.lookup_tables()

planner = SynchronizationPlanner(resources, tables)
plan = planner.plan()

storage = DiskStorage(loader.locate_project_dir(), read_only=dry_run)
logger.info("Executing actions")
plan.execute(storage)

0 comments on commit 3048e04

Please sign in to comment.