Skip to content

Commit

Permalink
Properly set up SIGBREAK handling for dagster dev on windows (#27769)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Previously I set up the `dagster dev` tests on Windows to manually
terminate the full process tree. I did this because I could not get the
full tree to terminate on its own from issuing an interrupt event. The
theory was that `CTRL_BREAK_EVENT` was not propagating correctly in a
windows CI environment due to the process tree not sharing a console.

I think this theory was wrong, and in fact what was happening is that
`CTRL_BREAK_EVENT` (aka `SIGBREAK`) does not trigger a
`KeyboardInterrupt` exception by default-- instead it just wipes out the
process. The logic that handles graceful shutdown of the process tree
relies on a `KeyboardException` being triggered.

This PR sets up the correct signal handling in `dagster dev` for
`SIGBREAK` to trigger a `KeyboardInterrupt`. This causes the full
process tree to shut down gracefully. The one caveat is that, for some
reason in the legacy code server case (where webserver and daemon each
spawn their own copy of the code server), it takes inordinately long on
windows for the code server spawned from the webserver (but not daemon!)
to self-delete after not receiving a heartbeat. I don't know why that
is, but if we set timeout long enough (set to 120 seconds in this PR--
previously experimented up to 60, which wasn't enough) then it
terminates successfully.

## How I Tested These Changes

Remove the explicit killing of child processes in the windows `dagster
dev` tests, they now pass without that.
  • Loading branch information
smackesey authored Feb 12, 2025
1 parent a80ff5d commit 1825d72
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 45 deletions.
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_cli/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dagster._grpc.server import GrpcServerCommand
from dagster._serdes import serialize_value
from dagster._serdes.ipc import interrupt_ipc_subprocess, open_ipc_subprocess
from dagster._utils.interrupts import setup_interrupt_handlers
from dagster._utils.log import configure_loggers

_SUBPROCESS_WAIT_TIMEOUT = 60
Expand Down Expand Up @@ -132,6 +133,9 @@ def dev_command(
" unless it is placed in the same folder as DAGSTER_HOME."
)

# Essential on windows-- will set up windows signals to raise KeyboardInterrupt
setup_interrupt_handlers()

with get_possibly_temporary_instance_for_cli("dagster dev", logger=logger) as instance:
with _optionally_create_temp_workspace(
use_legacy_code_server_behavior=use_legacy_code_server_behavior,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,16 @@ def _launch_dev_command(
# The `dagster dev` command exits before the gRPC servers it spins up have shutdown. Wait
# for the child processes to exit here to make sure we don't leave any hanging processes.
#
# We disable this check on Windows because interrupt signal propagation does not work in a
# CI environment. Interrupt propagation is dependent on processes sharing a console (which
# is the case in a user terminal session, but not in a CI environment). So on windows, we
# force kill the processes after a timeout.
_wait_for_child_processes_to_exit(
child_processes, timeout=30, force_kill=platform.system() == "Windows"
)
# In legacy code server mode, each of the webserver and daemon spin up a gRPC server. The webserver and
# daemon processes exit before the gRPC servers do, and do not directly shut down the
# servers. Instead, we rely on the servers to shut themselves down after not receiving a
# heartbeat from the parent process. The heartbeat timeout is configured at 45 seconds, but
# for some reason on windows the webserver (not daemon) gRPC server can take longer to shut
# down. We wait for up to 120 seconds here to be safe.
_wait_for_child_processes_to_exit(child_processes, timeout=120)


def _wait_for_webserver_running(dagit_port):
def _wait_for_webserver_running(dagit_port: int) -> None:
start_time = time.time()
while True:
try:
Expand Down Expand Up @@ -273,7 +273,7 @@ def _wait_for_instance_dir_to_be_written(parent_dir: Path) -> Path:
return subfolders[0]


def _validate_job_available(port: int, job_name: str):
def _validate_job_available(port: int, job_name: str) -> None:
client = DagsterGraphQLClient(hostname="localhost", port_number=port)
locations_and_names = client._get_repo_locations_and_names_with_pipeline(job_name) # noqa: SLF001
assert (
Expand All @@ -282,43 +282,56 @@ def _validate_job_available(port: int, job_name: str):


def _validate_expected_child_processes(dev_process: subprocess.Popen, expected_count: int) -> None:
# Skip windows here-- it spawns a lot more processes than Unix when running through tox, due to
# Note that on Windows, each
# tox shimming creating persistent processes. We are still checking that all child processes
# shut down later.
if platform.system() != "Windows":
# 4 processes:
# - dagster-daemon
# - dagster-webserver
# - dagster code-server start
# - dagster api grpc (started by dagster code-server start)
#
# Some of the tests above execute jobs, which result in additional child processes that may
# or may not be running/cleaned up by the time we get here. We aren't interested in these,
# exclude them.
child_processes = _get_child_processes(dev_process.pid, exclude_job_processes=True)
if len(child_processes) != expected_count:
proc_info = "\n".join([_get_proc_repr(proc) for proc in child_processes])
raise Exception(
f"Expected {expected_count} child processes, found {len(child_processes)}:\n{proc_info}"
)

# 4 processes:
# - dagster-daemon
# - dagster-webserver
# - dagster code-server start
# - dagster api grpc (started by dagster code-server start)
#
# Some of the tests above execute jobs, which result in additional child processes that may
# or may not be running/cleaned up by the time we get here. These are identifiable because
# they are spawned by the Python multiprocessing module. We aren't interested in these,
# exclude them.
child_processes = _get_child_processes(dev_process.pid, exclude_multiprocessing_processes=True)

# On Windows, we make two adjustments:
# - Exclude any processes that are themselves `dagster dev` commands. This happens because
# windows will sometimes return a PID as a child of itself.
# - Exclude any processes that are tox shims. When executing these tests through tox, each
# launched process runs through a tox shim that is itself a persistent process.
if platform.system() == "Windows":
child_processes = [
proc
for proc in child_processes
if not ("dev" in proc.cmdline() or ".tox\\" in proc.cmdline()[0])
]
if len(child_processes) != expected_count:
proc_info = "\n".join([_get_proc_repr(proc) for proc in child_processes])
raise Exception(
f"Expected {expected_count} child processes, found {len(child_processes)}:\n{proc_info}"
)


def _get_child_processes(pid, exclude_job_processes: bool = False) -> list[psutil.Process]:
def _get_child_processes(
pid, exclude_multiprocessing_processes: bool = False
) -> list[psutil.Process]:
parent = psutil.Process(pid)
children = parent.children(recursive=True)
if exclude_job_processes:
return [c for c in children if not _is_job_execution_process(c)]
if exclude_multiprocessing_processes:
return [c for c in children if not _is_multiprocessing_process(c)]
else:
return children


def _is_job_execution_process(proc: psutil.Process) -> bool:
def _is_multiprocessing_process(proc: psutil.Process) -> bool:
return any(x.startswith("from multiprocessing") for x in proc.cmdline())


def _wait_for_child_processes_to_exit(
child_procs: list[psutil.Process], timeout: int, force_kill: bool = False
) -> None:
def _wait_for_child_processes_to_exit(child_procs: list[psutil.Process], timeout: int) -> None:
start_time = time.time()
while True:
running_child_procs = [proc for proc in child_procs if proc.is_running()]
Expand All @@ -336,18 +349,9 @@ def _wait_for_child_processes_to_exit(
*running_proc_lines,
]
)
if force_kill:
for proc in running_child_procs:
try:
proc.kill()
# Can happen if the process shut down from another shutting down in the
# iteration.
except psutil.NoSuchProcess:
pass
else:
raise Exception(
f"Timed out waiting for all child processes to exit. Remaining:\n{desc}"
)
raise Exception(
f"Timed out waiting for all child processes to exit. Remaining:\n{desc}"
)
time.sleep(0.5)


Expand Down

0 comments on commit 1825d72

Please sign in to comment.