Skip to content

Commit

Permalink
[components] Add windows testing to dagster-dg
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 12, 2025
1 parent ae7df32 commit 6633929
Show file tree
Hide file tree
Showing 17 changed files with 464 additions and 175 deletions.
32 changes: 23 additions & 9 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ parameters:
type: object
default:
- "3.10"
- name: py3_env_suffixes
- name: dagster_core_py3_env_suffixes
type: object
default:
- api_tests
Expand All @@ -26,23 +26,37 @@ jobs:
strategy:
matrix:
${{ each py_version in parameters.py3_versions }}:
${{ each env_suffix in parameters.py3_env_suffixes }}:
${{ each env_suffix in parameters.dagster_core_py3_env_suffixes }}:
${{ replace(py_version, '.', '') }}-windows-${{ env_suffix }}:
TOXENV: "py${{ replace(py_version, '.', '') }}-windows-${{ env_suffix }}"
python.version: "${{ py_version }}"
PACKAGE_ROOT: "python_modules\\dagster"
PATH_BACK_TO_REPO_ROOT: "..\\.."
TOX_ENV: "py${{ replace(py_version, '.', '') }}-windows-${{ env_suffix }}"
PYTHON_VERSION: "${{ py_version }}"

${{ each py_version in parameters.py3_versions }}:
dagster-dg-${{ replace(py_version, '.', '') }}:
PACKAGE_ROOT: "python_modules\\libraries\\dagster-dg"
PATH_BACK_TO_REPO_ROOT: "..\\..\\.."
TOX_ENV: windows
PYTHON_VERSION: "${{ py_version }}"
variables:
PYTHONLEGACYWINDOWSSTDIO: "1"
PYTHONUTF8: "1"
# Use PowerShell (`powershell`) instead of cmd shell (`script`) for better UTF-8 support.
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: "$(python.version)"
versionSpec: "$(PYTHON_VERSION)"
architecture: "x64"
- script: pip install "tox<4.0.0" uv
- powershell: |
pip install "tox<4.0.0" uv
displayName: "Install tox & uv"
- script: cd python_modules\dagster && tox -e %TOXENV% && cd ..\..
- powershell: |
cd $env:PACKAGE_ROOT
tox -e $env:TOX_ENV
cd $env:PATH_BACK_TO_REPO_ROOT
displayName: "Run tests"
- task: PublishTestResults@2
inputs:
testResultsFiles: "**/test_results.xml"
testRunTitle: "dagster $(TOXENV)"
testRunTitle: "$(PACKAGE_ROOT) $(TOX_ENV)"
condition: succeededOrFailed()
14 changes: 7 additions & 7 deletions python_modules/libraries/dagster-dg/dagster_dg/cache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import hashlib
import shutil
import sys
from collections.abc import Sequence
from pathlib import Path
from typing import Final, Literal, Optional, Union
from typing import Final, Literal, Optional

from typing_extensions import Self, TypeAlias

Expand All @@ -12,19 +11,19 @@
DEFAULT_FILE_EXCLUDE_PATTERNS,
hash_directory_metadata,
hash_file_metadata,
is_macos,
is_windows,
)

_CACHE_CONTAINER_DIR_NAME: Final = "dg-cache"

CachableDataType: TypeAlias = Union[
Literal["component_registry_data"], Literal["all_components_schema"]
]
CachableDataType: TypeAlias = Literal["component_registry_data", "all_components_schema"]


def get_default_cache_dir() -> Path:
if sys.platform == "win32":
if is_windows():
return Path.home() / "AppData" / "dg" / "cache"
elif sys.platform == "darwin":
elif is_macos():
return Path.home() / "Library" / "Caches" / "dg"
else:
return Path.home() / ".cache" / "dg"
Expand Down Expand Up @@ -78,6 +77,7 @@ def clear_key(self, key: tuple[str, ...]) -> None:

def clear_all(self) -> None:
shutil.rmtree(self._root_path)
assert not self._root_path.exists()
self.log(f"CACHE [clear-all]: {self._root_path}")

def get(self, key: tuple[str, ...]) -> Optional[str]:
Expand Down
61 changes: 28 additions & 33 deletions python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import os
import signal
import subprocess
import sys
import time
from collections.abc import Iterator, Mapping, Sequence
from collections.abc import Iterator, Mapping
from contextlib import contextmanager, nullcontext
from pathlib import Path
from tempfile import NamedTemporaryFile
Expand All @@ -17,7 +15,15 @@
from dagster_dg.config import normalize_cli_config
from dagster_dg.context import DgContext
from dagster_dg.error import DgError
from dagster_dg.utils import DgClickCommand, exit_with_error, get_uv_run_executable_path, pushd
from dagster_dg.utils import (
DgClickCommand,
exit_with_error,
get_uv_run_executable_path,
interrupt_subprocess,
is_windows,
open_subprocess,
pushd,
)

