Skip to content

Commit

Permalink
[components] Add dg dev command
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 4, 2025
1 parent 230a2fd commit 81af794
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 10 deletions.
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_serdes/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def ipc_read_event_stream(
message = _process_line(file_pointer)


# Windows subprocess termination utilities
# Windows subprocess termination utilities. See here for why we send CTRL_BREAK_EVENT on Windows:
# https://stefan.sofa-rockers.org/2013/08/15/handling-sub-process-hierarchies-python-linux-os-x/


Expand Down Expand Up @@ -212,7 +212,7 @@ def interrupt_ipc_subprocess(proc: "Popen[Any]") -> None:


def interrupt_ipc_subprocess_pid(pid: int) -> None:
"""Send CTRL_BREAK on Windows, SIGINT on other platforms."""
"""Send CTRL_BREAK_EVENT on Windows, SIGINT on other platforms."""
check.int_param(pid, "pid")

if sys.platform == "win32":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster_dg.cli.component import component_group
from dagster_dg.cli.component_type import component_type_group
from dagster_dg.cli.deployment import deployment_group
from dagster_dg.cli.dev import dev_command
from dagster_dg.cli.global_options import dg_global_options
from dagster_dg.component import RemoteComponentRegistry
from dagster_dg.config import normalize_cli_config
Expand All @@ -21,9 +22,10 @@ def create_dg_cli():
name="dg",
commands={
"code-location": code_location_group,
"deployment": deployment_group,
"component": component_group,
"component-type": component_type_group,
"deployment": deployment_group,
"dev": dev_command,
},
context_settings={
"max_content_width": DG_CLI_MAX_OUTPUT_WIDTH,
Expand Down
215 changes: 215 additions & 0 deletions python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import os
import signal
import subprocess
import sys
import time
from collections.abc import Iterator, Mapping, Sequence
from contextlib import contextmanager, nullcontext
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional, TypeVar

import click
import psutil
import yaml

from dagster_dg.cli.global_options import dg_global_options
from dagster_dg.config import normalize_cli_config
from dagster_dg.context import DgContext
from dagster_dg.error import DgError
from dagster_dg.utils import DgClickCommand, exit_with_error, pushd

T = TypeVar("T")

_CHECK_SUBPROCESS_INTERVAL = 5


@click.command(name="dev", cls=DgClickCommand)
@click.option(
"--code-server-log-level",
help="Set the log level for code servers spun up by dagster services.",
show_default=True,
default="warning",
type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False),
)
@click.option(
"--log-level",
help="Set the log level for dagster services.",
show_default=True,
default="info",
type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False),
)
@click.option(
"--log-format",
type=click.Choice(["colored", "json", "rich"], case_sensitive=False),
show_default=True,
required=False,
default="colored",
help="Format of the logs for dagster services",
)
@click.option(
"--port",
"-p",
type=int,
help="Port to use for the Dagster webserver.",
required=False,
)
@click.option(
"--host",
"-h",
type=str,
help="Host to use for the Dagster webserver.",
required=False,
)
@click.option(
"--live-data-poll-rate",
help="Rate at which the dagster UI polls for updated asset data (in milliseconds)",
type=int,
default=2000,
show_default=True,
required=False,
)
@dg_global_options
@click.pass_context
def dev_command(
context: click.Context,
code_server_log_level: str,
log_level: str,
log_format: str,
port: Optional[int],
host: Optional[str],
live_data_poll_rate: int,
**global_options: Mapping[str, object],
) -> None:
"""Start a local deployment of your Dagster project.
If run inside a deployment directory, this command will launch all code locations in the
deployment. If launched inside a code location directory, it will launch only that code
location.
"""
cli_config = normalize_cli_config(global_options, context)
dg_context = DgContext.from_config_file_discovery_and_cli_config(Path.cwd(), cli_config)

forward_options = [
*_format_forwarded_option("--code-server-log-level", code_server_log_level),
*_format_forwarded_option("--log-level", log_level),
*_format_forwarded_option("--log-format", log_format),
*_format_forwarded_option("--port", port),
*_format_forwarded_option("--host", host),
*_format_forwarded_option("--live-data-poll-rate", live_data_poll_rate),
]

