diff --git a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/test_dagster_dev_command.py b/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/test_dagster_dev_command.py
deleted file mode 100644
index 4af99092a52ca..0000000000000
--- a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/test_dagster_dev_command.py
+++ /dev/null
@@ -1,308 +0,0 @@
-import os
-import signal
-import subprocess
-import tempfile
-import time
-from collections import deque
-
-import pytest
-import requests
-import yaml
-from dagster import (
-    DagsterEventType,
-    DagsterInstance,
-    _seven as seven,
-)
-from dagster._core.test_utils import environ, new_cwd
-from dagster._grpc.client import DagsterGrpcClient
-from dagster._grpc.server import wait_for_grpc_server
-from dagster._utils import find_free_port
-from dagster_graphql import DagsterGraphQLClient
-
-
-def _wait_for_dagit_running(dagit_port):
-    start_time = time.time()
-    while True:
-        try:
-            dagit_json = requests.get(f"http://localhost:{dagit_port}/server_info").json()
-            if dagit_json:
-                return
-        except:
-            print("Waiting for Dagit to be ready..")  # noqa: T201
-
-        if time.time() - start_time > 30:
-            raise Exception("Timed out waiting for Dagit to serve requests")
-
-        time.sleep(1)
-
-
-def test_dagster_dev_command_workspace():
-    with tempfile.TemporaryDirectory() as tempdir:
-        with environ({"DAGSTER_HOME": ""}):
-            with new_cwd(tempdir):
-                dagit_port = find_free_port()
-                dev_process = subprocess.Popen(
-                    [
-                        "dagster",
-                        "dev",
-                        "-w",
-                        os.path.join(
-                            os.path.dirname(__file__),
-                            "workspace.yaml",
-                        ),
-                        "--dagit-port",
-                        str(dagit_port),
-                        "--log-level",
-                        "debug",
-                    ],
-                    cwd=tempdir,
-                )
-                try:
-                    _wait_for_dagit_running(dagit_port)
-                finally:
-                    dev_process.send_signal(signal.SIGINT)
-                    dev_process.communicate()
-
-
-def test_dagster_dev_command_loads_toys():
-    with tempfile.TemporaryDirectory() as tempdir:
-        with environ({"DAGSTER_HOME": ""}):
-            with new_cwd(tempdir):
-                dagit_port = find_free_port()
-                dev_process = subprocess.Popen(
-                    [
-                        "dagster",
-                        "dev",
-                        "-m",
-                        "dagster_test.toys.repo",
-                        "--dagit-port",
-                        str(dagit_port),
-                        "--log-level",
-                        "debug",
-                    ],
-                    cwd=tempdir,
-                )
-                try:
-                    _wait_for_dagit_running(dagit_port)
-
-                    client = DagsterGraphQLClient(hostname="localhost", port_number=dagit_port)
-                    locations_and_names = client._get_repo_locations_and_names_with_pipeline(  # noqa
-                        "hammer"
-                    )
-                    assert (
-                        len(locations_and_names) > 0
-                    ), "toys repo failed to load or was missing a job called 'hammer'"
-                finally:
-                    dev_process.send_signal(signal.SIGINT)
-                    dev_process.communicate()
-
-
-# E2E test that spins up "dagster dev", accesses dagit,
-# and waits for a schedule run to launch
-def test_dagster_dev_command_no_dagster_home():
-    with tempfile.TemporaryDirectory() as tempdir:
-        with environ(
-            {
-                "DAGSTER_HOME": "",  # unset dagster home
-                "CHECK_DAGSTER_DEV": "1",  # trigger target user code to check for DAGSTER_DEV env var
-            }
-        ):
-            with new_cwd(tempdir):
-                dagster_yaml = {
-                    "run_coordinator": {
-                        "module": "dagster.core.run_coordinator",
-                        "class": "QueuedRunCoordinator",
-                    },
-                }
-                with open(os.path.join(str(tempdir), "dagster.yaml"), "w") as config_file:
-                    yaml.dump(dagster_yaml, config_file)
-
-                dagit_port = find_free_port()
-
-                dev_process = subprocess.Popen(
-                    [
-                        "dagster",
-                        "dev",
-                        "-f",
-                        os.path.join(
-                            os.path.dirname(__file__),
-                            "repo.py",
-                        ),
-                        "--working-directory",
-                        os.path.dirname(__file__),
-                        "--dagit-port",
-                        str(dagit_port),
-                        "--dagit-host",
-                        "127.0.0.1",
-                    ],
-                    cwd=tempdir,
-                )
-
-                _wait_for_dagit_running(dagit_port)
-
-                instance = None
-
-                try:
-                    start_time = time.time()
-                    instance_dir = None
-                    while True:
-                        if time.time() - start_time > 30:
-                            raise Exception("Timed out waiting for instance files to exist")
-                        subfolders = [
-                            name
-                            for name in os.listdir(tempdir)
-                            if name.startswith("tmp")
-                            and os.path.exists(os.path.join(tempdir, name, "history"))
-                        ]
-
-                        if len(subfolders):
-                            assert len(subfolders) == 1
-                            instance_dir = os.path.join(str(tempdir), subfolders[0])
-                            break
-
-                        time.sleep(1)
-
-                    with DagsterInstance.from_config(instance_dir) as instance:
-                        start_time = time.time()
-                        while True:
-                            if (
-                                len(instance.get_runs()) > 0
-                                and len(
-                                    instance.fetch_run_status_changes(
-                                        DagsterEventType.PIPELINE_ENQUEUED, limit=1
-                                    ).records
-                                )
-                                > 0
-                            ):
-                                # Verify the run was queued (so the dagster.yaml was applied)
-                                break
-
-                            if time.time() - start_time > 30:
-                                raise Exception("Timed out waiting for queued run to exist")
-
-                            time.sleep(1)
-
-                finally:
-                    dev_process.send_signal(signal.SIGINT)
-                    dev_process.communicate()
-
-
-def test_dagster_dev_command_grpc_port():
-    with tempfile.TemporaryDirectory() as tempdir:
-        dagit_port = find_free_port()
-        grpc_port = find_free_port()
-
-        grpc_process = None
-        dev_process = None
-
-        try:
-            subprocess_args = [
-                "dagster",
-                "api",
-                "grpc",
-                "-f",
-                os.path.join(
-                    os.path.dirname(__file__),
-                    "repo.py",
-                ),
-                "--working-directory",
-                os.path.dirname(__file__),
-                "-p",
-                str(grpc_port),
-            ]
-            grpc_process = subprocess.Popen(subprocess_args)
-
-            client = DagsterGrpcClient(port=grpc_port, host="localhost")
-            wait_for_grpc_server(grpc_process, client, subprocess_args)
-            dev_process = subprocess.Popen(
-                [
-                    "dagster",
-                    "dev",
-                    "--dagit-port",
-                    str(dagit_port),
-                    "--dagit-host",
-                    "127.0.0.1",
-                    "--grpc-port",
-                    str(grpc_port),
-                    "--grpc-host",
-                    "localhost",
-                ],
-                cwd=tempdir,
-            )
-            _wait_for_dagit_running(dagit_port)
-            client = DagsterGraphQLClient(hostname="localhost", port_number=dagit_port)
-            client.submit_job_execution("foo_job")
-        finally:
-            if grpc_process:
-                grpc_process.send_signal(signal.SIGINT)
-                grpc_process.communicate()
-
-            if dev_process:
-                dev_process.send_signal(signal.SIGINT)
-                dev_process.communicate()
-
-
-@pytest.mark.skipif(seven.IS_WINDOWS, reason="This test relies on unix signals")
-def test_dagster_dev_command_legacy_code_server_behavior():
-    with tempfile.TemporaryDirectory() as tempdir:
-        with environ({"DAGSTER_HOME": ""}):
-            with new_cwd(tempdir):
-                dagit_port = find_free_port()
-                dev_process = subprocess.Popen(
-                    [
-                        "dagster",
-                        "dev",
-                        "-m",
-                        "dagster_test.toys.repo",
-                        "--dagit-port",
-                        str(dagit_port),
-                        "--log-level",
-                        "debug",
-                        "--use-legacy-code-server-behavior",
-                    ],
-                    cwd=tempdir,
-                )
-                try:
-                    _wait_for_dagit_running(dagit_port)
-
-                    client = DagsterGraphQLClient(hostname="localhost", port_number=dagit_port)
-                    locations_and_names = client._get_repo_locations_and_names_with_pipeline(  # noqa
-                        "hammer"
-                    )
-                    assert (
-                        len(locations_and_names) > 0
-                    ), "toys repo failed to load or was missing a job called 'hammer'"
-                    child_processes = find_child_processes(dev_process.pid)
-                    assert (
-                        len(child_processes) == 4
-                    )  # dagster-daemon, dagster-webserver, and a code server for each.
-                finally:
-                    dev_process.send_signal(signal.SIGINT)
-                    dev_process.communicate()
-
-
-def find_child_processes(pid: int):
-    children = set()
-    # Get full process tree
-    cmd = ["ps", "-eo", "pid,ppid"]
-    output = subprocess.check_output(cmd, text=True)
-
-    # Create pid -> parent_pid mapping
-    processes = {}
-    for line in output.splitlines()[1:]:  # Skip header
-        parts = line.strip().split()
-        if len(parts) == 2:
-            child_pid, parent_pid = map(int, parts)
-            processes[child_pid] = parent_pid
-
-    # Use BFS to find all descendants
-    queue = deque([pid])
-    while queue:
-        current_pid = queue.popleft()
-        # Find all processes whose parent is current_pid
-        for child_pid, parent_pid in processes.items():
-            if parent_pid == current_pid:
-                children.add(child_pid)
-                queue.append(child_pid)
-
-    return children
diff --git a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/workspace.yaml b/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/workspace.yaml
deleted file mode 100644
index 27183829e6978..0000000000000
--- a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/workspace.yaml
+++ /dev/null
@@ -1,2 +0,0 @@
-load_from:
-  - python_file: repo.py
\ No newline at end of file
diff --git a/python_modules/dagster/dagster/_grpc/server.py b/python_modules/dagster/dagster/_grpc/server.py
index 4b0f003e33f87..5d59faa09fbb1 100644
--- a/python_modules/dagster/dagster/_grpc/server.py
+++ b/python_modules/dagster/dagster/_grpc/server.py
@@ -4,6 +4,7 @@
 import multiprocessing
 import os
 import queue
