Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[components] Better error message for failing entry points (BUILD-647) #27530

Open
wants to merge 1 commit into
base: graphite-base/27530
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
)
from dagster_components.core.schema.base import ResolvableSchema
from dagster_components.core.schema.context import ResolutionContext
from dagster_components.utils import load_module_from_path
from dagster_components.utils import format_error_message, load_module_from_path


class ComponentsEntryPointLoadError(DagsterError):
pass


class ComponentDeclNode(ABC):
Expand Down Expand Up @@ -158,7 +162,16 @@ def from_entry_point_discovery(
):
continue

root_module = entry_point.load()
try:
root_module = entry_point.load()
except Exception as e:
raise ComponentsEntryPointLoadError(
format_error_message(f"""
Error loading entry point `{entry_point.name}` in group `{COMPONENTS_ENTRY_POINT_GROUP}`.
Please fix the error or uninstall the package that defines this entry point.
""")
) from e

if not isinstance(root_module, ModuleType):
raise DagsterError(
f"Invalid entry point {entry_point.name} in group {COMPONENTS_ENTRY_POINT_GROUP}. "
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import importlib.util
import subprocess
import sys
import textwrap
from collections.abc import Iterator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from types import ModuleType
from typing import Any, Optional
from typing import Any, Optional, TypeVar

import click
from dagster._core.definitions.asset_key import AssetKey
Expand All @@ -18,6 +20,8 @@
from dagster_components.core.schema.context import ResolutionContext
from dagster_components.core.schema.objects import AssetAttributesSchema

T = TypeVar("T")

CLI_BUILTIN_COMPONENT_LIB_KEY = "builtin_component_lib"


Expand All @@ -36,6 +40,13 @@ def exit_with_error(error_message: str) -> None:
sys.exit(1)


def format_error_message(message: str) -> str:
# width=10000 unwraps any hardwrapping
dedented = textwrap.dedent(message).strip()
paragraphs = [textwrap.fill(p, width=10000) for p in dedented.split("\n\n")]
return "\n\n".join(paragraphs)


# Temporarily places a path at the front of sys.path, ensuring that any modules in that path are
# importable.
@contextmanager
Expand Down Expand Up @@ -143,3 +154,34 @@ def load_module_from_path(module_name, path) -> ModuleType:
assert spec.loader, "Must have a loader"
spec.loader.exec_module(module)
return module


# ########################
# ##### PLATFORM
# ########################


def is_windows() -> bool:
return sys.platform == "win32"


def is_macos() -> bool:
return sys.platform == "darwin"


# ########################
# ##### VENV
# ########################


def get_venv_executable(venv_dir: Path, executable: str = "python") -> Path:
if is_windows():
return venv_dir / "Scripts" / f"{executable}.exe"
else:
return venv_dir / "bin" / executable


def install_to_venv(venv_dir: Path, install_args: list[str]) -> None:
executable = get_venv_executable(venv_dir)
command = ["uv", "pip", "install", "--python", str(executable), *install_args]
subprocess.run(command, check=True)
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import json
import os
import subprocess
import sys
import tempfile
import textwrap
from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, Optional

from dagster_dg.component import GlobalComponentKey
from dagster._utils import pushd
from dagster_components.utils import ensure_dagster_components_tests_import
from dagster_dg.utils import get_venv_executable

ensure_dagster_components_tests_import()

from dagster_components_tests.utils import modify_toml, set_toml_value


@contextmanager
Expand All @@ -16,15 +23,17 @@ def _temp_venv(install_args: Sequence[str]) -> Iterator[Path]:
with tempfile.TemporaryDirectory() as tmpdir:
venv_dir = Path(tmpdir) / ".venv"
subprocess.check_call(["uv", "venv", str(venv_dir)])
python_executable = (
venv_dir
/ ("Scripts" if sys.platform == "win32" else "bin")
/ ("python.exe" if sys.platform == "win32" else "python")
)
subprocess.check_call(
["uv", "pip", "install", "--python", str(python_executable), *install_args]
[
"uv",
"pip",
"install",
"--python",
str(get_venv_executable(venv_dir, "python")),
*install_args,
]
)
yield python_executable
yield venv_dir


COMPONENT_PRINT_SCRIPT = """
Expand All @@ -36,14 +45,22 @@ def _temp_venv(install_args: Sequence[str]) -> Iterator[Path]:
"""


def _get_component_types_in_python_environment(python_executable: Path) -> Sequence[str]:
with tempfile.NamedTemporaryFile(mode="w") as f:
f.write(COMPONENT_PRINT_SCRIPT)
f.flush()
result = subprocess.run(
[str(python_executable), f.name], capture_output=True, text=True, check=False
)
return result.stdout.strip().split("\n")
def _get_component_print_script_result(venv_root: Path) -> subprocess.CompletedProcess:
assert venv_root.exists()
dagster_components_path = get_venv_executable(venv_root, "dagster-components")
assert dagster_components_path.exists()
result = subprocess.run(
[str(dagster_components_path), "list", "component-types"],
capture_output=True,
text=True,
check=False,
)
return result


def _get_component_types_in_python_environment(venv_root: Path) -> Sequence[str]:
result = _get_component_print_script_result(venv_root)
return list(json.loads(result.stdout).keys())


def _find_repo_root():
Expand Down Expand Up @@ -151,40 +168,65 @@ def test_all_dagster_components_have_defined_summary():
"""


def test_components_from_third_party_lib(tmpdir):
with tmpdir.as_cwd():
# Create test package that defines some components
os.makedirs("dagster-foo")
with open("dagster-foo/pyproject.toml", "w") as f:
f.write(DAGSTER_FOO_PYPROJECT_TOML)

os.makedirs("dagster-foo/dagster_foo/lib/sub")

with open("dagster-foo/dagster_foo/lib/__init__.py", "w") as f:
f.write(DAGSTER_FOO_LIB_ROOT)

with open("dagster-foo/dagster_foo/lib/sub/__init__.py", "w") as f:
f.write(_generate_test_component_source(2))

# Need pipes because dependency of dagster
deps = [
"-e",
_get_editable_package_root("dagster"),
"-e",
_get_editable_package_root("dagster-components"),
"-e",
_get_editable_package_root("dagster-pipes"),
"-e",
"dagster-foo",
]

with _temp_venv(deps) as python_executable:
component_types = _get_component_types_in_python_environment(python_executable)
assert (
GlobalComponentKey(name="test_component_1", namespace="dagster_foo").to_typename()
in component_types
)
assert (
GlobalComponentKey(name="test_component_2", namespace="dagster_foo").to_typename()
in component_types
@contextmanager
def isolated_venv_with_component_lib_dagster_foo(
pre_install_hook: Optional[Callable[[], None]] = None,
):
with tempfile.TemporaryDirectory() as tmpdir:
with pushd(tmpdir):
# Create test package that defines some components
os.makedirs("dagster-foo")
with open("dagster-foo/pyproject.toml", "w") as f:
f.write(DAGSTER_FOO_PYPROJECT_TOML)

os.makedirs("dagster-foo/dagster_foo/lib/sub")

with open("dagster-foo/dagster_foo/lib/__init__.py", "w") as f:
f.write(DAGSTER_FOO_LIB_ROOT)

with open("dagster-foo/dagster_foo/lib/sub/__init__.py", "w") as f:
f.write(_generate_test_component_source(2))

if pre_install_hook:
pre_install_hook()

# Need pipes because dependency of dagster
deps = [
"-e",
_get_editable_package_root("dagster"),
"-e",
_get_editable_package_root("dagster-components"),
"-e",
_get_editable_package_root("dagster-pipes"),
"-e",
"dagster-foo",
]

with _temp_venv(deps) as venv_root:
yield venv_root


def test_components_from_third_party_lib():
with isolated_venv_with_component_lib_dagster_foo() as venv_root:
component_types = _get_component_types_in_python_environment(venv_root)
assert "test_component_1@dagster_foo" in component_types
assert "test_component_2@dagster_foo" in component_types


def test_bad_entry_point_error_message():
# Modify the entry point to point to a non-existent module. This has to be done before the
# package is installed, which is why we use a pre-install hook.
def pre_install_hook():
with modify_toml(Path("dagster-foo/pyproject.toml")) as toml:
set_toml_value(
toml,
("project", "entry-points", "dagster.components", "dagster_foo"),
"fake.module",
)

with isolated_venv_with_component_lib_dagster_foo(pre_install_hook) as venv_root:
result = _get_component_print_script_result(venv_root)
assert (
"Error loading entry point `dagster_foo` in group `dagster.components`" in result.stderr
)
assert result.returncode != 0
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import AbstractSet, Optional # noqa: UP035
from typing import AbstractSet, Any, Iterable, Optional, TypeVar # noqa: UP035

import tomlkit
from dagster import AssetKey, DagsterInstance
from dagster._utils import pushd
from dagster_components.core.component import (
Expand All @@ -17,6 +18,8 @@
ComponentTypeRegistry,
)

T = TypeVar("T")


def registry() -> ComponentTypeRegistry:
return ComponentTypeRegistry.from_entry_point_discovery()
Expand Down Expand Up @@ -137,3 +140,48 @@ def create_code_location_from_components(
)

yield code_location_dir


# ########################
# ##### TOML MANIPULATION
# ########################

# Copied from dagster-dg


@contextmanager
def modify_toml(path: Path) -> Iterator[tomlkit.TOMLDocument]:
with open(path) as f:
toml = tomlkit.parse(f.read())
yield toml
with open(path, "w") as f:
f.write(tomlkit.dumps(toml))


def get_toml_value(doc: tomlkit.TOMLDocument, path: Iterable[str], expected_type: type[T]) -> T:
"""Given a tomlkit-parsed document/table (`doc`),retrieve the nested value at `path` and ensure
it is of type `expected_type`. Returns the value if so, or raises a KeyError / TypeError if not.
"""
current: Any = doc
for key in path:
# If current is not a table/dict or doesn't have the key, error out
if not isinstance(current, dict) or key not in current:
raise KeyError(f"Key '{key}' not found in path: {'.'.join(path)}")
current = current[key]

# Finally, ensure the found value is of the expected type
if not isinstance(current, expected_type):
raise TypeError(
f"Expected '{'.'.join(path)}' to be {expected_type.__name__}, "
f"but got {type(current).__name__} instead."
)
return current


def set_toml_value(doc: tomlkit.TOMLDocument, path: Iterable[str], value: object) -> None:
"""Given a tomlkit-parsed document/table (`doc`),set a nested value at `path` to `value`. Raises
an error if the leading keys do not already lead to a dictionary.
"""
path_list = list(path)
inner_dict = get_toml_value(doc, path_list[:-1], dict)
inner_dict[path_list[-1]] = value
2 changes: 1 addition & 1 deletion python_modules/libraries/dagster-components/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ def get_version() -> str:
extras_require={
"sling": ["dagster-sling"],
"dbt": ["dagster-dbt"],
"test": ["dbt-duckdb", "dagster-dg"],
"test": ["dbt-duckdb", "dagster-dg", "tomlkit"],
},
)
Loading