diff --git a/examples/experimental/dagster-blueprints/dagster_blueprints/cli.py b/examples/experimental/dagster-blueprints/dagster_blueprints/cli.py index e23b386568207..8725833309680 100644 --- a/examples/experimental/dagster-blueprints/dagster_blueprints/cli.py +++ b/examples/experimental/dagster-blueprints/dagster_blueprints/cli.py @@ -11,7 +11,7 @@ import click from dagster import _check as check -from dagster._cli.workspace.cli_target import has_pyproject_dagster_block +from dagster._cli.utils import has_pyproject_dagster_block from dagster._core.remote_representation.origin import ManagedGrpcPythonEnvCodeLocationOrigin from dagster._core.workspace.load_target import PyProjectFileTarget from dagster._utils.warnings import disable_dagster_warnings diff --git a/python_modules/dagster-webserver/dagster_webserver/cli.py b/python_modules/dagster-webserver/dagster_webserver/cli.py index 71a362d51c251..35ac009bd803c 100644 --- a/python_modules/dagster-webserver/dagster_webserver/cli.py +++ b/python_modules/dagster-webserver/dagster_webserver/cli.py @@ -11,12 +11,12 @@ import dagster._check as check import uvicorn from dagster._annotations import deprecated -from dagster._cli.utils import get_possibly_temporary_instance_for_cli +from dagster._cli.utils import ClickArgValue, get_possibly_temporary_instance_for_cli from dagster._cli.workspace import ( get_workspace_process_context_from_kwargs, workspace_target_options, ) -from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING, ClickArgValue +from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING from dagster._core.instance import InstanceRef from dagster._core.telemetry import START_DAGSTER_WEBSERVER, log_action from dagster._core.telemetry_upload import uploading_logging_thread diff --git a/python_modules/dagster/dagster/_cli/definitions.py b/python_modules/dagster/dagster/_cli/definitions.py index e6f3c11ced86e..8ea25b12ca67d 100644 --- a/python_modules/dagster/dagster/_cli/definitions.py +++ b/python_modules/dagster/dagster/_cli/definitions.py @@ -5,10 +5,12 @@ import click from dagster import __version__ as dagster_version -from dagster._cli.job import apply_click_params -from dagster._cli.utils import get_possibly_temporary_instance_for_cli -from dagster._cli.workspace.cli_target import ( +from dagster._cli.utils import ( ClickArgValue, + apply_click_params, + get_possibly_temporary_instance_for_cli, +) +from dagster._cli.workspace.cli_target import ( generate_module_name_option, generate_python_file_option, generate_workspace_option, diff --git a/python_modules/dagster/dagster/_cli/dev.py b/python_modules/dagster/dagster/_cli/dev.py index eeab51eb91131..f3979fba8f428 100644 --- a/python_modules/dagster/dagster/_cli/dev.py +++ b/python_modules/dagster/dagster/_cli/dev.py @@ -14,10 +14,12 @@ from dagster import _check as check from dagster._annotations import deprecated -from dagster._cli.job import apply_click_params -from dagster._cli.utils import get_possibly_temporary_instance_for_cli -from dagster._cli.workspace.cli_target import ( +from dagster._cli.utils import ( ClickArgValue, + apply_click_params, + get_possibly_temporary_instance_for_cli, +) +from dagster._cli.workspace.cli_target import ( generate_grpc_server_target_options, generate_module_name_option, generate_python_file_option, diff --git a/python_modules/dagster/dagster/_cli/job.py b/python_modules/dagster/dagster/_cli/job.py index 5d485cad3ead2..b1a38a043e033 100644 --- a/python_modules/dagster/dagster/_cli/job.py +++ b/python_modules/dagster/dagster/_cli/job.py @@ -10,12 +10,14 @@ import dagster._check as check from dagster import __version__ as dagster_version from dagster._cli.config_scaffolder import scaffold_job_config -from dagster._cli.utils import get_instance_for_cli, get_possibly_temporary_instance_for_cli -from dagster._cli.workspace.cli_target import ( - WORKSPACE_TARGET_WARNING, +from dagster._cli.utils import ( ClickArgMapping, ClickArgValue, - ClickOption, + get_instance_for_cli, + get_possibly_temporary_instance_for_cli, +) +from dagster._cli.workspace.cli_target import ( + WORKSPACE_TARGET_WARNING, get_code_location_from_workspace, get_config_from_args, get_job_python_origin_from_kwargs, @@ -66,7 +68,6 @@ from dagster._utils.yaml_utils import dump_run_config_yaml T = TypeVar("T") -T_Callable = TypeVar("T_Callable", bound=Callable[..., Any]) @click.group(name="job") @@ -74,12 +75,6 @@ def job_cli(): """Commands for working with Dagster jobs.""" -def apply_click_params(command: T_Callable, *click_params: ClickOption) -> T_Callable: - for click_param in click_params: - command = click_param(command) - return command - - @job_cli.command( name="list", help=f"List the jobs in a repository. {WORKSPACE_TARGET_WARNING}", diff --git a/python_modules/dagster/dagster/_cli/utils.py b/python_modules/dagster/dagster/_cli/utils.py index 86ef0921e09a3..a094fd50cc89c 100644 --- a/python_modules/dagster/dagster/_cli/utils.py +++ b/python_modules/dagster/dagster/_cli/utils.py @@ -1,15 +1,41 @@ import logging import os import tempfile -from collections.abc import Iterator +from collections.abc import Iterator, Mapping from contextlib import contextmanager -from typing import Optional +from typing import Any, Callable, Optional, TypeVar, Union + +import tomli +from typing_extensions import TypeAlias from dagster._core.instance import DagsterInstance, InstanceRef from dagster._core.instance.config import is_dagster_home_set from dagster._core.secrets.env_file import get_env_var_dict from dagster._utils.env import environ +T_Callable = TypeVar("T_Callable", bound=Callable[..., Any]) + +ClickArgValue: TypeAlias = Union[str, tuple[str]] +ClickArgMapping: TypeAlias = Mapping[str, ClickArgValue] +ClickOption: TypeAlias = Callable[[T_Callable], T_Callable] + + +def apply_click_params(command: T_Callable, *click_params: ClickOption) -> T_Callable: + for click_param in click_params: + command = click_param(command) + return command + + +def has_pyproject_dagster_block(path: str) -> bool: + if not os.path.exists(path): + return False + with open(path, "rb") as f: + data = tomli.load(f) + if not isinstance(data, dict): + return False + + return "dagster" in data.get("tool", {}) + @contextmanager def _inject_local_env_file(logger: logging.Logger) -> Iterator[None]: diff --git a/python_modules/dagster/dagster/_cli/workspace/cli_target.py b/python_modules/dagster/dagster/_cli/workspace/cli_target.py index 7ee9e90258565..1257c60b52583 100644 --- a/python_modules/dagster/dagster/_cli/workspace/cli_target.py +++ b/python_modules/dagster/dagster/_cli/workspace/cli_target.py @@ -5,11 +5,16 @@ from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar, Union, cast import click -import tomli from click import UsageError -from typing_extensions import Never, TypeAlias +from typing_extensions import Never import dagster._check as check +from dagster._cli.utils import ( + ClickArgMapping, + ClickOption, + apply_click_params, + has_pyproject_dagster_block, +) from dagster._core.code_pointer import CodePointer from dagster._core.definitions.reconstruct import repository_def_from_target_def from dagster._core.definitions.repository_definition import RepositoryDefinition @@ -51,30 +56,6 @@ T_Callable = TypeVar("T_Callable", bound=Callable[..., Any]) -ClickArgValue: TypeAlias = Union[str, tuple[str]] -ClickArgMapping: TypeAlias = Mapping[str, ClickArgValue] -ClickOption: TypeAlias = Callable[[T_Callable], T_Callable] - - -def _raise_cli_usage_error(msg: Optional[str] = None) -> Never: - raise UsageError( - msg or "Invalid set of CLI arguments for loading repository/job. See --help for details." - ) - - -def _check_cli_arguments_none(kwargs: ClickArgMapping, *keys: str) -> None: - for key in keys: - if kwargs.get(key): - _raise_cli_usage_error() - - -def are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool: - for key in keys: - if kwargs.get(key): - return False - - return True - WORKSPACE_CLI_ARGS = ( "workspace", @@ -90,20 +71,9 @@ def are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool: ) -def has_pyproject_dagster_block(path: str) -> bool: - if not os.path.exists(path): - return False - with open(path, "rb") as f: - data = tomli.load(f) - if not isinstance(data, dict): - return False - - return "dagster" in data.get("tool", {}) - - def get_workspace_load_target(kwargs: ClickArgMapping) -> WorkspaceLoadTarget: check.mapping_param(kwargs, "kwargs") - if are_all_keys_empty(kwargs, WORKSPACE_CLI_ARGS): + if _are_all_keys_empty(kwargs, WORKSPACE_CLI_ARGS): if kwargs.get("empty_workspace"): return EmptyWorkspaceTarget() if has_pyproject_dagster_block("pyproject.toml"): @@ -486,56 +456,40 @@ def generate_repository_identifier_options() -> Sequence[ClickOption]: def python_job_config_option(*, command_name: str) -> Callable[[T_Callable], T_Callable]: def wrap(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params(f, generate_run_config_option(command_name)) return wrap def python_job_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params(f, *generate_python_job_target_options()) def workspace_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params(f, *generate_workspace_target_options()) def job_workspace_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params(f, *generate_workspace_target_options()) def grpc_server_origin_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params(f, *generate_grpc_server_target_options()) def python_origin_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params( f, *generate_python_target_options(allow_multiple_python_targets=False) ) def repository_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - return apply_click_params( f, *generate_workspace_target_options(), *generate_repository_identifier_options() ) def job_repository_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - options = [ *generate_workspace_target_options(), *generate_repository_identifier_options(), @@ -544,8 +498,6 @@ def job_repository_target_options(f: T_Callable) -> T_Callable: def job_target_options(f: T_Callable) -> T_Callable: - from dagster._cli.job import apply_click_params - options = [ *generate_workspace_target_options(), *generate_repository_identifier_options(), @@ -855,3 +807,28 @@ def get_config_from_args(kwargs: Mapping[str, str]) -> Mapping[str, object]: ) else: check.failed("Unhandled case getting config from kwargs") + + +# ######################## +# ##### HELPERS +# ######################## + + +def _raise_cli_usage_error(msg: Optional[str] = None) -> Never: + raise UsageError( + msg or "Invalid set of CLI arguments for loading repository/job. See --help for details." + ) + + +def _check_cli_arguments_none(kwargs: ClickArgMapping, *keys: str) -> None: + for key in keys: + if kwargs.get(key): + _raise_cli_usage_error() + + +def _are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool: + for key in keys: + if kwargs.get(key): + return False + + return True diff --git a/python_modules/dagster/dagster/_daemon/cli/__init__.py b/python_modules/dagster/dagster/_daemon/cli/__init__.py index f70963d2347ed..e70503d7c0da2 100644 --- a/python_modules/dagster/dagster/_daemon/cli/__init__.py +++ b/python_modules/dagster/dagster/_daemon/cli/__init__.py @@ -5,13 +5,8 @@ import click from dagster import __version__ as dagster_version -from dagster._cli.utils import get_instance_for_cli -from dagster._cli.workspace.cli_target import ( - ClickArgMapping, - ClickArgValue, - get_workspace_load_target, - workspace_target_options, -) +from dagster._cli.utils import ClickArgMapping, ClickArgValue, get_instance_for_cli +from dagster._cli.workspace.cli_target import get_workspace_load_target, workspace_target_options from dagster._core.instance import DagsterInstance, InstanceRef from dagster._core.telemetry import telemetry_wrapper from dagster._daemon.controller import (