+import subprocess
 import sys
 import threading
 import time
@@ -109,6 +110,8 @@
     from multiprocessing.synchronize import Event as MPEvent
     from subprocess import Popen
 
+    from dagster._grpc.client import DagsterGrpcClient
+
 EVENT_QUEUE_POLL_INTERVAL = 0.1
 
 CLEANUP_TICK = 0.5
@@ -1328,12 +1331,12 @@ def __init__(self, port=None, socket=None):
 
 
 def wait_for_grpc_server(
-    server_process,
-    client,
-    subprocess_args,
-    timeout=60,
+    server_process: subprocess.Popen,
+    client: "DagsterGrpcClient",
+    subprocess_args: Sequence[str],
+    timeout: int = 60,
     additional_timeout_msg: Optional[str] = None,
-):
+) -> None:
     start_time = time.time()
     last_error = None
 
diff --git a/python_modules/dagster/dagster/_serdes/ipc.py b/python_modules/dagster/dagster/_serdes/ipc.py
index a7ec743ccc5ca..cb7d8ec27b145 100644
--- a/python_modules/dagster/dagster/_serdes/ipc.py
+++ b/python_modules/dagster/dagster/_serdes/ipc.py
@@ -211,6 +211,14 @@ def interrupt_ipc_subprocess(proc: "Popen[Any]") -> None:
     proc.send_signal(signal.SIGINT)
 
 
