Skip to content

Commit

Permalink
[cli-refactor] Type and validate python_pointer_opts at entry point
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 3, 2025
1 parent 5d7ae2e commit a82982f
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 277 deletions.
71 changes: 33 additions & 38 deletions python_modules/dagster/dagster/_cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@

import dagster._check as check
import dagster._seven as seven
from dagster._cli.utils import get_instance_for_cli
from dagster._cli.workspace.cli_target import (
get_working_directory_from_kwargs,
python_pointer_options,
)
from dagster._cli.utils import assert_no_remaining_opts, get_instance_for_cli
from dagster._cli.workspace.cli_target import PythonPointerOpts, python_pointer_options
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData
Expand Down Expand Up @@ -577,6 +574,7 @@ def _execute_step_command_body(
@click.option(
"--heartbeat",
is_flag=True,
default=False,
help=(
"If set, the GRPC server will shut itself down when it fails to receive a heartbeat "
"after a timeout configurable with --heartbeat-timeout."
Expand All @@ -601,7 +599,6 @@ def _execute_step_command_body(
),
envvar="DAGSTER_LAZY_LOAD_USER_CODE",
)
@python_pointer_options
@click.option(
"--use-python-environment-entry-point",
is_flag=True,
Expand Down Expand Up @@ -699,26 +696,31 @@ def _execute_step_command_body(
help="[INTERNAL] Retrieves current utilization metrics from GRPC server.",
envvar="DAGSTER_ENABLE_SERVER_METRICS",
)
@python_pointer_options
def grpc_command(
port: Optional[int],
socket: Optional[str],
host: str,
max_workers: Optional[int],
heartbeat: bool = False,
heartbeat_timeout: int = 30,
lazy_load_user_code: bool = False,
fixed_server_id: Optional[str] = None,
log_level: str = "INFO",
log_format: str = "colored",
use_python_environment_entry_point: Optional[bool] = False,
container_image: Optional[str] = None,
container_context: Optional[str] = None,
location_name: Optional[str] = None,
instance_ref=None,
inject_env_vars_from_instance: bool = False,
heartbeat: bool,
heartbeat_timeout: int,
lazy_load_user_code: bool,
use_python_environment_entry_point: bool,
empty_working_directory: bool,
fixed_server_id: Optional[str],
log_level: str,
log_format: str,
container_image: Optional[str],
container_context: Optional[str],
inject_env_vars_from_instance: bool,
location_name: Optional[str],
instance_ref: Optional[str],
enable_metrics: bool = False,
**kwargs: Any,
**other_opts: Any,
) -> None:
python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

check.invariant(heartbeat_timeout > 0, "heartbeat_timeout must be greater than 0")

check.invariant(
Expand All @@ -743,31 +745,24 @@ def grpc_command(

loadable_target_origin = None
if any(
kwargs[key]
for key in [
"attribute",
"working_directory",
"module_name",
"package_name",
"python_file",
"empty_working_directory",
[
python_pointer_opts.attribute,
python_pointer_opts.working_directory,
python_pointer_opts.module_name,
python_pointer_opts.package_name,
python_pointer_opts.python_file,
empty_working_directory,
]
):
# in the gRPC api CLI we never load more than one module or python file at a time
module_name = check.opt_str_elem(kwargs, "module_name")
python_file = check.opt_str_elem(kwargs, "python_file")

loadable_target_origin = LoadableTargetOrigin(
executable_path=sys.executable,
attribute=kwargs["attribute"],
working_directory=(
None
if kwargs.get("empty_working_directory")
else get_working_directory_from_kwargs(kwargs)
),
module_name=module_name,
python_file=python_file,
package_name=kwargs["package_name"],
attribute=python_pointer_opts.attribute,
working_directory=python_pointer_opts.normalized_working_directory,
module_name=python_pointer_opts.module_name,
python_file=python_pointer_opts.python_file,
package_name=python_pointer_opts.package_name,
)

code_desc = " "
Expand Down
71 changes: 51 additions & 20 deletions python_modules/dagster/dagster/_cli/asset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from collections.abc import Mapping
from typing import Optional

import click

import dagster._check as check
from dagster._cli.job import get_config_from_args
from dagster._cli.utils import get_instance_for_cli, get_possibly_temporary_instance_for_cli
from dagster._cli.job import get_run_config_from_cli_opts
from dagster._cli.utils import (
assert_no_remaining_opts,
get_instance_for_cli,
get_possibly_temporary_instance_for_cli,
)
from dagster._cli.workspace.cli_target import (
get_repository_python_origin_from_kwargs,
PythonPointerOpts,
get_repository_python_origin_from_cli_opts,
python_pointer_options,
run_config_option,
)
Expand All @@ -32,37 +37,63 @@ def asset_cli():


@asset_cli.command(name="materialize", help="Execute a run to materialize a selection of assets")
@python_pointer_options
@click.option("--select", help="Comma-separated Asset selection to target", required=True)
@click.option("--partition", help="Asset partition to target", required=False)
@click.option(
"--partition-range",
help="Asset partition range to target i.e. <start>...<end>",
required=False,
)
@run_config_option(name="config", command_name="materialize")
@click.option(
"--config-json",
type=click.STRING,
help="JSON string of run config to use for this job run. Cannot be used with -c / --config.",
)
def asset_materialize_command(**kwargs):
@run_config_option(name="config", command_name="materialize")
@python_pointer_options
def asset_materialize_command(
select: str,
partition: Optional[str],
partition_range: Optional[str],
config: tuple[str, ...],
config_json: Optional[str],
**other_opts: object,
) -> None:
python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

with capture_interrupts():
with get_possibly_temporary_instance_for_cli(
"``dagster asset materialize``",
) as instance:
execute_materialize_command(instance, kwargs)
execute_materialize_command(
instance,
select,
partition,
partition_range,
config,
config_json,
python_pointer_opts,
)


@telemetry_wrapper
def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str, str]) -> None:
config = get_config_from_args(kwargs)
repository_origin = get_repository_python_origin_from_kwargs(kwargs)
def execute_materialize_command(
instance: DagsterInstance,
select: str,
partition: Optional[str],
partition_range: Optional[str],
config: tuple[str, ...],
config_json: Optional[str],
python_pointer_opts: PythonPointerOpts,
) -> None:
run_config = get_run_config_from_cli_opts(config, config_json)
repository_origin = get_repository_python_origin_from_cli_opts(python_pointer_opts)

recon_repo = recon_repository_from_origin(repository_origin)
repo_def = recon_repo.get_definition()

asset_selection = AssetSelection.from_coercible(kwargs["select"].split(","))
asset_selection = AssetSelection.from_coercible(select.split(","))
asset_keys = asset_selection.resolve(repo_def.asset_graph)

implicit_job_def = repo_def.get_implicit_job_def_for_assets(asset_keys)
Expand All @@ -77,8 +108,6 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str,
reconstructable_job = recon_job_from_origin(
JobPythonOrigin(implicit_job_def.name, repository_origin=repository_origin)
)
partition = kwargs.get("partition")
partition_range = kwargs.get("partition_range")

if partition and partition_range:
check.failed("Cannot specify both --partition and --partition-range options. Use only one.")
Expand Down Expand Up @@ -150,23 +179,25 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str,
asset_selection=list(asset_keys),
instance=instance,
tags=tags,
run_config=config,
run_config=run_config,
)
if not result.success:
raise click.ClickException("Materialization failed.")


@asset_cli.command(name="list", help="List assets")
@python_pointer_options
@click.option("--select", help="Asset selection to target", required=False)
def asset_list_command(**kwargs):
repository_origin = get_repository_python_origin_from_kwargs(kwargs)
@python_pointer_options
def asset_list_command(select: Optional[str], **other_opts):
python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

repository_origin = get_repository_python_origin_from_cli_opts(python_pointer_opts)
recon_repo = recon_repository_from_origin(repository_origin)
repo_def = recon_repo.get_definition()

select = kwargs.get("select")
if select is not None:
asset_selection = AssetSelection.from_coercible(kwargs["select"].split(","))
asset_selection = AssetSelection.from_coercible(select.split(","))
else:
asset_selection = AssetSelection.all()

Expand Down
61 changes: 29 additions & 32 deletions python_modules/dagster/dagster/_cli/code_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@

import click

import dagster._check as check
import dagster._seven as seven
from dagster._cli.workspace.cli_target import (
get_working_directory_from_kwargs,
python_pointer_options,
)
from dagster._cli.utils import assert_no_remaining_opts
from dagster._cli.workspace.cli_target import PythonPointerOpts, python_pointer_options
from dagster._core.instance import InstanceRef
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.utils import FuturesAwareThreadPoolExecutor
Expand Down Expand Up @@ -62,11 +59,9 @@ def code_server_cli():
"-n",
type=click.INT,
required=False,
default=None,
help="Maximum number of (threaded) workers to use in the code server",
envvar="DAGSTER_CODE_SERVER_MAX_WORKERS",
)
@python_pointer_options
@click.option(
"--use-python-environment-entry-point",
is_flag=True,
Expand Down Expand Up @@ -149,6 +144,7 @@ def code_server_cli():
@click.option(
"--heartbeat",
is_flag=True,
default=False,
help=(
"If set, the GRPC server will shut itself down when it fails to receive a heartbeat "
"after a timeout configurable with --heartbeat-timeout."
Expand All @@ -168,28 +164,32 @@ def code_server_cli():
help="[INTERNAL] Serialized InstanceRef to use for accessing the instance",
envvar="DAGSTER_INSTANCE_REF",
)
@python_pointer_options
def start_command(
port: Optional[int] = None,
socket: Optional[str] = None,
host: str = "localhost",
max_workers: Optional[int] = None,
fixed_server_id: Optional[str] = None,
log_level: str = "INFO",
log_format: str = "colored",
use_python_environment_entry_point: bool = False,
container_image: Optional[str] = None,
container_context: Optional[str] = None,
location_name: Optional[str] = None,
inject_env_vars_from_instance: bool = False,
startup_timeout: int = 0,
heartbeat: bool = False,
heartbeat_timeout: int = DEFAULT_HEARTBEAT_TIMEOUT,
instance_ref=None,
**kwargs,
port: Optional[int],
socket: Optional[str],
host: str,
max_workers: Optional[int],
use_python_environment_entry_point: bool,
fixed_server_id: Optional[str],
log_level: str,
log_format: str,
container_image: Optional[str],
container_context: Optional[str],
inject_env_vars_from_instance: bool,
location_name: Optional[str],
startup_timeout: int,
heartbeat: bool,
heartbeat_timeout,
instance_ref: Optional[str],
**other_opts,
):
from dagster._grpc import DagsterGrpcServer
from dagster._grpc.proxy_server import DagsterProxyApiServicer

python_pointer_opts = PythonPointerOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

if seven.IS_WINDOWS and port is None:
raise click.UsageError(
"You must pass a valid --port/-p on Windows: --socket/-s not supported."
Expand All @@ -205,16 +205,13 @@ def start_command(
container_image = container_image or os.getenv("DAGSTER_CURRENT_IMAGE")

# in the gRPC api CLI we never load more than one module or python file at a time
module_name = check.opt_str_elem(kwargs, "module_name")
python_file = check.opt_str_elem(kwargs, "python_file")

loadable_target_origin = LoadableTargetOrigin(
executable_path=sys.executable if use_python_environment_entry_point else None,
attribute=kwargs["attribute"],
working_directory=get_working_directory_from_kwargs(kwargs),
module_name=module_name,
python_file=python_file,
package_name=kwargs["package_name"],
attribute=python_pointer_opts.attribute,
working_directory=python_pointer_opts.normalized_working_directory,
module_name=python_pointer_opts.module_name,
python_file=python_pointer_opts.python_file,
package_name=python_pointer_opts.package_name,
)

code_desc = " "
Expand Down
Loading

0 comments on commit a82982f

Please sign in to comment.