Skip to content

Commit

Permalink
Cl/fix poll list result main (#138)
Browse files Browse the repository at this point in the history
* fix poll list task

* fix dbt exception

* nits

* bump version
  • Loading branch information
ChenyuLInx authored May 9, 2023
1 parent a935d29 commit 0968c8f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 16 deletions.
27 changes: 27 additions & 0 deletions dbt_rpc/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,33 @@ def from_result(
)


@dataclass
@schema_version('poll-remote-list-result', 1)
class PollListResult(RemoteListResults, PollResult):
state: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success,
TaskHandlerState.Failed),
)

@classmethod
def from_result(
cls: Type['PollListResult'],
base: RemoteListResults,
tags: TaskTags,
timing: TaskTiming,
logs: List[LogMessage],
) -> 'PollListResult':
return cls(
output=base.output,
logs=logs,
tags=tags,
state=timing.state,
start=timing.start,
end=timing.end,
elapsed=timing.elapsed,
)


@dataclass
@schema_version("poll-remote-freshness-result", 1)
class PollFreshnessResult(RemoteFreshnessResult, PollResult):
Expand Down
23 changes: 14 additions & 9 deletions dbt_rpc/rpc/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import dbt.exceptions
from dbt_rpc.contracts.rpc import (
PollListResult,
RemoteListResults,
TaskTags,
StatusParameters,
ReloadParameters,
Expand Down Expand Up @@ -54,7 +56,7 @@ def set_args(self, params: GCParameters):

def handle_request(self) -> GCResult:
if self.params is None:
raise dbt.exceptions.InternalException("GC: params not set")
raise dbt.exceptions.DbtInternalError("GC: params not set")
return self.task_manager.gc_safe(
task_ids=self.params.task_ids,
before=self.params.before,
Expand All @@ -70,7 +72,7 @@ def set_args(self, params: KillParameters):

def handle_request(self) -> KillResult:
if self.params is None:
raise dbt.exceptions.InternalException("Kill: params not set")
raise dbt.exceptions.DbtInternalError("Kill: params not set")
result = KillResult()
task: RequestTaskHandler
try:
Expand Down Expand Up @@ -117,7 +119,7 @@ def set_args(self, params: PSParameters):

def keep(self, row: TaskRow):
if self.params is None:
raise dbt.exceptions.InternalException("PS: params not set")
raise dbt.exceptions.DbtInternalError("PS: params not set")
if row.state.finished and self.params.completed:
return True
elif not row.state.finished and self.params.active:
Expand All @@ -136,7 +138,7 @@ def poll_complete(
timing: TaskTiming, result: Any, tags: TaskTags, logs: List[LogMessage]
) -> PollResult:
if timing.state not in (TaskHandlerState.Success, TaskHandlerState.Failed):
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
f"got invalid result state in poll_complete: {timing.state}"
)

Expand All @@ -150,6 +152,7 @@ def poll_complete(
PollRunOperationCompleteResult,
PollGetManifestResult,
PollFreshnessResult,
PollListResult,
]
]

Expand All @@ -170,8 +173,10 @@ def poll_complete(
cls = PollGetManifestResult
elif isinstance(result, RemoteFreshnessResult):
cls = PollFreshnessResult
elif isinstance(result, RemoteListResults):
cls = PollListResult
else:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
"got invalid result in poll_complete: {}".format(result)
)
return cls.from_result(result, tags, timing, logs)
Expand All @@ -189,7 +194,7 @@ def set_args(self, params: PollParameters):

def handle_request(self) -> PollResult:
if self.params is None:
raise dbt.exceptions.InternalException("Poll: params not set")
raise dbt.exceptions.DbtInternalError("Poll: params not set")
task_id = self.params.request_token
task: RequestTaskHandler = self.task_manager.get_request(task_id)

Expand All @@ -216,7 +221,7 @@ def handle_request(self) -> PollResult:
elif state == TaskHandlerState.Error:
err = task.error
if err is None:
exc = dbt.exceptions.InternalException(
exc = dbt.exceptions.DbtInternalError(
f"At end of task {task_id}, error state but error is None"
)
raise RPCException.from_error(
Expand All @@ -227,7 +232,7 @@ def handle_request(self) -> PollResult:
raise err
elif state in (TaskHandlerState.Success, TaskHandlerState.Failed):
if task.result is None:
exc = dbt.exceptions.InternalException(
exc = dbt.exceptions.DbtInternalError(
f"At end of task {task_id}, state={state} but result is " "None"
)
raise RPCException.from_error(
Expand All @@ -246,7 +251,7 @@ def handle_request(self) -> PollResult:
elapsed=timing.elapsed,
)
else:
exc = dbt.exceptions.InternalException(
exc = dbt.exceptions.DbtInternalError(
f"Got unknown value state={state} for task {task_id}"
)
raise RPCException.from_error(dbt_error(exc, logs=_dict_logs(task_logs)))
Expand Down
2 changes: 1 addition & 1 deletion dbt_rpc/rpc/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def collect_task_id(
except KeyError:
# someone was mutating tasks while we had the lock, that's
# not right!
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
'Got a KeyError for task uuid={} during gc'
.format(task_id)
)
Expand Down
2 changes: 1 addition & 1 deletion dbt_rpc/rpc/response_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __getitem__(self, key) -> Callable[..., Dict[str, Any]]:
self.manager, handler, self.http_request, self.json_rpc_request
)
else:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
f'Got an invalid handler from get_handler. Expected None, '
f'callable, or RemoteMethod, got {handler}'
)
Expand Down
4 changes: 2 additions & 2 deletions dbt_rpc/rpc/task_handler_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ def tags(self) -> Optional[TaskTags]:

def _assert_started(self) -> datetime:
if self.started is None:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
'task handler started but start time is not set'
)
return self.started

def _assert_ended(self) -> datetime:
if self.ended is None:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
'task handler finished but end time is not set'
)
return self.ended
Expand Down
4 changes: 2 additions & 2 deletions dbt_rpc/rpc/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def _get_manifest_callable(
return ParseError(self.last_parse.error)
else:
if self.manifest is None:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
f'Manifest should not be None if the last parse state is '
f'{state}'
)
Expand All @@ -179,7 +179,7 @@ def rpc_task(
elif issubclass(task, RemoteMethod):
return task(deepcopy(self.args), self.config)
else:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
f'Got a task with an invalid type! {task} with method '
f'name {method_name} has a type of {task.__class__}, '
f'should be a RemoteMethod'
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def read(fname):


package_name = "dbt-rpc"
package_version = "0.4.0"
package_version = "0.4.1"
description = """ A JSON RPC server that provides an interface to programmically interact with dbt projects. """


Expand Down

0 comments on commit 0968c8f

Please sign in to comment.