+def interrupt_or_kill_ipc_subprocess(proc: "Popen[Any]", wait_time: int = 10) -> None:
+    interrupt_ipc_subprocess(proc)
+    try:
+        proc.wait(timeout=wait_time)
+    except subprocess.TimeoutExpired:
+        proc.kill()
+
+
 def interrupt_ipc_subprocess_pid(pid: int) -> None:
     """Send CTRL_BREAK_EVENT on Windows, SIGINT on other platforms."""
     check.int_param(pid, "pid")
diff --git a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/__init__.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/__init__.py
similarity index 100%
rename from integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/__init__.py
rename to python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/__init__.py
diff --git a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/bar.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/bar.py
similarity index 100%
rename from integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/bar.py
rename to python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/bar.py
diff --git a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/repo.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/repo.py
similarity index 89%
rename from integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/repo.py
rename to python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/repo.py
index 968098c6333d5..75f91eb6cd46d 100644
--- a/integration_tests/test_suites/daemon-test-suite/dagster_dev_command_tests/repo.py
+++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/repo.py
@@ -1,6 +1,6 @@
 import os
 
-from bar import foo_op  # requires working_directory
+from bar import foo_op  # requires working_directory # type: ignore
 from dagster import DefaultSensorStatus, RunRequest, job, repository, sensor
 
 
diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/test_dagster_dev_command.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/test_dagster_dev_command.py
new file mode 100644
index 0000000000000..f2ee6d1002830
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/test_dagster_dev_command.py
@@ -0,0 +1,360 @@
+import os
+import platform
+import subprocess
+import tempfile
+import time
+from collections.abc import Iterator
+from contextlib import contextmanager
+from pathlib import Path
+
+import psutil
+import requests
+import yaml
+from dagster._core.events import DagsterEventType
+from dagster._core.instance import DagsterInstance
+from dagster._core.test_utils import environ
+from dagster._grpc.client import DagsterGrpcClient
+from dagster._grpc.server import wait_for_grpc_server
+from dagster._serdes.ipc import (
+    interrupt_ipc_subprocess,
+    interrupt_or_kill_ipc_subprocess,
+    open_ipc_subprocess,
+)
+from dagster._utils import find_free_port, pushd
+from dagster_graphql import DagsterGraphQLClient
+
+
+def test_dagster_dev_command_workspace():
+    with tempfile.TemporaryDirectory() as tempdir:
+        with environ({"DAGSTER_HOME": ""}):
+            with pushd(tempdir):
+                webserver_port = find_free_port()
+                with _launch_dev_command(
+                    [
+                        "-w",
+                        str(Path(__file__).parent / "workspace.yaml"),
+                        "--port",
+                        str(webserver_port),
+                        "--log-level",
+                        "debug",
+                    ],
+                ) as dev_process:
+                    _wait_for_webserver_running(webserver_port)
+                    _validate_job_available(webserver_port, "foo_job")
+                    _validate_expected_child_processes(dev_process, 4)
+
+
+def test_dagster_dev_command_module():
+    with tempfile.TemporaryDirectory() as tempdir:
+        with environ({"DAGSTER_HOME": ""}):
+            with pushd(tempdir):
+                webserver_port = find_free_port()
+                with _launch_dev_command(
+                    [
+                        "-m",
+                        "repo",
+                        "--working-directory",
+                        str(Path(__file__).parent),
+                        "--port",
+                        str(webserver_port),
+                        "--log-level",
+                        "debug",
+                    ],
+                ) as dev_process:
+                    _wait_for_webserver_running(webserver_port)
+                    _validate_job_available(webserver_port, "foo_job")
+                    _validate_expected_child_processes(dev_process, 4)
+
+
+# E2E test that spins up "dagster dev", accesses webserver,
+# and waits for a schedule run to launch
+def test_dagster_dev_command_no_dagster_home():
+    environment_patch = {
+        "DAGSTER_HOME": "",  # unset dagster home
+        "CHECK_DAGSTER_DEV": "1",  # trigger target user code to check for DAGSTER_DEV env var
+    }
+    dagster_yaml = {
+        "run_coordinator": {
+            "module": "dagster.core.run_coordinator",
+            "class": "QueuedRunCoordinator",
+        },
+    }
+
+    with tempfile.TemporaryDirectory() as tempdir, environ(environment_patch), pushd(tempdir):
+        with open(os.path.join(str(tempdir), "dagster.yaml"), "w") as config_file:
+            yaml.dump(dagster_yaml, config_file)
+
+        webserver_port = find_free_port()
+        with _launch_dev_command(
+            [
+                "-f",
+                str(Path(__file__).parent / "repo.py"),
+                "--working-directory",
+                str(Path(__file__).parent),
+                "--port",
+                str(webserver_port),
+                "--dagit-host",
+                "127.0.0.1",
+            ],
+        ) as dev_process:
+            _wait_for_webserver_running(webserver_port)
+            _validate_job_available(webserver_port, "foo_job")
+            _validate_expected_child_processes(dev_process, 4)
+
+            instance_dir = _wait_for_instance_dir_to_be_written(Path(tempdir))
+
+            # Wait for schedule to launch
+            with DagsterInstance.from_config(str(instance_dir)) as instance:
+                start_time = time.time()
+                while True:
+                    if (
+                        len(instance.get_runs()) > 0
+                        and len(
+                            instance.fetch_run_status_changes(
+                                DagsterEventType.PIPELINE_ENQUEUED, limit=1
+                            ).records
+                        )
+                        > 0
+                    ):
+                        # Verify the run was queued (so the dagster.yaml was applied)
+                        break
+
+                    if time.time() - start_time > 30:
+                        raise Exception("Timed out waiting for queued run to exist")
+
+                    time.sleep(1)
+
+
+def test_dagster_dev_command_grpc_port():
+    with tempfile.TemporaryDirectory() as tempdir, pushd(tempdir):
+        webserver_port = find_free_port()
+        grpc_port = find_free_port()
+
+        subprocess_args = [
+            "dagster",
+            "api",
+            "grpc",
+            "-f",
+            str(Path(__file__).parent / "repo.py"),
+            "--working-directory",
+            str(Path(__file__).parent),
+            "-p",
+            str(grpc_port),
+        ]
+        grpc_process = open_ipc_subprocess(subprocess_args)
+        try:
+            client = DagsterGrpcClient(port=grpc_port, host="localhost")
+            wait_for_grpc_server(grpc_process, client, subprocess_args)
+            with _launch_dev_command(
+                [
+                    "--port",
+                    str(webserver_port),
+                    "--dagit-host",
+                    "127.0.0.1",
+                    "--grpc-port",
+                    str(grpc_port),
+                    "--grpc-host",
+                    "localhost",
+                ],
+            ) as dev_process:
+                _wait_for_webserver_running(webserver_port)
+                _validate_job_available(webserver_port, "foo_job")
+                # daemon, webserver only since the grpc server is separate
+                _validate_expected_child_processes(dev_process, 2)
+                client = DagsterGraphQLClient(hostname="localhost", port_number=webserver_port)
+                client.submit_job_execution("foo_job")
+
+                # For some reason if we don't shut down the gRPC server before the dev server, there
+                # is a database access error when shutting down the gRPC server.
+                interrupt_or_kill_ipc_subprocess(grpc_process)
+                grpc_process.communicate()
+        finally:
+            if psutil.pid_exists(grpc_process.pid):
+                interrupt_or_kill_ipc_subprocess(grpc_process)
+
+
+def test_dagster_dev_command_legacy_code_server_behavior():
+    environment_patch = {
+        "DAGSTER_HOME": "",  # unset dagster home
+    }
+    with tempfile.TemporaryDirectory() as tempdir, environ(environment_patch), pushd(tempdir):
+        webserver_port = find_free_port()
+        with _launch_dev_command(
+            [
+                "-m",
+                "repo",
+                "--working-directory",
+                str(Path(__file__).parent),
+                "--port",
+                str(webserver_port),
+                "--log-level",
+                "debug",
+                "--use-legacy-code-server-behavior",
+            ],
+        ) as dev_process:
+            _wait_for_webserver_running(webserver_port)
+            _validate_job_available(webserver_port, "foo_job")
+
+            # 4 processes:
+            # - dagster-daemon
+            # - dagster-webserver
+            # - dagster api grpc (for daemon)
+            # - dagster api grpc (for webserver)
+            _validate_expected_child_processes(dev_process, 4)
+
+
+# ########################
+# ##### HELPERS
+# ########################
+
+
+@contextmanager
+def _launch_dev_command(
+    options: list[str], capture_output: bool = False
+) -> Iterator[subprocess.Popen]:
+    proc = open_ipc_subprocess(
+        ["dagster", "dev", *options],
+        stdout=subprocess.PIPE if capture_output else None,
+        stderr=subprocess.PIPE if capture_output else None,
+        cwd=os.getcwd(),
+    )
+    try:
+        yield proc
+    finally:
+        child_processes = _get_child_processes(proc.pid)
+        interrupt_ipc_subprocess(proc)
+        proc.wait(timeout=10)
+        # The `dagster dev` command exits before the gRPC servers it spins up have shut down. Wait
+        # for the child processes to exit here to make sure we don't leave any hanging processes.
+        #
+        # We disable this check on Windows because interrupt signal propagation does not work in a
+        # CI environment. Interrupt propagation is dependent on processes sharing a console (which
+        # is the case in a user terminal session, but not in a CI environment). So on Windows, we
+        # force-kill the processes after a timeout.
+        _wait_for_child_processes_to_exit(
+            child_processes, timeout=30, force_kill=platform.system() == "Windows"
+        )
+
+
+def _wait_for_webserver_running(dagit_port):
+    start_time = time.time()
+    while True:
+        try:
+            server_info = requests.get(f"http://localhost:{dagit_port}/server_info").json()
+            if server_info:
+                return
+        except Exception:
+            print("Waiting for the webserver to be ready...")  # noqa: T201
+
+        if time.time() - start_time > 30:
+            raise Exception("Timed out waiting for the webserver to serve requests")
+
+        time.sleep(1)
+
+
+def _wait_for_instance_dir_to_be_written(parent_dir: Path) -> Path:
+    # Wait for instance files to exist
+    start_time = time.time()
+    while True:
+        if time.time() - start_time > 30:
+            raise Exception("Timed out waiting for instance files to exist")
+        subfolders = [
+            child
+            for child in parent_dir.iterdir()
+            if child.name.startswith("tmp") and (child / "history").exists()
+        ]
+
+        if len(subfolders):
+            assert len(subfolders) == 1
+            break
+
+        time.sleep(1)
+    return subfolders[0]
+
+
+def _validate_job_available(port: int, job_name: str):
+    client = DagsterGraphQLClient(hostname="localhost", port_number=port)
+    locations_and_names = client._get_repo_locations_and_names_with_pipeline(job_name)  # noqa: SLF001
+    assert (
+        len(locations_and_names) > 0
+    ), f"repo failed to load or was missing a job called '{job_name}'"
+
+
+def _validate_expected_child_processes(dev_process: subprocess.Popen, expected_count: int) -> None:
+    # Skip Windows here, since it behaves strangely with respect to child processes and cmdline().
+    # But we are still checking that all child processes shut down later.
+    if platform.system() != "Windows":
+        # In the default (non-legacy) case the expected child processes are:
+        # - dagster-daemon
+        # - dagster-webserver
+        # - dagster code-server start
+        # - dagster api grpc (started by dagster code-server start)
+
+        child_processes = _get_child_processes(dev_process.pid, exclude_trackers=True)
+        assert len(child_processes) == expected_count
+
+
+def _get_child_processes(pid, exclude_trackers: bool = False) -> list[psutil.Process]:
+    parent = psutil.Process(pid)
+    children = parent.children(recursive=True)
+    if exclude_trackers:
+        return [c for c in children if not _is_tracker_process(c)]
+    else:
+        return children
+
+
+def _is_tracker_process(proc: psutil.Process) -> bool:
+    return any(x.startswith("from multiprocessing") for x in proc.cmdline())
+
+
+def _wait_for_child_processes_to_exit(
+    child_procs: list[psutil.Process], timeout: int, force_kill: bool = False
+) -> None:
+    start_time = time.time()
+    while True:
+        running_child_procs = [proc for proc in child_procs if proc.is_running()]
+        if not running_child_procs:
+            break
+        if time.time() - start_time > timeout:
+            stopped_child_procs = [proc for proc in child_procs if not proc.is_running()]
+            stopped_proc_lines = [_get_proc_repr(proc) for proc in stopped_child_procs]
+            running_proc_lines = [_get_proc_repr(proc) for proc in running_child_procs]
+            desc = "\n".join(
+                [
+                    "STOPPED:",
+                    *stopped_proc_lines,
+                    "RUNNING:",
+                    *running_proc_lines,
+                ]
+            )
+            if force_kill:
+                for proc in running_child_procs:
+                    try:
+                        proc.kill()
+                    except psutil.NoSuchProcess:
+                        # Can happen if the process already shut down as a result of another
+                        # process shutting down earlier in this iteration.
+                        pass
+            else:
+                raise Exception(
+                    f"Timed out waiting for all child processes to exit. Remaining:\n{desc}"
+                )
+        time.sleep(0.5)
+
+
+def _get_proc_repr(proc: psutil.Process) -> str:
+    return f"PID [{proc.pid}] PPID [{_get_ppid(proc)}]: {_get_cmdline(proc)}"
+
+
+def _get_ppid(proc: psutil.Process) -> str:
+    try:
+        return str(proc.ppid())
+    except psutil.NoSuchProcess:
+        return "IRRETRIEVABLE"
+
+
+def _get_cmdline(proc: psutil.Process) -> str:
+    try:
+        return str(proc.cmdline())
+    except psutil.NoSuchProcess:
+        return "CMDLINE IRRETRIEVABLE"
diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/workspace.yaml b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/workspace.yaml
new file mode 100644
index 0000000000000..37fb8432034d3
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/dagster_dev_command_tests/workspace.yaml
@@ -0,0 +1,3 @@
+load_from:
+  - python_file:
+      relative_path: repo.py
diff --git a/python_modules/dagster/setup.py b/python_modules/dagster/setup.py
index e68a16937697b..2fae89371aac9 100644
--- a/python_modules/dagster/setup.py
+++ b/python_modules/dagster/setup.py
@@ -133,6 +133,7 @@ def get_version() -> str:
             "fsspec<2024.5.0",  # morefs incompatibly
             "rapidfuzz",
             "flaky",