# In a code location context, we can just run `dagster dev` directly, using `dagster` from the
# code location's environment.
if dg_context.is_code_location:
cmd = ["uv", "run", "dagster", "dev", *forward_options]
temp_workspace_file_cm = nullcontext()

# In a deployment context, dg dev will construct a temporary
# workspace file that points at all defined code locations and invoke:
#
# uv tool run --with dagster-webserver dagster dev
#
# The `--with dagster-webserver` is necessary here to ensure that dagster-webserver is
# installed in the isolated environment that `uv` will install `dagster` in.
# `dagster-webserver` is not a dependency of `dagster` but is required to run the `dev`
# command.
elif dg_context.is_deployment:
cmd = [
"uv",
"tool",
"run",
"--with",
"dagster-webserver",
"dagster",
"dev",
*forward_options,
]
temp_workspace_file_cm = _temp_workspace_file(dg_context)
else:
exit_with_error("This command must be run inside a code location or deployment directory.")

with pushd(dg_context.root_path), temp_workspace_file_cm as workspace_file:
if workspace_file: # only non-None deployment context
cmd.extend(["--workspace", workspace_file])
uv_run_dagster_dev_process = _open_subprocess(cmd)
try:
while True:
time.sleep(_CHECK_SUBPROCESS_INTERVAL)
if uv_run_dagster_dev_process.poll() is not None:
raise DgError(
f"dagster-dev process shut down unexpectedly with return code {uv_run_dagster_dev_process.returncode}."
)
except KeyboardInterrupt:
click.secho(
"Received keyboard interrupt. Shutting down dagster-dev process.", fg="yellow"
)
finally:
# For reasons not fully understood, directly interrupting the `uv run` process does not
# work as intended. The interrupt signal is not correctly propagated to the `dagster
# dev` process, and so that process never shuts down. Therefore, we send the signal
# directly to the `dagster dev` process (the only child of the `uv run` process). This
# will cause `dagster dev` to terminate which in turn will cause `uv run` to terminate.
dagster_dev_pid = _get_child_process_pid(uv_run_dagster_dev_process)
_interrupt_subprocess(dagster_dev_pid)

try:
uv_run_dagster_dev_process.wait(timeout=10)
except subprocess.TimeoutExpired:
click.secho("`dagster dev` did not terminate in time. Killing it.")
uv_run_dagster_dev_process.kill()


@contextmanager
def _temp_workspace_file(dg_context: DgContext) -> Iterator[str]:
with NamedTemporaryFile(mode="w+", delete=True) as temp_workspace_file:
entries = []
for location in dg_context.get_code_location_names():
code_location_root = dg_context.get_code_location_path(location)
loc_context = dg_context.with_root_path(code_location_root)
entry = {
"working_directory": str(dg_context.deployment_root_path),
"relative_path": str(loc_context.definitions_path),
"location_name": loc_context.code_location_name,
}
if loc_context.use_dg_managed_environment:
entry["executable_path"] = str(loc_context.code_location_python_executable)
entries.append({"python_file": entry})
yaml.dump({"load_from": entries}, temp_workspace_file)
temp_workspace_file.flush()
yield temp_workspace_file.name


def _format_forwarded_option(option: str, value: object) -> list[str]:
return [] if value is None else [option, str(value)]


def _get_child_process_pid(proc: "subprocess.Popen") -> int:
children = psutil.Process(proc.pid).children(recursive=False)
if len(children) != 1:
raise ValueError(f"Expected exactly one child process, but found {len(children)}")
return children[0].pid


# Windows subprocess termination utilities. See here for why we send CTRL_BREAK_EVENT on Windows:
# https://stefan.sofa-rockers.org/2013/08/15/handling-sub-process-hierarchies-python-linux-os-x/


def _interrupt_subprocess(pid: int) -> None:
"""Send CTRL_BREAK_EVENT on Windows, SIGINT on other platforms."""
if sys.platform == "win32":
os.kill(pid, signal.CTRL_BREAK_EVENT)
else:
os.kill(pid, signal.SIGINT)


def _open_subprocess(command: Sequence[str]) -> "subprocess.Popen":
"""Sets the correct flags to support graceful termination."""
creationflags = 0
if sys.platform == "win32":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP

