From a82982fb90af642a11baea237a3498e76a0154e1 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Sun, 2 Feb 2025 21:36:16 -0500 Subject: [PATCH] [cli-refactor] Type and validate python_pointer_opts at entry point --- python_modules/dagster/dagster/_cli/api.py | 71 +++--- python_modules/dagster/dagster/_cli/asset.py | 71 ++++-- .../dagster/dagster/_cli/code_server.py | 61 +++-- python_modules/dagster/dagster/_cli/job.py | 154 ++++++++---- python_modules/dagster/dagster/_cli/utils.py | 14 +- .../dagster/_cli/workspace/cli_target.py | 235 ++++++++---------- .../dagster-sigma/dagster_sigma/cli.py | 15 +- 7 files changed, 344 insertions(+), 277 deletions(-) diff --git a/python_modules/dagster/dagster/_cli/api.py b/python_modules/dagster/dagster/_cli/api.py index ac964dde73eeb..99cf0210c6e00 100644 --- a/python_modules/dagster/dagster/_cli/api.py +++ b/python_modules/dagster/dagster/_cli/api.py @@ -12,11 +12,8 @@ import dagster._check as check import dagster._seven as seven -from dagster._cli.utils import get_instance_for_cli -from dagster._cli.workspace.cli_target import ( - get_working_directory_from_kwargs, - python_pointer_options, -) +from dagster._cli.utils import assert_no_remaining_opts, get_instance_for_cli +from dagster._cli.workspace.cli_target import PythonPointerOpts, python_pointer_options from dagster._core.definitions.metadata import MetadataValue from dagster._core.errors import DagsterExecutionInterruptedError from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData @@ -577,6 +574,7 @@ def _execute_step_command_body( @click.option( "--heartbeat", is_flag=True, + default=False, help=( "If set, the GRPC server will shut itself down when it fails to receive a heartbeat " "after a timeout configurable with --heartbeat-timeout." @@ -601,7 +599,6 @@ def _execute_step_command_body( ), envvar="DAGSTER_LAZY_LOAD_USER_CODE", ) -@python_pointer_options @click.option( "--use-python-environment-entry-point", is_flag=True, @@ -699,26 +696,31 @@ def _execute_step_command_body( help="[INTERNAL] Retrieves current utilization metrics from GRPC server.", envvar="DAGSTER_ENABLE_SERVER_METRICS", ) +@python_pointer_options def grpc_command( port: Optional[int], socket: Optional[str], host: str, max_workers: Optional[int], - heartbeat: bool = False, - heartbeat_timeout: int = 30, - lazy_load_user_code: bool = False, - fixed_server_id: Optional[str] = None, - log_level: str = "INFO", - log_format: str = "colored", - use_python_environment_entry_point: Optional[bool] = False, - container_image: Optional[str] = None, - container_context: Optional[str] = None, - location_name: Optional[str] = None, - instance_ref=None, - inject_env_vars_from_instance: bool = False, + heartbeat: bool, + heartbeat_timeout: int, + lazy_load_user_code: bool, + use_python_environment_entry_point: bool, + empty_working_directory: bool, + fixed_server_id: Optional[str], + log_level: str, + log_format: str, + container_image: Optional[str], + container_context: Optional[str], + inject_env_vars_from_instance: bool, + location_name: Optional[str], + instance_ref: Optional[str], enable_metrics: bool = False, - **kwargs: Any, + **other_opts: Any, ) -> None: + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + check.invariant(heartbeat_timeout > 0, "heartbeat_timeout must be greater than 0") check.invariant( @@ -743,31 +745,24 @@ def grpc_command( loadable_target_origin = None if any( - kwargs[key] - for key in [ - "attribute", - "working_directory", - "module_name", - "package_name", - "python_file", - "empty_working_directory", + [ + python_pointer_opts.attribute, + python_pointer_opts.working_directory, + python_pointer_opts.module_name, + python_pointer_opts.package_name, + python_pointer_opts.python_file, + empty_working_directory, ] ): # in the gRPC api CLI we never load more than one module or python file at a time - module_name = check.opt_str_elem(kwargs, "module_name") - python_file = check.opt_str_elem(kwargs, "python_file") loadable_target_origin = LoadableTargetOrigin( executable_path=sys.executable, - attribute=kwargs["attribute"], - working_directory=( - None - if kwargs.get("empty_working_directory") - else get_working_directory_from_kwargs(kwargs) - ), - module_name=module_name, - python_file=python_file, - package_name=kwargs["package_name"], + attribute=python_pointer_opts.attribute, + working_directory=python_pointer_opts.normalized_working_directory, + module_name=python_pointer_opts.module_name, + python_file=python_pointer_opts.python_file, + package_name=python_pointer_opts.package_name, ) code_desc = " " diff --git a/python_modules/dagster/dagster/_cli/asset.py b/python_modules/dagster/dagster/_cli/asset.py index 2508a84962919..700401f726e56 100644 --- a/python_modules/dagster/dagster/_cli/asset.py +++ b/python_modules/dagster/dagster/_cli/asset.py @@ -1,12 +1,17 @@ -from collections.abc import Mapping +from typing import Optional import click import dagster._check as check -from dagster._cli.job import get_config_from_args -from dagster._cli.utils import get_instance_for_cli, get_possibly_temporary_instance_for_cli +from dagster._cli.job import get_run_config_from_cli_opts +from dagster._cli.utils import ( + assert_no_remaining_opts, + get_instance_for_cli, + get_possibly_temporary_instance_for_cli, +) from dagster._cli.workspace.cli_target import ( - get_repository_python_origin_from_kwargs, + PythonPointerOpts, + get_repository_python_origin_from_cli_opts, python_pointer_options, run_config_option, ) @@ -32,7 +37,6 @@ def asset_cli(): @asset_cli.command(name="materialize", help="Execute a run to materialize a selection of assets") -@python_pointer_options @click.option("--select", help="Comma-separated Asset selection to target", required=True) @click.option("--partition", help="Asset partition to target", required=False) @click.option( @@ -40,29 +44,56 @@ def asset_cli(): help="Asset partition range to target i.e. ...", required=False, ) -@run_config_option(name="config", command_name="materialize") @click.option( "--config-json", type=click.STRING, help="JSON string of run config to use for this job run. Cannot be used with -c / --config.", ) -def asset_materialize_command(**kwargs): +@run_config_option(name="config", command_name="materialize") +@python_pointer_options +def asset_materialize_command( + select: str, + partition: Optional[str], + partition_range: Optional[str], + config: tuple[str, ...], + config_json: Optional[str], + **other_opts: object, +) -> None: + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + with capture_interrupts(): with get_possibly_temporary_instance_for_cli( "``dagster asset materialize``", ) as instance: - execute_materialize_command(instance, kwargs) + execute_materialize_command( + instance, + select, + partition, + partition_range, + config, + config_json, + python_pointer_opts, + ) @telemetry_wrapper -def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str, str]) -> None: - config = get_config_from_args(kwargs) - repository_origin = get_repository_python_origin_from_kwargs(kwargs) +def execute_materialize_command( + instance: DagsterInstance, + select: str, + partition: Optional[str], + partition_range: Optional[str], + config: tuple[str, ...], + config_json: Optional[str], + python_pointer_opts: PythonPointerOpts, +) -> None: + run_config = get_run_config_from_cli_opts(config, config_json) + repository_origin = get_repository_python_origin_from_cli_opts(python_pointer_opts) recon_repo = recon_repository_from_origin(repository_origin) repo_def = recon_repo.get_definition() - asset_selection = AssetSelection.from_coercible(kwargs["select"].split(",")) + asset_selection = AssetSelection.from_coercible(select.split(",")) asset_keys = asset_selection.resolve(repo_def.asset_graph) implicit_job_def = repo_def.get_implicit_job_def_for_assets(asset_keys) @@ -77,8 +108,6 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str, reconstructable_job = recon_job_from_origin( JobPythonOrigin(implicit_job_def.name, repository_origin=repository_origin) ) - partition = kwargs.get("partition") - partition_range = kwargs.get("partition_range") if partition and partition_range: check.failed("Cannot specify both --partition and --partition-range options. Use only one.") @@ -150,23 +179,25 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str, asset_selection=list(asset_keys), instance=instance, tags=tags, - run_config=config, + run_config=run_config, ) if not result.success: raise click.ClickException("Materialization failed.") @asset_cli.command(name="list", help="List assets") -@python_pointer_options @click.option("--select", help="Asset selection to target", required=False) -def asset_list_command(**kwargs): - repository_origin = get_repository_python_origin_from_kwargs(kwargs) +@python_pointer_options +def asset_list_command(select: Optional[str], **other_opts): + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + + repository_origin = get_repository_python_origin_from_cli_opts(python_pointer_opts) recon_repo = recon_repository_from_origin(repository_origin) repo_def = recon_repo.get_definition() - select = kwargs.get("select") if select is not None: - asset_selection = AssetSelection.from_coercible(kwargs["select"].split(",")) + asset_selection = AssetSelection.from_coercible(select.split(",")) else: asset_selection = AssetSelection.all() diff --git a/python_modules/dagster/dagster/_cli/code_server.py b/python_modules/dagster/dagster/_cli/code_server.py index b320832542d99..93a6e39683241 100644 --- a/python_modules/dagster/dagster/_cli/code_server.py +++ b/python_modules/dagster/dagster/_cli/code_server.py @@ -7,12 +7,9 @@ import click -import dagster._check as check import dagster._seven as seven -from dagster._cli.workspace.cli_target import ( - get_working_directory_from_kwargs, - python_pointer_options, -) +from dagster._cli.utils import assert_no_remaining_opts +from dagster._cli.workspace.cli_target import PythonPointerOpts, python_pointer_options from dagster._core.instance import InstanceRef from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.utils import FuturesAwareThreadPoolExecutor @@ -62,11 +59,9 @@ def code_server_cli(): "-n", type=click.INT, required=False, - default=None, help="Maximum number of (threaded) workers to use in the code server", envvar="DAGSTER_CODE_SERVER_MAX_WORKERS", ) -@python_pointer_options @click.option( "--use-python-environment-entry-point", is_flag=True, @@ -149,6 +144,7 @@ def code_server_cli(): @click.option( "--heartbeat", is_flag=True, + default=False, help=( "If set, the GRPC server will shut itself down when it fails to receive a heartbeat " "after a timeout configurable with --heartbeat-timeout." @@ -168,28 +164,32 @@ def code_server_cli(): help="[INTERNAL] Serialized InstanceRef to use for accessing the instance", envvar="DAGSTER_INSTANCE_REF", ) +@python_pointer_options def start_command( - port: Optional[int] = None, - socket: Optional[str] = None, - host: str = "localhost", - max_workers: Optional[int] = None, - fixed_server_id: Optional[str] = None, - log_level: str = "INFO", - log_format: str = "colored", - use_python_environment_entry_point: bool = False, - container_image: Optional[str] = None, - container_context: Optional[str] = None, - location_name: Optional[str] = None, - inject_env_vars_from_instance: bool = False, - startup_timeout: int = 0, - heartbeat: bool = False, - heartbeat_timeout: int = DEFAULT_HEARTBEAT_TIMEOUT, - instance_ref=None, - **kwargs, + port: Optional[int], + socket: Optional[str], + host: str, + max_workers: Optional[int], + use_python_environment_entry_point: bool, + fixed_server_id: Optional[str], + log_level: str, + log_format: str, + container_image: Optional[str], + container_context: Optional[str], + inject_env_vars_from_instance: bool, + location_name: Optional[str], + startup_timeout: int, + heartbeat: bool, + heartbeat_timeout, + instance_ref: Optional[str], + **other_opts, ): from dagster._grpc import DagsterGrpcServer from dagster._grpc.proxy_server import DagsterProxyApiServicer + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + if seven.IS_WINDOWS and port is None: raise click.UsageError( "You must pass a valid --port/-p on Windows: --socket/-s not supported." @@ -205,16 +205,13 @@ def start_command( container_image = container_image or os.getenv("DAGSTER_CURRENT_IMAGE") # in the gRPC api CLI we never load more than one module or python file at a time - module_name = check.opt_str_elem(kwargs, "module_name") - python_file = check.opt_str_elem(kwargs, "python_file") - loadable_target_origin = LoadableTargetOrigin( executable_path=sys.executable if use_python_environment_entry_point else None, - attribute=kwargs["attribute"], - working_directory=get_working_directory_from_kwargs(kwargs), - module_name=module_name, - python_file=python_file, - package_name=kwargs["package_name"], + attribute=python_pointer_opts.attribute, + working_directory=python_pointer_opts.normalized_working_directory, + module_name=python_pointer_opts.module_name, + python_file=python_pointer_opts.python_file, + package_name=python_pointer_opts.package_name, ) code_desc = " " diff --git a/python_modules/dagster/dagster/_cli/job.py b/python_modules/dagster/dagster/_cli/job.py index dd8e3b2c07738..5047a8d9d2cbc 100644 --- a/python_modules/dagster/dagster/_cli/job.py +++ b/python_modules/dagster/dagster/_cli/job.py @@ -12,19 +12,21 @@ from dagster._cli.config_scaffolder import scaffold_job_config from dagster._cli.utils import ( ClickArgMapping, - ClickArgValue, + assert_no_remaining_opts, get_instance_for_cli, get_possibly_temporary_instance_for_cli, + serialize_sorted_quoted, ) from dagster._cli.workspace.cli_target import ( WORKSPACE_TARGET_WARNING, + PythonPointerOpts, get_code_location_from_workspace, - get_config_from_args, - get_job_python_origin_from_kwargs, get_remote_job_from_kwargs, get_remote_job_from_remote_repo, get_remote_repository_from_code_location, get_remote_repository_from_kwargs, + get_repository_python_origin_from_cli_opts, + get_run_config_from_cli_opts, get_run_config_from_file_list, get_workspace_from_kwargs, job_name_option, @@ -43,6 +45,7 @@ from dagster._core.execution.execution_result import ExecutionResult from dagster._core.execution.job_backfill import create_backfill_run from dagster._core.instance import DagsterInstance +from dagster._core.origin import JobPythonOrigin from dagster._core.remote_representation import ( CodeLocation, RemoteJob, @@ -62,7 +65,7 @@ from dagster._time import get_current_timestamp from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME, PrintFn from dagster._utils.error import serializable_error_info_from_exc_info -from dagster._utils.hosted_user_process import recon_job_from_origin +from dagster._utils.hosted_user_process import recon_job_from_origin, recon_repository_from_origin from dagster._utils.indenting_printer import IndentingPrinter from dagster._utils.interrupts import capture_interrupts from dagster._utils.merger import merge_dicts @@ -241,14 +244,10 @@ def print_op( instructions=get_job_in_same_python_env_instructions("execute") ), ) -@python_pointer_options -@repository_name_option(name="repository") -@job_name_option(name="job_name") -@run_config_option(name="config", command_name="execute") @click.option("--tags", type=click.STRING, help="JSON string of tags to use for this job run") @click.option( - "-o", "--op-selection", + "-o", type=click.STRING, help=( "Specify the op subselection to execute. It can be multiple clauses separated by commas." @@ -261,26 +260,55 @@ def print_op( ' ancestors, "other_op_a" itself, and "other_op_b" and its direct child ops' ), ) -def job_execute_command(**kwargs: ClickArgValue): +@repository_name_option(name="repository") +@job_name_option(name="job_name") +@run_config_option(name="config", command_name="execute") +@python_pointer_options +def job_execute_command( + tags: Optional[str], + op_selection: Optional[str], + repository: Optional[str], + job_name: Optional[str], + config: tuple[str, ...], + **other_opts: object, +): + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + with capture_interrupts(): with get_possibly_temporary_instance_for_cli("``dagster job execute``") as instance: - execute_execute_command(instance, kwargs) + execute_execute_command( + instance, + tags, + op_selection, + repository, + job_name, + config, + python_pointer_opts, + ) @telemetry_wrapper -def execute_execute_command(instance: DagsterInstance, kwargs: ClickArgMapping) -> ExecutionResult: +def execute_execute_command( + instance: DagsterInstance, + tags: Optional[str], + op_selection: Optional[str], + repository: Optional[str], + job_name: Optional[str], + config: tuple[str, ...], + python_pointer_opts: PythonPointerOpts, +) -> ExecutionResult: check.inst_param(instance, "instance", DagsterInstance) - config = list( - check.opt_tuple_param(cast(tuple[str, ...], kwargs.get("config")), "config", of_type=str) - ) - - tags = get_tags_from_args(kwargs) + config_files = list(config) + normalized_tags = _normalize_cli_tags(tags) + normalized_op_selection = _normalize_cli_op_selection(op_selection) - job_origin = get_job_python_origin_from_kwargs(kwargs) + job_origin = _get_job_python_origin_from_cli_opts(python_pointer_opts, repository, job_name) recon_job = recon_job_from_origin(job_origin) - op_selection = get_op_selection_from_args(kwargs) - result = do_execute_command(recon_job, instance, config, tags, op_selection) + result = do_execute_command( + recon_job, instance, config_files, normalized_tags, normalized_op_selection + ) if not result.success: raise click.ClickException(f"Run {result.run_id} resulted in failure.") @@ -288,33 +316,59 @@ def execute_execute_command(instance: DagsterInstance, kwargs: ClickArgMapping) return result -def get_tags_from_args(kwargs: ClickArgMapping) -> Mapping[str, str]: - if kwargs.get("tags") is None: +def _normalize_cli_tags(tags: Optional[str]) -> Mapping[str, str]: + if tags is None: return {} try: - tags = json.loads(check.str_elem(kwargs, "tags")) + tags = json.loads(tags) return check.is_dict(tags, str, str) except JSONDecodeError as e: raise click.UsageError( - "Invalid JSON-string given for `--tags`: {}\n\n{}".format( - kwargs.get("tags"), + "Invalid JSON-string given for `--tags`: {}\n\n{}".format( # noqa: UP032 + tags, serializable_error_info_from_exc_info(sys.exc_info()).to_string(), ) ) from e -def get_op_selection_from_args(kwargs: ClickArgMapping) -> Optional[Sequence[str]]: - op_selection_str = kwargs.get("op_selection") - if not isinstance(op_selection_str, str): - return None +def _get_job_python_origin_from_cli_opts( + python_pointer_opts: PythonPointerOpts, repository_name: Optional[str], job_name: Optional[str] +) -> JobPythonOrigin: + repository_origin = get_repository_python_origin_from_cli_opts( + python_pointer_opts, repository_name + ) - return [ele.strip() for ele in op_selection_str.split(",")] if op_selection_str else None + recon_repo = recon_repository_from_origin(repository_origin) + repo_definition = recon_repo.get_definition() + + job_names = set(repo_definition.job_names) # job (all) vs job (non legacy) + + if job_name is None and len(job_names) == 1: + job_name = next(iter(job_names)) + elif job_name is None: + raise click.UsageError( + "Must provide --job as there is more than one job " + f"in {repo_definition.name}. Options are: {serialize_sorted_quoted(job_names)}." + ) + elif job_name not in job_names: + raise click.UsageError( + f'Job "{job_name}" not found in repository "{repo_definition.name}" ' + f"Found {serialize_sorted_quoted(job_names)} instead." + ) + + return JobPythonOrigin(job_name, repository_origin=repository_origin) + + +def _normalize_cli_op_selection(op_selection: Optional[str]) -> Optional[Sequence[str]]: + if not op_selection: + return None + return [ele.strip() for ele in op_selection.split(",")] def do_execute_command( recon_job: ReconstructableJob, instance: DagsterInstance, - config: Optional[Sequence[str]], + config: list[str], tags: Optional[Mapping[str, str]] = None, op_selection: Optional[Sequence[str]] = None, ) -> ExecutionResult: @@ -362,7 +416,9 @@ def execute_launch_command( ) -> DagsterRun: preset = cast(Optional[str], kwargs.get("preset")) check.inst_param(instance, "instance", DagsterInstance) - config = get_config_from_args(kwargs) + config = get_run_config_from_cli_opts( + check.is_tuple(kwargs["config"], of_type=str), kwargs.get("config_json") + ) with get_workspace_from_kwargs(instance, version=dagster_version, kwargs=kwargs) as workspace: code_location = get_code_location_from_workspace(workspace, kwargs.get("location")) @@ -384,9 +440,9 @@ def execute_launch_command( if preset and config: raise click.UsageError("Can not use --preset with -c / --config / --config-json.") - run_tags = get_tags_from_args(kwargs) + run_tags = _normalize_cli_tags(kwargs.get("tags")) - op_selection = get_op_selection_from_args(kwargs) + op_selection = _normalize_cli_op_selection(kwargs.get("op_selection")) dagster_run = _create_run( instance=instance, @@ -496,19 +552,31 @@ def _check_execute_remote_job_args( instructions=get_job_in_same_python_env_instructions("scaffold_config") ), ) -@python_pointer_options +@click.option("--print-only-required", default=False, is_flag=True) @repository_name_option(name="repository") @job_name_option(name="job_name") -@click.option("--print-only-required", default=False, is_flag=True) -def job_scaffold_command(**kwargs): - execute_scaffold_command(kwargs, click.echo) +@python_pointer_options +def job_scaffold_command( + print_only_required: bool, repository: Optional[str], job_name: Optional[str], **other_opts +): + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + + execute_scaffold_command( + print_only_required, repository, job_name, python_pointer_opts, click.echo + ) -def execute_scaffold_command(cli_args, print_fn): - job_origin = get_job_python_origin_from_kwargs(cli_args) +def execute_scaffold_command( + print_only_required: bool, + repository: Optional[str], + job_name: Optional[str], + python_pointer_opts: PythonPointerOpts, + print_fn: PrintFn, +) -> None: + job_origin = _get_job_python_origin_from_cli_opts(python_pointer_opts, repository, job_name) job = recon_job_from_origin(job_origin) - skip_non_required = cli_args["print_only_required"] - do_scaffold_command(job.get_definition(), print_fn, skip_non_required) + do_scaffold_command(job.get_definition(), print_fn, print_only_required) def do_scaffold_command( @@ -610,7 +678,7 @@ def _execute_backfill_command_at_location( if not job_partition_set: raise click.UsageError(f"Job `{remote_job.name}` is not partitioned.") - run_tags = get_tags_from_args(cli_args) + run_tags = _normalize_cli_tags(cli_args.get("tags")) # type: ignore repo_handle = RepositoryHandle.from_location( repository_name=repo.name, diff --git a/python_modules/dagster/dagster/_cli/utils.py b/python_modules/dagster/dagster/_cli/utils.py index a094fd50cc89c..2da243fd73b39 100644 --- a/python_modules/dagster/dagster/_cli/utils.py +++ b/python_modules/dagster/dagster/_cli/utils.py @@ -1,13 +1,14 @@ import logging import os import tempfile -from collections.abc import Iterator, Mapping +from collections.abc import Iterable, Iterator, Mapping from contextlib import contextmanager from typing import Any, Callable, Optional, TypeVar, Union import tomli from typing_extensions import TypeAlias +import dagster._check as check from dagster._core.instance import DagsterInstance, InstanceRef from dagster._core.instance.config import is_dagster_home_set from dagster._core.secrets.env_file import get_env_var_dict @@ -133,3 +134,14 @@ def get_instance_for_cli( else: with DagsterInstance.get() as instance: yield instance + + +def assert_no_remaining_opts(opts: Mapping[str, object]) -> None: + if opts: + check.failed( + f"Unexpected options remaining: {list(opts.keys())}. Ensure that all options are extracted." + ) + + +def serialize_sorted_quoted(strings: Iterable[str]) -> str: + return "[" + ", ".join([f"'{s}'" for s in sorted(list(strings))]) + "]" diff --git a/python_modules/dagster/dagster/_cli/workspace/cli_target.py b/python_modules/dagster/dagster/_cli/workspace/cli_target.py index 45b610ab018c0..4fff6683ca491 100644 --- a/python_modules/dagster/dagster/_cli/workspace/cli_target.py +++ b/python_modules/dagster/dagster/_cli/workspace/cli_target.py @@ -6,7 +6,7 @@ import click from click import UsageError -from typing_extensions import Never +from typing_extensions import Never, Self import dagster._check as check from dagster._cli.utils import ( @@ -14,16 +14,12 @@ ClickOption, apply_click_params, has_pyproject_dagster_block, + serialize_sorted_quoted, ) from dagster._core.code_pointer import CodePointer from dagster._core.definitions.reconstruct import repository_def_from_target_def -from dagster._core.definitions.repository_definition import RepositoryDefinition from dagster._core.instance import DagsterInstance -from dagster._core.origin import ( - DEFAULT_DAGSTER_ENTRY_POINT, - JobPythonOrigin, - RepositoryPythonOrigin, -) +from dagster._core.origin import DEFAULT_DAGSTER_ENTRY_POINT, RepositoryPythonOrigin from dagster._core.remote_representation.code_location import CodeLocation from dagster._core.remote_representation.external import RemoteRepository from dagster._core.workspace.context import WorkspaceRequestContext @@ -39,9 +35,9 @@ WorkspaceLoadTarget, ) from dagster._grpc.utils import get_loadable_targets +from dagster._record import record from dagster._seven import JSONDecodeError, json from dagster._utils.error import serializable_error_info_from_exc_info -from dagster._utils.hosted_user_process import recon_repository_from_origin from dagster._utils.yaml_utils import load_yaml_from_glob_list if TYPE_CHECKING: @@ -474,125 +470,101 @@ def job_options(f: T_Callable) -> T_Callable: ) -def get_job_python_origin_from_kwargs(kwargs: ClickArgMapping) -> JobPythonOrigin: - repository_origin = get_repository_python_origin_from_kwargs(kwargs) - provided_name = kwargs.get("job_name") - - recon_repo = recon_repository_from_origin(repository_origin) - repo_definition = recon_repo.get_definition() +# ######################## +# ##### VALUE OBJECTS +# ######################## - job_names = set(repo_definition.job_names) # job (all) vs job (non legacy) - if provided_name is None and len(job_names) == 1: - job_name = next(iter(job_names)) - elif provided_name is None: - raise click.UsageError( - "Must provide --job as there is more than one job " - f"in {repo_definition.name}. Options are: {_sorted_quoted(job_names)}." - ) - elif provided_name not in job_names: - raise click.UsageError( - f'Job "{provided_name}" not found in repository "{repo_definition.name}" ' - f"Found {_sorted_quoted(job_names)} instead." +@record +class PythonPointerOpts: + python_file: Optional[str] = None + module_name: Optional[str] = None + package_name: Optional[str] = None + working_directory: Optional[str] = None + attribute: Optional[str] = None + + @classmethod + def extract_from_cli_options(cls, cli_options: dict[str, object]) -> Self: + return cls( + python_file=check.opt_inst(cli_options.pop("python_file", None), str), + module_name=check.opt_inst(cli_options.pop("module_name", None), str), + package_name=check.opt_inst(cli_options.pop("package_name", None), str), + working_directory=check.opt_inst(cli_options.pop("working_directory", None), str), + attribute=check.opt_inst(cli_options.pop("attribute", None), str), ) - else: - job_name = provided_name - - return JobPythonOrigin(job_name, repository_origin=repository_origin) - - -def _get_code_pointer_dict_from_kwargs(kwargs: ClickArgMapping) -> Mapping[str, CodePointer]: - python_file = check.opt_str_elem(kwargs, "python_file") - module_name = check.opt_str_elem(kwargs, "module_name") - package_name = check.opt_str_elem(kwargs, "package_name") - working_directory = get_working_directory_from_kwargs(kwargs) - attribute = check.opt_str_elem(kwargs, "attribute") - if python_file: - _check_cli_arguments_none(kwargs, "module_name", "package_name") - return { - cast( - RepositoryDefinition, - repository_def_from_target_def( - loadable_target.target_definition, - ), - ).name: CodePointer.from_python_file( - python_file, loadable_target.attribute, working_directory - ) - for loadable_target in get_loadable_targets( - python_file, module_name, package_name, working_directory, attribute - ) - } - elif module_name: - _check_cli_arguments_none(kwargs, "python_file", "package_name") - return { - cast( - RepositoryDefinition, - repository_def_from_target_def( - loadable_target.target_definition, - ), - ).name: CodePointer.from_module( - module_name, loadable_target.attribute, working_directory - ) - for loadable_target in get_loadable_targets( - python_file, module_name, package_name, working_directory, attribute + + @property + def normalized_working_directory(self) -> str: + return self.working_directory or os.getcwd() + + +def _get_code_pointer_dict_from_python_pointer_opts( + params: PythonPointerOpts, +) -> Mapping[str, CodePointer]: + loadable_targets = get_loadable_targets( + params.python_file, + params.module_name, + params.package_name, + params.working_directory, + params.attribute, + ) + + # repository_name -> code_pointer + code_pointer_dict: dict[str, CodePointer] = {} + for loadable_target in loadable_targets: + repo_def = check.not_none(repository_def_from_target_def(loadable_target.target_definition)) + if params.python_file: + code_pointer = CodePointer.from_python_file( + params.python_file, loadable_target.attribute, params.working_directory ) - } - elif package_name: - _check_cli_arguments_none(kwargs, "module_name", "python_file") - return { - cast( - RepositoryDefinition, - repository_def_from_target_def( - loadable_target.target_definition, - ), - ).name: CodePointer.from_python_package( - package_name, loadable_target.attribute, working_directory + elif params.module_name: + code_pointer = CodePointer.from_module( + params.module_name, loadable_target.attribute, params.working_directory ) - for loadable_target in get_loadable_targets( - python_file, module_name, package_name, working_directory, attribute + elif params.package_name: + code_pointer = CodePointer.from_python_package( + params.package_name, loadable_target.attribute, params.working_directory ) - } - else: - check.failed("Must specify a Python file or module name") + else: + check.failed("Must specify a Python file or module name") + + code_pointer_dict[repo_def.name] = code_pointer + + return code_pointer_dict def get_working_directory_from_kwargs(kwargs: ClickArgMapping) -> Optional[str]: return check.opt_str_elem(kwargs, "working_directory") or os.getcwd() -def get_repository_python_origin_from_kwargs(kwargs: ClickArgMapping) -> RepositoryPythonOrigin: - provided_repo_name = check.opt_str_elem(kwargs, "repository") - - if not (kwargs.get("python_file") or kwargs.get("module_name") or kwargs.get("package_name")): - raise click.UsageError("Must specify a python file or module name") +def get_repository_python_origin_from_cli_opts( + params: PythonPointerOpts, repo_name: Optional[str] = None +) -> RepositoryPythonOrigin: + if sum([bool(x) for x in (params.python_file, params.module_name, params.package_name)]) != 1: + _raise_cli_usage_error() # Short-circuit the case where an attribute and no repository name is passed in, # giving us enough information to return an origin without loading any target # definitions - we may need to return an origin for a non-existent repository # (e.g. to log an origin ID for an error message) - if kwargs.get("attribute") and not provided_repo_name: - if kwargs.get("python_file"): - _check_cli_arguments_none(kwargs, "module_name", "package_name") - python_file = check.str_elem(kwargs, "python_file") + if params.attribute and not repo_name: + if params.python_file: code_pointer: CodePointer = CodePointer.from_python_file( - python_file, - check.str_elem(kwargs, "attribute"), - get_working_directory_from_kwargs(kwargs), + params.python_file, + params.attribute, + params.normalized_working_directory, ) - elif kwargs.get("module_name"): - _check_cli_arguments_none(kwargs, "python_file", "package_name") - module_name = check.str_elem(kwargs, "module_name") + elif params.module_name: code_pointer = CodePointer.from_module( - module_name, - check.str_elem(kwargs, "attribute"), - get_working_directory_from_kwargs(kwargs), + params.module_name, + params.attribute, + params.normalized_working_directory, ) - elif kwargs.get("package_name"): - _check_cli_arguments_none(kwargs, "python_file", "module_name") + elif params.package_name: code_pointer = CodePointer.from_python_package( - check.str_elem(kwargs, "package_name"), - check.str_elem(kwargs, "attribute"), - get_working_directory_from_kwargs(kwargs), + params.package_name, + params.attribute, + params.normalized_working_directory, ) else: check.failed("Must specify a Python file or module name") @@ -602,21 +574,21 @@ def get_repository_python_origin_from_kwargs(kwargs: ClickArgMapping) -> Reposit entry_point=DEFAULT_DAGSTER_ENTRY_POINT, ) - code_pointer_dict = _get_code_pointer_dict_from_kwargs(kwargs) - found_repo_names = _sorted_quoted(code_pointer_dict.keys()) - if provided_repo_name is None and len(code_pointer_dict) == 1: + code_pointer_dict = _get_code_pointer_dict_from_python_pointer_opts(params) + found_repo_names = serialize_sorted_quoted(code_pointer_dict.keys()) + if repo_name is None and len(code_pointer_dict) == 1: code_pointer = next(iter(code_pointer_dict.values())) - elif provided_repo_name is None: + elif repo_name is None: raise click.UsageError( "Must provide --repository as there is more than one repository. " f"Options are: {found_repo_names}." ) - elif provided_repo_name not in code_pointer_dict: + elif repo_name not in code_pointer_dict: raise click.UsageError( - f'Repository "{provided_repo_name}" not found. Found {found_repo_names} instead.' + f'Repository "{repo_name}" not found. Found {found_repo_names} instead.' ) else: - code_pointer = code_pointer_dict[provided_repo_name] + code_pointer = code_pointer_dict[repo_name] return RepositoryPythonOrigin( executable_path=sys.executable, @@ -647,13 +619,13 @@ def get_code_location_from_workspace( elif provided_location_name is None: raise click.UsageError( "Must provide --location as there are multiple locations " - f"available. Options are: {_sorted_quoted(workspace.code_location_names)}" + f"available. Options are: {serialize_sorted_quoted(workspace.code_location_names)}" ) if provided_location_name not in workspace.code_location_names: raise click.UsageError( f'Location "{provided_location_name}" not found in workspace. ' - f"Found {_sorted_quoted(workspace.code_location_names)} instead." + f"Found {serialize_sorted_quoted(workspace.code_location_names)} instead." ) if workspace.has_code_location_error(provided_location_name): @@ -680,13 +652,13 @@ def get_remote_repository_from_code_location( if provided_repo_name is None: raise click.UsageError( "Must provide --repository as there is more than one repository " - f"in {code_location.name}. Options are: {_sorted_quoted(repo_dict.keys())}." + f"in {code_location.name}. Options are: {serialize_sorted_quoted(repo_dict.keys())}." ) if not code_location.has_repository(provided_repo_name): raise click.UsageError( f'Repository "{provided_repo_name}" not found in location "{code_location.name}". ' - f"Found {_sorted_quoted(repo_dict.keys())} instead." + f"Found {serialize_sorted_quoted(repo_dict.keys())} instead." ) return code_location.get_repository(provided_repo_name) @@ -720,13 +692,13 @@ def get_remote_job_from_remote_repo( if provided_name is None: raise click.UsageError( "Must provide --job as there is more than one job " - f"in {remote_repo.name}. Options are: {_sorted_quoted(remote_jobs.keys())}." + f"in {remote_repo.name}. Options are: {serialize_sorted_quoted(remote_jobs.keys())}." ) if provided_name not in remote_jobs: raise click.UsageError( f'Job "{provided_name}" not found in repository "{remote_repo.name}". ' - f"Found {_sorted_quoted(remote_jobs.keys())} instead." + f"Found {serialize_sorted_quoted(remote_jobs.keys())} instead." ) return remote_jobs[provided_name] @@ -741,34 +713,23 @@ def get_remote_job_from_kwargs(instance: DagsterInstance, version: str, kwargs: yield get_remote_job_from_remote_repo(repo, provided_name) -def _sorted_quoted(strings: Iterable[str]) -> str: - return "[" + ", ".join([f"'{s}'" for s in sorted(list(strings))]) + "]" - - -def get_run_config_from_file_list(file_list: Optional[Sequence[str]]) -> Mapping[str, object]: +def get_run_config_from_file_list(file_list: list[str]) -> Mapping[str, object]: check.opt_sequence_param(file_list, "file_list", of_type=str) return cast(Mapping[str, object], load_yaml_from_glob_list(file_list) if file_list else {}) -def get_config_from_args(kwargs: Mapping[str, str]) -> Mapping[str, object]: - config = cast(tuple[str, ...], kwargs.get("config")) # files - config_json = kwargs.get("config_json") - - if not config and not config_json: +def get_run_config_from_cli_opts( + config_files: tuple[str, ...], config_json: Optional[str] +) -> Mapping[str, object]: + if not (config_files or config_json): return {} - - elif config and config_json: + elif config_files and config_json: raise click.UsageError("Cannot specify both -c / --config and --config-json") - - elif config: - config_file_list = list(check.opt_tuple_param(config, "config", of_type=str)) - return get_run_config_from_file_list(config_file_list) - + elif config_files: + return get_run_config_from_file_list(list(config_files)) elif config_json: - config_json = cast(str, config_json) try: return json.loads(config_json) - except JSONDecodeError: raise click.UsageError( f"Invalid JSON-string given for `--config-json`: {config_json}\n\n{serializable_error_info_from_exc_info(sys.exc_info()).to_string()}" diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py b/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py index 3811d75fbbed0..917d05c0eebdc 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/cli.py @@ -1,7 +1,9 @@ import click from dagster import _check as check +from dagster._cli.utils import assert_no_remaining_opts from dagster._cli.workspace.cli_target import ( - get_repository_python_origin_from_kwargs, + PythonPointerOpts, + get_repository_python_origin_from_cli_opts, python_pointer_options, ) from dagster._core.definitions.definitions_load_context import ( @@ -24,9 +26,12 @@ def app(): @app.command(name="snapshot", help="Snapshot sigma instance data") -@python_pointer_options @click.option("--output-path", "-o", help="Path to save the snapshot to", required=True) -def sigma_snapshot_command(**kwargs) -> None: +@python_pointer_options +def sigma_snapshot_command(output_path: str, **other_opts: object) -> None: + python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts) + assert_no_remaining_opts(other_opts) + experimental_warning("The `dagster-sigma snapshot` command") with environ({SNAPSHOT_ENV_VAR_NAME: "1"}): DefinitionsLoadContext.set( @@ -34,8 +39,7 @@ def sigma_snapshot_command(**kwargs) -> None: load_type=DefinitionsLoadType.INITIALIZATION, repository_load_data=None ) ) - - repository_origin = get_repository_python_origin_from_kwargs(kwargs) + repository_origin = get_repository_python_origin_from_cli_opts(python_pointer_opts) pending_data = DefinitionsLoadContext.get().get_pending_reconstruction_metadata() load_data = ( @@ -56,6 +60,5 @@ def sigma_snapshot_command(**kwargs) -> None: raise click.UsageError("No Sigma data found in the repository") click.echo(f"Saving {len(load_data.reconstruction_metadata)} cached Sigma data") - output_path = kwargs["output_path"] with open(output_path, "w") as file: file.write(serialize_value(load_data))