T = TypeVar("T")

Expand Down Expand Up @@ -90,6 +96,11 @@ def dev_command(
cli_config = normalize_cli_config(global_options, context)
dg_context = DgContext.from_config_file_discovery_and_cli_config(Path.cwd(), cli_config)

# Essential on windows-- we need SIGBREAK to raise a KeyboardInterrupt like SIGINT.
if is_windows():
# Pyright doesn't always know about SIGBREAK
signal.signal(signal.SIGBREAK, signal.default_int_handler) # type: ignore

forward_options = [
*_format_forwarded_option("--code-server-log-level", code_server_log_level),
*_format_forwarded_option("--log-level", log_level),
Expand Down Expand Up @@ -135,7 +146,7 @@ def dev_command(
print(f"Using {cmd_location}") # noqa: T201
if workspace_file: # only non-None deployment context
cmd.extend(["--workspace", workspace_file])
uv_run_dagster_dev_process = _open_subprocess(cmd)
uv_run_dagster_dev_process = open_subprocess(cmd)
try:
while True:
time.sleep(_CHECK_SUBPROCESS_INTERVAL)
Expand All @@ -154,7 +165,7 @@ def dev_command(
# directly to the `dagster dev` process (the only child of the `uv run` process). This
# will cause `dagster dev` to terminate which in turn will cause `uv run` to terminate.
dagster_dev_pid = _get_child_process_pid(uv_run_dagster_dev_process)
_interrupt_subprocess(dagster_dev_pid)
interrupt_subprocess(dagster_dev_pid)

try:
uv_run_dagster_dev_process.wait(timeout=10)
Expand All @@ -165,7 +176,11 @@ def dev_command(

@contextmanager
def _temp_workspace_file(dg_context: DgContext) -> Iterator[str]:
with NamedTemporaryFile(mode="w+", delete=True) as temp_workspace_file:
# Note that we can't rely on delete=True here because the NamedTemporaryFile context manager
# will create a file lock on Windows that will prevent the child `dagster dev` process we spawn
# from being able to read the file. So we use delete=False, exit the context manager before
# yielding, and manually delete the file after the context manager exits.
with NamedTemporaryFile(mode="w+", delete=False) as temp_workspace_file:
entries = []
for location in dg_context.get_code_location_names():
code_location_root = dg_context.get_code_location_path(location)
Expand All @@ -180,39 +195,19 @@ def _temp_workspace_file(dg_context: DgContext) -> Iterator[str]:
entries.append({"python_file": entry})
yaml.dump({"load_from": entries}, temp_workspace_file)
temp_workspace_file.flush()
try:
yield temp_workspace_file.name
finally:
Path(temp_workspace_file.name).unlink()


def _format_forwarded_option(option: str, value: object) -> list[str]:
return [] if value is None else [option, str(value)]


def _get_child_process_pid(proc: "subprocess.Popen") -> int:
children = psutil.Process(proc.pid).children(recursive=False)
def _get_child_process_pid(proc: subprocess.Popen) -> int:
# Windows will sometimes return the parent process as its own child. Filter this out.
children = [p for p in psutil.Process(proc.pid).children(recursive=False) if p.pid != proc.pid]
if len(children) != 1:
raise ValueError(f"Expected exactly one child process, but found {len(children)}")
return children[0].pid


# Windows subprocess termination utilities. See here for why we send CTRL_BREAK_EVENT on Windows:
# https://stefan.sofa-rockers.org/2013/08/15/handling-sub-process-hierarchies-python-linux-os-x/


def _interrupt_subprocess(pid: int) -> None:
"""Send CTRL_BREAK_EVENT on Windows, SIGINT on other platforms."""
if sys.platform == "win32":
os.kill(pid, signal.CTRL_BREAK_EVENT)
else:
os.kill(pid, signal.SIGINT)


def _open_subprocess(command: Sequence[str]) -> "subprocess.Popen":
"""Sets the correct flags to support graceful termination."""
creationflags = 0
if sys.platform == "win32":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP

return subprocess.Popen(
command,
creationflags=creationflags,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def install_completion(context: click.Context):
shell, _ = shellingham.detect_shell()
comp_class = click.shell_completion.get_completion_class(shell)
if comp_class is None:
exit_with_error(f"Shell {shell} is not supported.")
exit_with_error(f"Shell `{shell}` is not supported.")
else:
comp_inst = comp_class(
cli=context.command, ctx_args={}, prog_name="dg", complete_var="_DG_COMPLETE"
Expand Down
7 changes: 3 additions & 4 deletions python_modules/libraries/dagster-dg/dagster_dg/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import sys
from collections.abc import Mapping
from dataclasses import dataclass, replace
from pathlib import Path
Expand All @@ -9,17 +8,17 @@
from click.core import ParameterSource

from dagster_dg.error import DgError, DgValidationError
from dagster_dg.utils import get_toml_value
from dagster_dg.utils import get_toml_value, is_macos, is_windows

T = TypeVar("T")

DEFAULT_BUILTIN_COMPONENT_LIB = "dagster_components"


def _get_default_cache_dir() -> Path:
if sys.platform == "win32":
if is_windows():
return Path.home() / "AppData" / "dg" / "cache"
elif sys.platform == "darwin":
elif is_macos():
return Path.home() / "Library" / "Caches" / "dg"
else:
return Path.home() / ".cache" / "dg"
Expand Down
24 changes: 19 additions & 5 deletions python_modules/libraries/dagster-dg/dagster_dg/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
get_executable_path,
get_path_for_module,
get_path_for_package,
get_uv_command_env,
get_uv_run_executable_path,
get_venv_executable,
is_executable_available,
is_package_installed,
pushd,
strip_activated_venv_from_env_vars,
)

# Deployment
Expand Down Expand Up @@ -130,7 +131,7 @@ def has_cache(self) -> bool:
return self._cache is not None

def get_cache_key(self, data_type: CachableDataType) -> tuple[str, str, str]:
path_parts = [str(part) for part in self.root_path.parts if part != "/"]
path_parts = [str(part) for part in self.root_path.parts if part != self.root_path.anchor]
paths_to_hash = [
self.root_path / "uv.lock",
*([self.components_lib_path] if self.is_component_library else []),
Expand Down Expand Up @@ -214,7 +215,7 @@ def code_location_python_executable(self) -> Path:
raise DgError(
"`code_location_python_executable` is only available in a code location context"
)
return self.root_path / ".venv" / "bin" / "python"
return self.root_path / get_venv_executable(Path(".venv"))

@cached_property
def components_package_name(self) -> str:
Expand Down Expand Up @@ -302,7 +303,7 @@ def components_lib_path(self) -> Path:
def external_components_command(self, command: list[str], log: bool = True) -> str:
if self.use_dg_managed_environment:
code_location_command_prefix = ["uv", "run", "dagster-components"]
env = get_uv_command_env()
env = strip_activated_venv_from_env_vars()
executable_path = get_uv_run_executable_path("dagster-components")
else:
code_location_command_prefix = ["dagster-components"]
Expand All @@ -327,7 +328,7 @@ def ensure_uv_lock(self, path: Optional[Path] = None) -> None:
path = path or self.root_path
with pushd(path):
if not (path / "uv.lock").exists():
subprocess.run(["uv", "sync"], check=True, env=get_uv_command_env())
subprocess.run(["uv", "sync"], check=True, env=strip_activated_venv_from_env_vars())

@property
def use_dg_managed_environment(self) -> bool:
Expand All @@ -344,3 +345,16 @@ def validate_registry_command_environment(self) -> None:
"""
if not is_executable_available("dagster-components"):
exit_with_error(MISSING_DAGSTER_COMPONENTS_ERROR_MESSAGE)

@cached_property
def venv_path(self) -> Path:
path = self.root_path
while path != path.parent:
if (path / ".venv").exists():
return path
path = path.parent
raise DgError("Cannot find .venv")

@cached_property
def dagster_components_executable(self) -> Path:
return get_venv_executable(self.venv_path, "dagster-components")
4 changes: 2 additions & 2 deletions python_modules/libraries/dagster-dg/dagster_dg/scaffold.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_pyproject_toml_dev_dependencies(use_editable_dagster: bool) -> str:

def get_pyproject_toml_uv_sources(editable_dagster_root: Path) -> str:
lib_lines = [
f'{path.name} = {{ path = "{path}", editable = true }}'
f"{path.name} = {{ path = '{path}', editable = true }}"
for path in _gather_dagster_packages(editable_dagster_root)
]
return "\n".join(
Expand Down Expand Up @@ -140,7 +140,7 @@ def scaffold_component_type(dg_context: DgContext, name: str) -> None:
scaffold_subtree(
path=root_path,
name_placeholder="COMPONENT_TYPE_NAME_PLACEHOLDER",
templates_path=os.path.join(os.path.dirname(__file__), "templates", "COMPONENT_TYPE"),
templates_path=str(Path(__file__).parent / "templates" / "COMPONENT_TYPE"),
project_name=name,
component_type_class_name=camelcase(name),
name=name,
Expand Down
Loading

0 comments on commit 6633929

Please sign in to comment.