Skip to content

Commit

Permalink
[cli-refactor] Type and validate workspace_opts at entry point (dagst…
Browse files Browse the repository at this point in the history
…er-io#27540)

## Summary & Motivation

dagster-io#27501 added typing and validation of args for all CLI entry points that
accept a "python pointer" (i.e. pointer to a single code location). This
PR continues that theme for all commands that accept a workspace
specification.

All commands that operate on a workspace now immediately parse passed
workspace-specifyng options into the new typed `WorkspaceOpts` object.
All these commands have also had their signatures cleaned up. Many of
the helper methods used within these commands (which previously operated
on untyped bags of "kwargs") have also been refactored.

## How I Tested These Changes

Existing test suite (with some refactors to handle new `WorkspaceOpts`).
  • Loading branch information
smackesey authored and LoHertel committed Feb 11, 2025
1 parent 0f7f297 commit 3f75b43
Show file tree
Hide file tree
Showing 26 changed files with 1,784 additions and 1,480 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from dagster_webserver.app import create_app_from_workspace_process_context
from starlette.testclient import TestClient

from dagster._cli.workspace import get_workspace_process_context_from_kwargs
from dagster._cli.workspace.cli_target import WorkspaceOpts
from dagster._core.test_utils import instance_for_test
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._utils import check_script, pushd, script_relative_path

PIPELINES_OR_ERROR_QUERY = """
Expand Down Expand Up @@ -99,10 +100,15 @@ def path_to_tutorial_file(path):
)


def load_dagster_webserver_for_workspace_cli_args(n_pipelines=1, **kwargs):
def load_dagster_webserver_for_workspace_cli_args(
n_pipelines=1, *, workspace_opts: WorkspaceOpts
):
with instance_for_test() as instance:
with get_workspace_process_context_from_kwargs(
instance, version="", read_only=False, kwargs=kwargs
with WorkspaceProcessContext(
instance,
version="",
read_only=False,
workspace_load_target=workspace_opts.to_load_target(),
) as workspace_process_context:
client = TestClient(
create_app_from_workspace_process_context(workspace_process_context)
Expand Down Expand Up @@ -132,7 +138,7 @@ def test_load_pipeline(
with pushd(path_to_tutorial_file(dirname)):
filepath = path_to_tutorial_file(os.path.join(dirname, filename))
load_dagster_webserver_for_workspace_cli_args(
python_file=(filepath,), fn_name=fn_name
workspace_opts=WorkspaceOpts(python_file=(filepath,), attribute=fn_name),
)


Expand Down
35 changes: 28 additions & 7 deletions python_modules/dagster-graphql/dagster_graphql/cli.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import asyncio
from collections.abc import Mapping
from io import TextIOWrapper
from typing import Optional
from urllib.parse import urljoin, urlparse

import click
import dagster._check as check
import dagster._seven as seven
import requests
from dagster._cli.utils import get_instance_for_cli, get_temporary_instance_for_cli
from dagster._cli.workspace import workspace_options
from dagster._cli.utils import (
assert_no_remaining_opts,
get_instance_for_cli,
get_temporary_instance_for_cli,
)
from dagster._cli.workspace.cli_target import (
WORKSPACE_TARGET_WARNING,
get_workspace_process_context_from_kwargs,
WorkspaceOpts,
workspace_options,
)
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME
Expand Down Expand Up @@ -127,7 +132,6 @@ def execute_query_against_remote(host, query, variables):
}


@workspace_options
@click.command(
name="ui",
help=(
Expand Down Expand Up @@ -180,9 +184,23 @@ def execute_query_against_remote(host, query, variables):
@click.option(
"--ephemeral-instance",
is_flag=True,
default=False,
help="Use an ephemeral DagsterInstance instead of resolving via DAGSTER_HOME",
)
def ui(text, file, predefined, variables, remote, output, ephemeral_instance, **kwargs):
@workspace_options
def ui(
text: Optional[str],
file: Optional[TextIOWrapper],
predefined: Optional[str],
variables: Optional[str],
remote: Optional[str],
output: Optional[str],
ephemeral_instance: bool,
**other_opts,
):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

query = None
if text is not None and file is None and predefined is None:
query = text.strip("'\" \n\t")
Expand All @@ -203,8 +221,11 @@ def ui(text, file, predefined, variables, remote, output, ephemeral_instance, **
with (
get_temporary_instance_for_cli() if ephemeral_instance else get_instance_for_cli()
) as instance:
with get_workspace_process_context_from_kwargs(
instance, version=__version__, read_only=False, kwargs=kwargs
with WorkspaceProcessContext(
instance=instance,
version=__version__,
read_only=False,
workspace_load_target=workspace_opts.to_load_target(),
) as workspace_process_context:
execute_query_from_cli(
workspace_process_context,
Expand Down
22 changes: 14 additions & 8 deletions python_modules/dagster-webserver/dagster_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import dagster._check as check
import uvicorn
from dagster._annotations import deprecated
from dagster._cli.utils import ClickArgValue, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace import get_workspace_process_context_from_kwargs, workspace_options
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING
from dagster._cli.utils import assert_no_remaining_opts, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import (
WORKSPACE_TARGET_WARNING,
WorkspaceOpts,
workspace_options,
)
from dagster._core.instance import InstanceRef
from dagster._core.telemetry import START_DAGSTER_WEBSERVER, log_action
from dagster._core.telemetry_upload import uploading_logging_thread
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._core.workspace.context import IWorkspaceProcessContext, WorkspaceProcessContext
from dagster._serdes import deserialize_value
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME, find_free_port, is_port_in_use
from dagster._utils.log import configure_loggers
Expand Down Expand Up @@ -72,7 +75,6 @@ def create_dagster_webserver_cli():
"""
),
)
@workspace_options
@click.option(
"--host",
"-h",
Expand Down Expand Up @@ -178,6 +180,7 @@ def create_dagster_webserver_cli():
show_default=True,
)
@click.version_option(version=__version__, prog_name="dagster-webserver")
@workspace_options
def dagster_webserver(
host: str,
port: int,
Expand All @@ -192,8 +195,11 @@ def dagster_webserver(
code_server_log_level: str,
instance_ref: Optional[str],
live_data_poll_rate: int,
**kwargs: ClickArgValue,
**other_opts: object,
):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

if suppress_warnings:
os.environ["PYTHONWARNINGS"] = "ignore"

Expand All @@ -214,11 +220,11 @@ def dagster_webserver(
# Allow the instance components to change behavior in the context of a long running server process
instance.optimize_for_webserver(db_statement_timeout, db_pool_recycle)

with get_workspace_process_context_from_kwargs(
with WorkspaceProcessContext(
instance,
version=__version__,
read_only=read_only,
kwargs=kwargs,
workspace_load_target=workspace_opts.to_load_target(),
code_server_log_level=code_server_log_level,
) as workspace_process_context:
host_dagster_ui_with_workspace_process_context(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from dagster._cli.workspace import get_workspace_process_context_from_kwargs
from dagster._cli.workspace.cli_target import WorkspaceOpts
from dagster._core.test_utils import instance_for_test
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver import app
from starlette.testclient import TestClient

Expand Down Expand Up @@ -29,11 +30,13 @@
)
def test_smoke_app(gen_instance):
with gen_instance() as instance:
with get_workspace_process_context_from_kwargs(
with WorkspaceProcessContext(
instance,
version="",
read_only=False,
kwargs=dict(module_name=("dagster_webserver_tests.toy.bar_repo",), definition="bar"),
workspace_load_target=WorkspaceOpts(
module_name=("dagster_webserver_tests.toy.bar_repo",)
).to_load_target(),
) as workspace_process_context:
asgi_app = app.create_app_from_workspace_process_context(workspace_process_context)
client = TestClient(asgi_app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from dagster import DagsterInstance, __version__
from dagster._cli.workspace.cli_target import get_workspace_process_context_from_kwargs
from dagster._cli.workspace.cli_target import WorkspaceOpts
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.webserver import DagsterWebserver
from starlette.requests import Request
from starlette.responses import JSONResponse
Expand Down Expand Up @@ -29,11 +30,11 @@ def build_routes(self):

@pytest.fixture(scope="session")
def test_client(instance):
process_context = get_workspace_process_context_from_kwargs(
process_context = WorkspaceProcessContext(
instance=instance,
version=__version__,
read_only=False,
kwargs={"empty_workspace": True}, # pyright: ignore[reportArgumentType]
workspace_load_target=WorkspaceOpts(empty_workspace=True).to_load_target(),
)

app = TestDagsterWebserver(process_context).create_asgi_app(debug=True)
Expand Down
21 changes: 11 additions & 10 deletions python_modules/dagster/dagster/_cli/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import click

from dagster import __version__ as dagster_version
from dagster._cli.utils import ClickArgValue, get_possibly_temporary_instance_for_cli
from dagster._cli.utils import assert_no_remaining_opts, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import (
get_auto_determined_workspace_from_kwargs,
get_workspace_from_kwargs,
WorkspaceOpts,
get_workspace_from_cli_opts,
workspace_options,
)
from dagster._utils.log import configure_loggers
Expand Down Expand Up @@ -58,8 +58,10 @@ def definitions_cli():
""",
)
def definitions_validate_command(
log_level: str, log_format: str, load_with_grpc: bool, **kwargs: ClickArgValue
log_level: str, log_format: str, load_with_grpc: bool, **other_opts: object
):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)
os.environ["DAGSTER_IS_DEFS_VALIDATION_CLI"] = "1"

configure_loggers(formatter=log_format, log_level=log_level.upper())
Expand All @@ -69,12 +71,11 @@ def definitions_validate_command(
with get_possibly_temporary_instance_for_cli(
"dagster definitions validate", logger=logger
) as instance:
with (
get_workspace_from_kwargs(instance=instance, version=dagster_version, kwargs=kwargs)
if load_with_grpc
else get_auto_determined_workspace_from_kwargs(
instance=instance, version=dagster_version, kwargs=kwargs
)
with get_workspace_from_cli_opts(
instance=instance,
version=dagster_version,
workspace_opts=workspace_opts,
allow_in_process=not load_with_grpc,
) as workspace:
if logger.parent:
logger.parent.handlers.clear()
Expand Down
Loading

0 comments on commit 3f75b43

Please sign in to comment.