Skip to content

Commit

Permalink
Fix dbt-core v1.4 compat, bump version + lower bound (#121)
Browse files Browse the repository at this point in the history
* Bump version, fix dbt-core lower bound

* Handle exception renames

* Use any python version for flake8

* Missed one

* Implement defer_to_manifest abstractmethod
  • Loading branch information
jtcohen6 authored Jan 13, 2023
1 parent 521ec41 commit c305bfc
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 49 deletions.
18 changes: 9 additions & 9 deletions dbt_rpc/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from dbt.utils import ExitCodes
from dbt.config.profile import read_user_config
from dbt.exceptions import (
RuntimeException,
InternalException,
NotImplementedException,
FailedToConnectException
DbtRuntimeError,
DbtInternalError,
NotImplementedError,
FailedToConnectError
)
import dbt.flags as flags

Expand Down Expand Up @@ -75,7 +75,7 @@ def add_optional_argument_inverse(
):
mutex_group = self.add_mutually_exclusive_group()
if not name.startswith('--'):
raise InternalException(
raise DbtInternalError(
'cannot handle optional argument without "--" prefix: '
f'got "{name}"'
)
Expand Down Expand Up @@ -141,7 +141,7 @@ def main(args=None):

if log_manager.initialized:
logger.debug(traceback.format_exc())
elif not isinstance(e, RuntimeException):
elif not isinstance(e, DbtRuntimeError):
# if it did not come from dbt proper and the logger is not
# initialized (so there's no safe path to log to), log the
# stack trace at error level.
Expand Down Expand Up @@ -207,8 +207,8 @@ def track_run(task):
dbt.tracking.track_invocation_end(
config=task.config, args=task.args, result_type="ok"
)
except (NotImplementedException,
FailedToConnectException) as e:
except (NotImplementedError,
FailedToConnectError) as e:
logger.error('ERROR: {}'.format(e))
dbt.tracking.track_invocation_end(
config=task.config, args=task.args, result_type="error"
Expand All @@ -231,7 +231,7 @@ def run_from_args(parsed):

logger.info("Running with dbt{}".format(dbt.version.installed))

# this will convert DbtConfigErrors into RuntimeExceptions
# this will convert DbtConfigErrors into DbtRuntimeErrors
# task could be any one of the task objects
task = parsed.cls.from_args(args=parsed)

Expand Down
4 changes: 2 additions & 2 deletions dbt_rpc/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
RunExecutionResult,
)
from dbt.contracts.util import VersionedSchema, schema_version
from dbt.exceptions import InternalException
from dbt.exceptions import DbtInternalError
from dbt.logger import LogMessage
from dbt.utils import restrict_to

Expand Down Expand Up @@ -409,7 +409,7 @@ def add_result(self, task_id: TaskID, state: GCResultState):
elif state == GCResultState.Deleted:
self.deleted.append(task_id)
else:
raise InternalException(
raise DbtInternalError(
f'Got invalid state in add_result: {state}'
)

Expand Down
4 changes: 2 additions & 2 deletions dbt_rpc/parser/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dbt.contracts.graph.manifest import SourceFile
from dbt.contracts.graph.nodes import RPCNode, Macro
from dbt.contracts.graph.unparsed import UnparsedMacro
from dbt.exceptions import InternalException
from dbt.exceptions import DbtInternalError
from dbt.node_types import NodeType
from dbt.parser.base import SimpleSQLParser
from dbt.parser.macros import MacroParser
Expand Down Expand Up @@ -34,7 +34,7 @@ def resource_type(self) -> NodeType:
def get_compiled_path(cls, block: FileBlock):
# we do it this way to make mypy happy
if not isinstance(block, RPCBlock):
raise InternalException(
raise DbtInternalError(
'While parsing RPC calls, got an actual file block instead of '
'an RPC block: {}'.format(block)
)
Expand Down
6 changes: 3 additions & 3 deletions dbt_rpc/rpc/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dbt_rpc.contracts.rpc import (
RemoteResult,
)
from dbt.exceptions import InternalException
from dbt.exceptions import DbtInternalError
from dbt.utils import restrict_to


Expand Down Expand Up @@ -137,7 +137,7 @@ def recv(
"""
rv = self._recv_raw(timeout)
if not isinstance(rv, QueueMessage):
raise InternalException(
raise DbtInternalError(
'Got invalid queue message: {}'.format(rv)
)
return rv
Expand All @@ -153,7 +153,7 @@ def handle_message(
elif msg.message_type in QueueMessageType.terminating:
return msg
else:
raise InternalException(
raise DbtInternalError(
'Got invalid queue message type {}'.format(msg.message_type)
)

Expand Down
14 changes: 7 additions & 7 deletions dbt_rpc/rpc/method.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dbt.dataclass_schema import dbtClassMixin, ValidationError

from dbt_rpc.contracts.rpc import RPCParameters, RemoteResult, RemoteMethodFlags
from dbt.exceptions import NotImplementedException, InternalException
from dbt.exceptions import NotImplementedError, DbtInternalError

Parameters = TypeVar('Parameters', bound=RPCParameters)
Result = TypeVar('Result', bound=RemoteResult)
Expand All @@ -27,19 +27,19 @@ def get_parameters(cls) -> Type[Parameters]:
argspec = inspect.getfullargspec(cls.set_args)
annotations = argspec.annotations
if 'params' not in annotations:
raise InternalException(
raise DbtInternalError(
'set_args must have parameter named params with a valid '
'RPCParameters type definition (no params annotation found)'
)
params_type = annotations['params']
if not issubclass(params_type, RPCParameters):
raise InternalException(
raise DbtInternalError(
'set_args must have parameter named params with a valid '
'RPCParameters type definition (got {}, expected '
'RPCParameters subclass)'.format(params_type)
)
if params_type is RPCParameters:
raise InternalException(
raise DbtInternalError(
'set_args must have parameter named params with a valid '
'RPCParameters type definition (got RPCParameters itself!)'
)
Expand Down Expand Up @@ -67,12 +67,12 @@ def recursive_subclasses(
@abstractmethod
def set_args(self, params: Parameters):
"""set_args executes in the parent process for an RPC call"""
raise NotImplementedException('set_args not implemented')
raise NotImplementedError('set_args not implemented')

@abstractmethod
def handle_request(self) -> Result:
"""handle_request executes inside the child process for an RPC call"""
raise NotImplementedException('handle_request not implemented')
raise NotImplementedError('handle_request not implemented')

def cleanup(self, result: Optional[Result]):
"""cleanup is an optional method that executes inside the parent
Expand Down Expand Up @@ -103,7 +103,7 @@ def set_args(self, params: Parameters):
self.params = params

def run(self):
raise InternalException(
raise DbtInternalError(
'the run() method on builtins should never be called'
)

Expand Down
2 changes: 1 addition & 1 deletion dbt_rpc/rpc/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
def handle_exception(self, e, ctx):
logger.debug("Got an exception: {}".format(e), exc_info=True)
if isinstance(e, dbt.exceptions.Exception):
if isinstance(e, dbt.exceptions.RuntimeException):
if isinstance(e, dbt.exceptions.DbtRuntimeError):
e.add_node(ctx.node)
return dbt_error(e)
elif isinstance(e, RPCException):
Expand Down
20 changes: 10 additions & 10 deletions dbt_rpc/rpc/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dbt_rpc.contracts.rpc import (
RPCParameters, RemoteResult, TaskHandlerState, RemoteMethodFlags, TaskTags,
)
from dbt.exceptions import InternalException
from dbt.exceptions import DbtInternalError
from dbt.logger import (
GLOBAL_LOGGER as logger, list_handler, LogMessage, OutputHandler,
)
Expand Down Expand Up @@ -120,7 +120,7 @@ def task_exec(self) -> None:
elif result is not None:
handler.emit_result(result)
else:
error = dbt_error(InternalException(
error = dbt_error(DbtInternalError(
'after request handling, neither result nor error is None!'
))
handler.emit_error(error.error)
Expand Down Expand Up @@ -211,7 +211,7 @@ def handle_completed(self):
if self.handler.result is None:
# there wasn't an error before, but there sure is one now
self.handler.error = dbt_error(
InternalException(
DbtInternalError(
'got an invalid result=None, but state was {}'
.format(self.handler.state)
)
Expand Down Expand Up @@ -313,7 +313,7 @@ def request_id(self) -> Union[str, int]:
@property
def method(self) -> str:
if self.task.METHOD_NAME is None: # mypy appeasement
raise InternalException(
raise DbtInternalError(
f'In the request handler, got a task({self.task}) with no '
'METHOD_NAME'
)
Expand Down Expand Up @@ -352,7 +352,7 @@ def _wait_for_results(self) -> RemoteResult:
self.started is None or
self.process is None
):
raise InternalException(
raise DbtInternalError(
'_wait_for_results() called before handle()'
)

Expand All @@ -374,13 +374,13 @@ def _wait_for_results(self) -> RemoteResult:
elif isinstance(msg, QueueResultMessage):
return msg.result
else:
raise dbt.exceptions.InternalException(
raise dbt.exceptions.DbtInternalError(
f'Invalid message type {msg.message_type} ({msg})'
)

def get_result(self) -> RemoteResult:
if self.process is None:
raise InternalException(
raise DbtInternalError(
'get_result() called before handle()'
)

Expand Down Expand Up @@ -426,7 +426,7 @@ def handle_singlethreaded(
# note this shouldn't call self.run() as that has different semantics
# (we want errors to raise)
if self.process is None: # mypy appeasement
raise InternalException(
raise DbtInternalError(
'Cannot run a None process'
)
self.process.task_exec()
Expand All @@ -446,7 +446,7 @@ def start(self):
# 'connection already closed' exceptions
cleanup_connections()
if self.process is None:
raise InternalException('self.process is None in start()!')
raise DbtInternalError('self.process is None in start()!')
self.process.start()
self.state = TaskHandlerState.Running
super().start()
Expand Down Expand Up @@ -486,7 +486,7 @@ def handle(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
# tasks use this to set their `real_task`.
self.task.set_config(self.manager.config)
if self.task_params is None: # mypy appeasement
raise InternalException(
raise DbtInternalError(
'Task params set to None!'
)

Expand Down
8 changes: 8 additions & 0 deletions dbt_rpc/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from dbt.task.runnable import GraphRunnableTask
from dbt_rpc.rpc.method import RemoteManifestMethod, Parameters
from typing import AbstractSet


RESULT_TYPE_MAP = {
Expand All @@ -33,6 +34,13 @@ def load_manifest(self):
# we started out with a manifest!
pass

def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
# https://github.com/dbt-labs/dbt-core/pull/6488
# abstract method added to GraphRunnableTask
# bacause of how we do multiple inheritance for project commands,
# this will be overridden by task-specific methods
pass

def get_result(
self, results, elapsed_time, generated_at
) -> RemoteExecutionResult:
Expand Down
10 changes: 5 additions & 5 deletions dbt_rpc/task/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Parameters,
Result,
)
from dbt.exceptions import InternalException
from dbt.exceptions import DbtInternalError
from dbt.parser.manifest import ManifestLoader

from .base import RPCTask
Expand Down Expand Up @@ -41,7 +41,7 @@ def set_config(self, config):
super().set_config(config)

if self.task_type is None:
raise InternalException('task type not set for set_config')
raise DbtInternalError('task type not set for set_config')
if issubclass(self.task_type, RemoteManifestMethod):
task_type: Type[RemoteManifestMethod] = self.task_type
self.real_task = task_type(
Expand All @@ -62,7 +62,7 @@ def set_args(self, params: RPCCliParameters) -> None:

def get_flags(self):
if self.task_type is None:
raise InternalException('task type not set for get_flags')
raise DbtInternalError('task type not set for get_flags')
# this is a kind of egregious hack from a type perspective...
return self.task_type.get_flags(self) # type: ignore

Expand All @@ -75,7 +75,7 @@ def get_rpc_task_cls(self) -> Type[HasCLI]:
if candidate.METHOD_NAME == self.args.rpc_method:
return candidate
# this shouldn't happen
raise InternalException(
raise DbtInternalError(
'No matching handler found for rpc method {} (which={})'
.format(self.args.rpc_method, self.args.which)
)
Expand All @@ -86,7 +86,7 @@ def load_manifest(self):

def handle_request(self) -> Result:
if self.real_task is None:
raise InternalException(
raise DbtInternalError(
'CLI task is in a bad state: handle_request called with no '
'real_task set!'
)
Expand Down
6 changes: 3 additions & 3 deletions dbt_rpc/task/project_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
RPCListParameters,
RPCBuildParameters,
)
from dbt.exceptions import RuntimeException
from dbt.exceptions import DbtRuntimeError
from dbt_rpc.rpc.method import (
Parameters, RemoteManifestMethod
)
Expand Down Expand Up @@ -302,11 +302,11 @@ def set_args(self, params: RPCListParameters) -> None:

if self.args.models:
if self.args.select:
raise RuntimeException(
raise DbtRuntimeError(
'"models" and "select" are mutually exclusive arguments'
)
if self.args.resource_types:
raise RuntimeException(
raise DbtRuntimeError(
'"models" and "resource_type" are mutually exclusive '
'arguments'
)
Expand Down
4 changes: 2 additions & 2 deletions dbt_rpc/task/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from werkzeug.serving import run_simple
from werkzeug.exceptions import NotFound

from dbt.exceptions import RuntimeException
from dbt.exceptions import DbtRuntimeError
from dbt.logger import (
GLOBAL_LOGGER as logger,
log_manager,
Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(
self, args, config, tasks: Optional[List[Type[RemoteMethod]]] = None
) -> None:
if os.name == 'nt':
raise RuntimeException(
raise DbtRuntimeError(
'The dbt RPC server is not supported on windows'
)
super().__init__(args, config)
Expand Down
4 changes: 2 additions & 2 deletions dbt_rpc/task/sql_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dbt.contracts.graph.nodes import RPCNode
from dbt_rpc.contracts.rpc import RPCExecParameters
from dbt_rpc.contracts.rpc import RemoteExecutionResult
from dbt.exceptions import RPCKilledException, InternalException
from dbt.exceptions import RPCKilledException, DbtInternalError
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.parser.manifest import process_node, process_macro
from dbt_rpc.parser.rpc import RPCCallParser, RPCMacroParser
Expand Down Expand Up @@ -102,7 +102,7 @@ def _extract_request_data(self, data):

def _get_exec_node(self):
if self.manifest is None:
raise InternalException(
raise DbtInternalError(
'manifest not set in _get_exec_node'
)

Expand Down
Loading

0 comments on commit c305bfc

Please sign in to comment.