return subprocess.Popen(
command,
creationflags=creationflags,
)
36 changes: 36 additions & 0 deletions python_modules/libraries/dagster-dg/dagster_dg/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
NOT_DEPLOYMENT_ERROR_MESSAGE,
ensure_loadable_path,
exit_with_error,
get_path_for_module,
get_path_for_package,
get_uv_command_env,
is_executable_available,
Expand Down Expand Up @@ -177,6 +178,12 @@ def code_location_root_path(self) -> Path:
def get_code_location_names(self) -> Iterable[str]:
return [loc.name for loc in sorted(self.code_location_root_path.iterdir())]

def get_code_location_path(self, name: str) -> Path:
return self.code_location_root_path / name

def get_code_location_root_module(self, name: str) -> Path:
return self.code_location_root_path / name

# ########################
# ##### GENERAL PYTHON PACKAGE METHODS
# ########################
Expand All @@ -193,6 +200,20 @@ def root_package_name(self) -> str:
def is_code_location(self) -> bool:
return self.config.is_code_location

@property
def code_location_name(self) -> str:
if not self.is_code_location:
raise DgError("`code_location_name` is only available in a code location context")
return self.root_path.name

@property
def code_location_python_executable(self) -> Path:
if not self.is_code_location:
raise DgError(
"`code_location_python_executable` is only available in a code location context"
)
return self.root_path / ".venv" / "bin" / "python"

@cached_property
def components_package_name(self) -> str:
if not self.is_code_location:
Expand All @@ -219,6 +240,21 @@ def get_component_names(self) -> Iterable[str]:
def has_component(self, name: str) -> bool:
return (self.components_path / name).is_dir()

@property
def definitions_package_name(self) -> str:
if not self.is_code_location:
raise DgError("`definitions_package_name` is only available in a code location context")
return f"{self.root_package_name}.definitions"

@cached_property
def definitions_path(self) -> Path:
with ensure_loadable_path(self.root_path):
if not is_package_installed(self.definitions_package_name):
raise DgError(
f"Definitions package `{self.definitions_package_name}` is not installed in the current environment."
)
return Path(get_path_for_module(self.definitions_package_name))

# ########################
# ##### COMPONENT LIBRARY METHODS
# ########################
Expand Down
25 changes: 19 additions & 6 deletions python_modules/libraries/dagster-dg/dagster_dg/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
import textwrap
from collections.abc import Iterator, Mapping, Sequence
from fnmatch import fnmatch
from importlib.machinery import ModuleSpec
from pathlib import Path
from typing import Any, Optional, TypeVar, Union

import click
import jinja2
from typer.rich_utils import rich_format_help
from typing_extensions import TypeAlias
from typing_extensions import Never, TypeAlias

from dagster_dg.error import DgError
from dagster_dg.version import __version__ as dagster_version
Expand Down Expand Up @@ -46,11 +47,23 @@ def is_package_installed(package_name: str) -> bool:
return False


def get_path_for_package(package_name: str) -> str:
spec = importlib.util.find_spec(package_name)
def _get_spec_for_module(module_name: str) -> ModuleSpec:
spec = importlib.util.find_spec(module_name)
if not spec:
raise DgError(f"Cannot find package: {package_name}")
# file_path = spec.origin
raise DgError(f"Cannot find module: {module_name}")
return spec


def get_path_for_module(module_name: str) -> str:
spec = _get_spec_for_module(module_name)
file_path = spec.origin
if not file_path:
raise DgError(f"Cannot find file path for module: {module_name}")
return file_path


def get_path_for_package(package_name: str) -> str:
spec = _get_spec_for_module(package_name)
submodule_search_locations = spec.submodule_search_locations
if not submodule_search_locations:
raise DgError(f"Package does not have any locations for submodules: {package_name}")
Expand Down Expand Up @@ -265,7 +278,7 @@ def not_none(value: Optional[T]) -> T:
return value


def exit_with_error(error_message: str) -> None:
def exit_with_error(error_message: str) -> Never:
click.echo(click.style(error_message, fg="red"))
sys.exit(1)

Expand Down
Loading

0 comments on commit 81af794

Please sign in to comment.