+            "psutil",
         ],
         "mypy": ["mypy==1.8.0"],
         "pyright": [
diff --git a/python_modules/dagster/tox.ini b/python_modules/dagster/tox.ini
index c8860d676c563..9b4cc09b3db9d 100644
--- a/python_modules/dagster/tox.ini
+++ b/python_modules/dagster/tox.ini
@@ -27,6 +27,11 @@ deps =
   -e ../dagster-test
   -e .[mypy,test,pyright]
   -e ../dagster-pipes
+
+  # These are required for dagster dev tests, but we leave them out of `dagster[test]` so that
+  # all of our other libs that depend on dagster[test] don't have to install them.
+  -e ../dagster-graphql
+  -e ../dagster-webserver
 allowlist_externals =
   /bin/bash
   uv
@@ -35,7 +40,7 @@ commands =
   api_tests: pytest -c ../../pyproject.toml -vv ./dagster_tests/api_tests {env:COVERAGE_ARGS} --durations 10 {posargs}
   asset_defs_tests: pytest -c ../../pyproject.toml -vv ./dagster_tests/asset_defs_tests {env:COVERAGE_ARGS} --durations 10 {posargs}
-  cli_tests: pytest -c ../../pyproject.toml -vv ./dagster_tests/cli_tests {env:COVERAGE_ARGS} --durations 10 {posargs}
+  cli_tests: pytest -x -c ../../pyproject.toml -vv ./dagster_tests/cli_tests/ {env:COVERAGE_ARGS} --durations 10 {posargs}
   core_tests: pytest -c ../../pyproject.toml -vv ./dagster_tests/core_tests {env:COVERAGE_ARGS} --durations 10 {posargs}
   daemon_sensor_tests: pytest -c ../../pyproject.toml -vv ./dagster_tests/daemon_sensor_tests {env:COVERAGE_ARGS} --durations 10 {posargs}
   daemon_tests: pytest -c ../../pyproject.toml -vv ./dagster_tests/daemon_tests {env:COVERAGE_ARGS} --durations 10 {posargs}
diff --git a/python_modules/libraries/dagster-dg/setup.py b/python_modules/libraries/dagster-dg/setup.py
index 7087181d0c9cd..4a1aa228c963f 100644
--- a/python_modules/libraries/dagster-dg/setup.py
+++ b/python_modules/libraries/dagster-dg/setup.py
@@ -56,7 +56,6 @@ def get_version() -> str:
         "test": [
             "click",
             "dagster-components",
-            "dagster-graphql",
            "pydantic",
            "pytest",
        ],