Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RDFLib Path to SHACL path utility and corresponding tests #2990

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion rdflib/extras/shacl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@

from typing import TYPE_CHECKING

from rdflib import Graph, Literal, URIRef, paths
from rdflib import BNode, Graph, Literal, URIRef, paths
from rdflib.collection import Collection
from rdflib.namespace import RDF, SH
from rdflib.paths import Path

if TYPE_CHECKING:
from rdflib.graph import _ObjectType
from rdflib.term import IdentifiedNode


class SHACLPathError(Exception):
    """Raised when a SHACL path cannot be parsed from, or built into, a graph."""


# Map each rdflib variable-length path modifier to the SHACL predicate that
# expresses it. Looked up with the ``mod`` attribute of a ``paths.MulPath``
# when SHACL path triples are built; a modifier missing from this table is
# reported as a SHACLPathError by the caller.
_PATH_MOD_TO_PRED = {
paths.ZeroOrMore: SH.zeroOrMorePath,
paths.OneOrMore: SH.oneOrMorePath,
paths.ZeroOrOne: SH.zeroOrOnePath,
}


# This implementation is roughly based on
# pyshacl.helper.sparql_query_helper::SPARQLQueryHelper._shacl_path_to_sparql_path
def parse_shacl_path(
Expand Down Expand Up @@ -93,3 +103,110 @@ def parse_shacl_path(
raise SHACLPathError(f"Cannot parse {repr(path_identifier)} as a SHACL Path.")

return path


def _build_path_component(
    graph: Graph, path_component: URIRef | Path
) -> IdentifiedNode:
    """
    Helper that implements the recursive part of SHACL path triple
    construction.

    A :class:`~rdflib.term.URIRef` is returned unchanged and adds nothing to
    ``graph``. Every supported :class:`~rdflib.paths.Path` is represented by a
    new :class:`~rdflib.term.BNode`, and the triples describing it are added
    to ``graph``:

    - ``SequencePath``: the blank node identifies a
      :class:`~rdflib.collection.Collection` of the component paths
    - ``InvPath``: the blank node has an ``sh:inversePath`` to the inner path
    - ``AlternativePath``: the blank node has an ``sh:alternativePath`` to a
      separate :class:`~rdflib.collection.Collection` of the alternatives
    - ``MulPath``: the blank node has ``sh:zeroOrMorePath``,
      ``sh:oneOrMorePath`` or ``sh:zeroOrOnePath`` to the inner path

    Nested components are built before their parent is validated further, so
    triples already added for earlier components are not removed if a later
    component raises.

    :param graph: A :class:`~rdflib.graph.Graph` into which to insert triples
    :param path_component: A :class:`~rdflib.term.URIRef` or
        :class:`~rdflib.paths.Path` that is part of a path expression
    :return: The :class:`~rdflib.term.IdentifiedNode` of the resource in the
        graph that corresponds to the provided path_component
    :raises TypeError: If path_component is neither a
        :class:`~rdflib.term.URIRef` nor a :class:`~rdflib.paths.Path`
    :raises SHACLPathError: If a sequence or alternative path has fewer than
        two items, if a variable-length path uses an unknown modifier, or if
        the :class:`~rdflib.paths.Path` type has no SHACL representation
        handled here
    """
    # Literals or other types are not allowed
    if not isinstance(path_component, (URIRef, Path)):
        raise TypeError(
            f"Objects of type {type(path_component)} are not valid "
            + "components of a SHACL path."
        )

    # A plain predicate is its own path node; there is nothing to construct
    if isinstance(path_component, URIRef):
        return path_component

    # Every other supported path component is rooted at a blank node
    bnode = BNode()

    # Handle Sequence Paths
    if isinstance(path_component, paths.SequencePath):
        # Sequence paths are a Collection directly with at least two items
        if len(path_component.args) < 2:
            raise SHACLPathError(
                "A list of SHACL Sequence Paths must contain at least two path items."
            )
        Collection(
            graph,
            bnode,
            [_build_path_component(graph, arg) for arg in path_component.args],
        )

    # Handle Inverse Paths
    elif isinstance(path_component, paths.InvPath):
        graph.add(
            (bnode, SH.inversePath, _build_path_component(graph, path_component.arg))
        )

    # Handle Alternative Paths
    elif isinstance(path_component, paths.AlternativePath):
        # Alternative paths are a Collection but referenced by sh:alternativePath
        # with at least two items
        if len(path_component.args) < 2:
            raise SHACLPathError(
                "List of SHACL alternate paths must have at least two path items."
            )
        coll = Collection(
            graph,
            BNode(),
            [_build_path_component(graph, arg) for arg in path_component.args],
        )
        graph.add((bnode, SH.alternativePath, coll.uri))

    # Handle Variable Length Paths
    elif isinstance(path_component, paths.MulPath):
        # Get the predicate corresponding to the path modifier
        pred = _PATH_MOD_TO_PRED.get(path_component.mod)
        if pred is None:
            raise SHACLPathError(f"Unknown path modifier {path_component.mod}")
        graph.add((bnode, pred, _build_path_component(graph, path_component.path)))

    # Any other Path type (for example a negated path) is not handled above.
    # Without this guard the caller would receive a blank node that has no
    # triples describing it, i.e. a silently meaningless sh:path value.
    else:
        raise SHACLPathError(
            f"Paths of type {type(path_component).__name__} cannot be "
            "represented as a SHACL path."
        )

    # Return the blank node created for the provided path_component
    return bnode


def build_shacl_path(
    path: URIRef | Path, target_graph: Graph | None = None
) -> tuple[IdentifiedNode, Graph | None]:
    """
    Construct the SHACL representation of an rdflib path.

    The returned node is intended to be used as the object of a triple whose
    predicate is sh:path.

    :param path: A :class:`~rdflib.term.URIRef` (a simple path) or a
        :class:`~rdflib.paths.Path` (a complex path)
    :param target_graph: Optional :class:`~rdflib.graph.Graph` that receives
        the constructed triples. When omitted, a new graph is created, but
        only if path is a :class:`~rdflib.paths.Path`
    :return: A ``(path_identifier, graph)`` tuple.

        - For a :class:`~rdflib.term.URIRef`, ``path_identifier`` is the
          given path itself and ``graph`` is ``None``, because no triples are
          constructed (``target_graph`` is not returned in this case).
        - Otherwise ``path_identifier`` is the node returned for the root of
          the path expression and ``graph`` is ``target_graph`` if one was
          given, else the newly created graph holding the path triples.
    """
    if not isinstance(path, URIRef):
        # Compare against None explicitly rather than relying on truthiness,
        # so that a caller-supplied graph is always the one that is used.
        graph = Graph() if target_graph is None else target_graph
        # Walk the path expression recursively, adding its triples to graph
        root = _build_path_component(graph, path)
        return root, graph

    # A URI is already a complete SHACL path; there is nothing to build
    return path, None
86 changes: 82 additions & 4 deletions test/test_extras/test_shacl_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import pytest

from rdflib import Graph, URIRef
from rdflib.extras.shacl import SHACLPathError, parse_shacl_path
from rdflib import Graph, Literal, URIRef, paths
from rdflib.compare import graph_diff
from rdflib.extras.shacl import SHACLPathError, build_shacl_path, parse_shacl_path
from rdflib.namespace import SH, Namespace
from rdflib.paths import Path

Expand Down Expand Up @@ -109,7 +110,32 @@ def path_source_data():
) ;
] ;
.
ex:TestPropShape10
ex:TestPropShape10a
sh:path (
[
sh:zeroOrMorePath [
sh:inversePath ex:pred1 ;
] ;
]
[
sh:alternativePath (
[
sh:zeroOrMorePath [
sh:inversePath ex:pred1 ;
] ;
]
ex:pred1
[
sh:oneOrMorePath ex:pred2 ;
]
[
sh:zeroOrMorePath ex:pred3 ;
]
) ;
]
) ;
.
ex:TestPropShape10b
sh:path (
[
sh:zeroOrMorePath [
Expand Down Expand Up @@ -192,7 +218,13 @@ def path_source_data():
~EX.pred1 | EX.pred1 / EX.pred2 | EX.pred1 | EX.pred2 | EX.pred3,
),
(
EX.TestPropShape10,
EX.TestPropShape10a,
~EX.pred1
* "*"
/ (~EX.pred1 * "*" | EX.pred1 | EX.pred2 * "+" | EX.pred3 * "*"), # type: ignore[operator]
),
(
EX.TestPropShape10b,
~EX.pred1
* "*"
/ (~EX.pred1 * "*" | EX.pred1 | EX.pred2 * "+" | EX.pred3 * "*"), # type: ignore[operator]
Expand All @@ -217,3 +249,49 @@ def test_parse_shacl_path(
parse_shacl_path(path_source_data, path_root) # type: ignore[arg-type]
else:
assert parse_shacl_path(path_source_data, path_root) == expected # type: ignore[arg-type]


@pytest.mark.parametrize(
    ("resource", "path"),
    (
        # Single SHACL Path
        (EX.TestPropShape1, EX.pred1),
        (EX.TestPropShape2a, EX.pred1 / EX.pred2 / EX.pred3),
        (EX.TestPropShape3, ~EX.pred1),
        (EX.TestPropShape4a, EX.pred1 | EX.pred2 | EX.pred3),
        (EX.TestPropShape5, EX.pred1 * "*"),  # type: ignore[operator]
        (EX.TestPropShape6, EX.pred1 * "+"),  # type: ignore[operator]
        (EX.TestPropShape7, EX.pred1 * "?"),  # type: ignore[operator]
        # SHACL Path Combinations
        (EX.TestPropShape8, ~EX.pred1 * "*"),
        (
            EX.TestPropShape10a,
            ~EX.pred1
            * "*"
            / (~EX.pred1 * "*" | EX.pred1 | EX.pred2 * "+" | EX.pred3 * "*"),  # type: ignore[operator]
        ),
        # Error cases: the first element is the exception type expected
        (TypeError, Literal("Not a valid path")),
        (SHACLPathError, paths.SequencePath(SH.targetClass)),
        (SHACLPathError, paths.AlternativePath(SH.targetClass)),
    ),
)
def test_build_shacl_path(
    path_source_data: Graph, resource: URIRef | type, path: Union[URIRef, Path]
):
    """
    Check that build_shacl_path produces the triples found in the fixture.

    ``resource`` is either a shape in ``path_source_data`` whose sh:path value
    is the expected result for ``path``, or an exception class that building
    ``path`` is expected to raise.
    """
    # Error cases carry an exception class in place of a shape resource
    if isinstance(resource, type):
        with pytest.raises(resource):
            build_shacl_path(path)
        return

    expected_root = path_source_data.value(resource, SH.path)
    built_root, built_graph = build_shacl_path(path)

    # Simple paths are returned as-is and no graph is produced
    if isinstance(expected_root, URIRef):
        assert built_root == expected_root
        assert built_graph is None
        return

    # Complex paths: compare the built graph with the fixture's description
    # of the expected path root; neither side may hold triples the other lacks
    assert isinstance(built_graph, Graph)
    expected_graph = path_source_data.cbd(expected_root)  # type: ignore[arg-type]
    _, only_expected, only_built = graph_diff(expected_graph, built_graph)
    assert len(only_expected) == 0
    assert len(only_built) == 0
Loading