Skip to content

Commit

Permalink
[workspace-refactor] Centralize utilities in dagster._cli.utils
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 2, 2025
1 parent c706236 commit 740488a
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import click
from dagster import _check as check
from dagster._cli.workspace.cli_target import has_pyproject_dagster_block
from dagster._cli.utils import has_pyproject_dagster_block
from dagster._core.remote_representation.origin import ManagedGrpcPythonEnvCodeLocationOrigin
from dagster._core.workspace.load_target import PyProjectFileTarget
from dagster._utils.warnings import disable_dagster_warnings
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster-webserver/dagster_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import dagster._check as check
import uvicorn
from dagster._annotations import deprecated
from dagster._cli.utils import get_possibly_temporary_instance_for_cli
from dagster._cli.utils import ClickArgValue, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace import (
get_workspace_process_context_from_kwargs,
workspace_target_options,
)
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING, ClickArgValue
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING
from dagster._core.instance import InstanceRef
from dagster._core.telemetry import START_DAGSTER_WEBSERVER, log_action
from dagster._core.telemetry_upload import uploading_logging_thread
Expand Down
8 changes: 5 additions & 3 deletions python_modules/dagster/dagster/_cli/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import click

from dagster import __version__ as dagster_version
from dagster._cli.job import apply_click_params
from dagster._cli.utils import get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import (
from dagster._cli.utils import (
ClickArgValue,
apply_click_params,
get_possibly_temporary_instance_for_cli,
)
from dagster._cli.workspace.cli_target import (
generate_module_name_option,
generate_python_file_option,
generate_workspace_option,
Expand Down
8 changes: 5 additions & 3 deletions python_modules/dagster/dagster/_cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

from dagster import _check as check
from dagster._annotations import deprecated
from dagster._cli.job import apply_click_params
from dagster._cli.utils import get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import (
from dagster._cli.utils import (
ClickArgValue,
apply_click_params,
get_possibly_temporary_instance_for_cli,
)
from dagster._cli.workspace.cli_target import (
generate_grpc_server_target_options,
generate_module_name_option,
generate_python_file_option,
Expand Down
17 changes: 6 additions & 11 deletions python_modules/dagster/dagster/_cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import dagster._check as check
from dagster import __version__ as dagster_version
from dagster._cli.config_scaffolder import scaffold_job_config
from dagster._cli.utils import get_instance_for_cli, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import (
WORKSPACE_TARGET_WARNING,
from dagster._cli.utils import (
ClickArgMapping,
ClickArgValue,
ClickOption,
get_instance_for_cli,
get_possibly_temporary_instance_for_cli,
)
from dagster._cli.workspace.cli_target import (
WORKSPACE_TARGET_WARNING,
get_code_location_from_workspace,
get_config_from_args,
get_job_python_origin_from_kwargs,
Expand Down Expand Up @@ -66,20 +68,13 @@
from dagster._utils.yaml_utils import dump_run_config_yaml

T = TypeVar("T")
T_Callable = TypeVar("T_Callable", bound=Callable[..., Any])


@click.group(name="job")
def job_cli():
"""Commands for working with Dagster jobs."""


def apply_click_params(command: T_Callable, *click_params: ClickOption) -> T_Callable:
for click_param in click_params:
command = click_param(command)
return command


@job_cli.command(
name="list",
help=f"List the jobs in a repository. {WORKSPACE_TARGET_WARNING}",
Expand Down
30 changes: 28 additions & 2 deletions python_modules/dagster/dagster/_cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import logging
import os
import tempfile
from collections.abc import Iterator
from collections.abc import Iterator, Mapping
from contextlib import contextmanager
from typing import Optional
from typing import Any, Callable, Optional, TypeVar, Union

import tomli
from typing_extensions import TypeAlias

from dagster._core.instance import DagsterInstance, InstanceRef
from dagster._core.instance.config import is_dagster_home_set
from dagster._core.secrets.env_file import get_env_var_dict
from dagster._utils.env import environ

T_Callable = TypeVar("T_Callable", bound=Callable[..., Any])

ClickArgValue: TypeAlias = Union[str, tuple[str]]
ClickArgMapping: TypeAlias = Mapping[str, ClickArgValue]
ClickOption: TypeAlias = Callable[[T_Callable], T_Callable]


def apply_click_params(command: T_Callable, *click_params: ClickOption) -> T_Callable:
for click_param in click_params:
command = click_param(command)
return command


def has_pyproject_dagster_block(path: str) -> bool:
if not os.path.exists(path):
return False
with open(path, "rb") as f:
data = tomli.load(f)
if not isinstance(data, dict):
return False

return "dagster" in data.get("tool", {})


@contextmanager
def _inject_local_env_file(logger: logging.Logger) -> Iterator[None]:
Expand Down
89 changes: 33 additions & 56 deletions python_modules/dagster/dagster/_cli/workspace/cli_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar, Union, cast

import click
import tomli
from click import UsageError
from typing_extensions import Never, TypeAlias
from typing_extensions import Never

import dagster._check as check
from dagster._cli.utils import (
ClickArgMapping,
ClickOption,
apply_click_params,
has_pyproject_dagster_block,
)
from dagster._core.code_pointer import CodePointer
from dagster._core.definitions.reconstruct import repository_def_from_target_def
from dagster._core.definitions.repository_definition import RepositoryDefinition
Expand Down Expand Up @@ -51,30 +56,6 @@

T_Callable = TypeVar("T_Callable", bound=Callable[..., Any])

ClickArgValue: TypeAlias = Union[str, tuple[str]]
ClickArgMapping: TypeAlias = Mapping[str, ClickArgValue]
ClickOption: TypeAlias = Callable[[T_Callable], T_Callable]


def _raise_cli_usage_error(msg: Optional[str] = None) -> Never:
raise UsageError(
msg or "Invalid set of CLI arguments for loading repository/job. See --help for details."
)


def _check_cli_arguments_none(kwargs: ClickArgMapping, *keys: str) -> None:
for key in keys:
if kwargs.get(key):
_raise_cli_usage_error()


def are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool:
for key in keys:
if kwargs.get(key):
return False

return True


WORKSPACE_CLI_ARGS = (
"workspace",
Expand All @@ -90,20 +71,9 @@ def are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool:
)


def has_pyproject_dagster_block(path: str) -> bool:
if not os.path.exists(path):
return False
with open(path, "rb") as f:
data = tomli.load(f)
if not isinstance(data, dict):
return False

return "dagster" in data.get("tool", {})


def get_workspace_load_target(kwargs: ClickArgMapping) -> WorkspaceLoadTarget:
check.mapping_param(kwargs, "kwargs")
if are_all_keys_empty(kwargs, WORKSPACE_CLI_ARGS):
if _are_all_keys_empty(kwargs, WORKSPACE_CLI_ARGS):
if kwargs.get("empty_workspace"):
return EmptyWorkspaceTarget()
if has_pyproject_dagster_block("pyproject.toml"):
Expand Down Expand Up @@ -486,56 +456,40 @@ def generate_repository_identifier_options() -> Sequence[ClickOption]:

def python_job_config_option(*, command_name: str) -> Callable[[T_Callable], T_Callable]:
def wrap(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(f, generate_run_config_option(command_name))

return wrap


def python_job_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(f, *generate_python_job_target_options())


def workspace_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(f, *generate_workspace_target_options())


def job_workspace_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(f, *generate_workspace_target_options())


def grpc_server_origin_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(f, *generate_grpc_server_target_options())


def python_origin_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(
f, *generate_python_target_options(allow_multiple_python_targets=False)
)


def repository_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

return apply_click_params(
f, *generate_workspace_target_options(), *generate_repository_identifier_options()
)


def job_repository_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

options = [
*generate_workspace_target_options(),
*generate_repository_identifier_options(),
Expand All @@ -544,8 +498,6 @@ def job_repository_target_options(f: T_Callable) -> T_Callable:


def job_target_options(f: T_Callable) -> T_Callable:
from dagster._cli.job import apply_click_params

options = [
*generate_workspace_target_options(),
*generate_repository_identifier_options(),
Expand Down Expand Up @@ -855,3 +807,28 @@ def get_config_from_args(kwargs: Mapping[str, str]) -> Mapping[str, object]:
)
else:
check.failed("Unhandled case getting config from kwargs")


# ########################
# ##### HELPERS
# ########################


def _raise_cli_usage_error(msg: Optional[str] = None) -> Never:
raise UsageError(
msg or "Invalid set of CLI arguments for loading repository/job. See --help for details."
)


def _check_cli_arguments_none(kwargs: ClickArgMapping, *keys: str) -> None:
for key in keys:
if kwargs.get(key):
_raise_cli_usage_error()


def _are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool:
for key in keys:
if kwargs.get(key):
return False

return True
9 changes: 2 additions & 7 deletions python_modules/dagster/dagster/_daemon/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@
import click

from dagster import __version__ as dagster_version
from dagster._cli.utils import get_instance_for_cli
from dagster._cli.workspace.cli_target import (
ClickArgMapping,
ClickArgValue,
get_workspace_load_target,
workspace_target_options,
)
from dagster._cli.utils import ClickArgMapping, ClickArgValue, get_instance_for_cli
from dagster._cli.workspace.cli_target import get_workspace_load_target, workspace_target_options
from dagster._core.instance import DagsterInstance, InstanceRef
from dagster._core.telemetry import telemetry_wrapper
from dagster._daemon.controller import (
Expand Down

0 comments on commit 740488a

Please sign in to comment.