Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nx-cugraph: dispatch graph method to gpu or cpu #17

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions nx_cugraph/classes/digraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
__all__ = ["CudaDiGraph", "DiGraph"]

networkx_api = nxcg.utils.decorators.networkx_class(nx.DiGraph)
gpu_cpu_api = nxcg.utils.decorators._gpu_cpu_api(nx.DiGraph, __name__)


class DiGraph(nx.DiGraph, Graph):
Expand Down Expand Up @@ -105,6 +106,43 @@ def to_cudagraph_class(cls) -> type[CudaDiGraph]:
def to_networkx_class(cls) -> type[nx.DiGraph]:
return nx.DiGraph

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.DiGraph or CudaDiGraph
__contains__ = gpu_cpu_api("__contains__")
__len__ = gpu_cpu_api("__len__")
__iter__ = gpu_cpu_api("__iter__")

@networkx_api
def clear(self) -> None:
    """Remove all nodes and edges without changing where the graph resides."""
    # Capture the device-side graph (if any) before touching any state.
    device_graph = self._cudagraph if self._is_on_gpu else None
    if self._is_on_cpu:
        super().clear()
    if device_graph is not None:
        device_graph.clear()
    # Re-cache the emptied device graph (or None) while keeping CPU data.
    self._set_cudagraph(device_graph, clear_cpu=False)

@networkx_api
def clear_edges(self) -> None:
    """Remove all edges without changing where the graph resides."""
    # Capture the device-side graph (if any) before touching any state.
    device_graph = self._cudagraph if self._is_on_gpu else None
    if self._is_on_cpu:
        super().clear_edges()
    if device_graph is not None:
        device_graph.clear_edges()
    # Re-cache the edgeless device graph (or None) while keeping CPU data.
    self._set_cudagraph(device_graph, clear_cpu=False)

get_edge_data = gpu_cpu_api("get_edge_data", edge_data=True)
has_edge = gpu_cpu_api("has_edge")
neighbors = gpu_cpu_api("neighbors")
has_node = gpu_cpu_api("has_node")
nbunch_iter = gpu_cpu_api("nbunch_iter")
number_of_edges = Graph.number_of_edges
number_of_nodes = gpu_cpu_api("number_of_nodes")
order = gpu_cpu_api("order")
successors = gpu_cpu_api("successors")


class CudaDiGraph(CudaGraph):
#################
Expand Down Expand Up @@ -244,6 +282,7 @@ def to_undirected(self, reciprocal=False, as_view=False):
rv.graph.update(deepcopy(self.graph))
return rv

successors = CudaGraph.neighbors # Alias
# Many more methods to implement...

###################
Expand Down
82 changes: 79 additions & 3 deletions nx_cugraph/classes/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
__all__ = ["CudaGraph", "Graph"]

networkx_api = nxcg.utils.decorators.networkx_class(nx.Graph)
gpu_cpu_api = nxcg.utils.decorators._gpu_cpu_api(nx.Graph, __name__)

# The "everything" cache key is an internal implementation detail of NetworkX
# that may change between releases.
Expand All @@ -63,6 +64,8 @@
True, # Include all node values
# `.graph` attributes are always included now
)
_EDGE_KEY_INDEX = 0
_NODE_KEY_INDEX = 1

# Use to indicate when a full conversion to GPU failed so we don't try again.
_CANT_CONVERT_TO_GPU = "_CANT_CONVERT_TO_GPU"
Expand Down Expand Up @@ -211,8 +214,7 @@ def _cudagraph(self):
cache[_CACHE_KEY] = Gcg
return Gcg

@_cudagraph.setter
def _cudagraph(self, val, *, clear_cpu=True):
def _set_cudagraph(self, val, *, clear_cpu=True):
"""Set the full ``CudaGraph`` for this graph, or remove from device if None."""
if (cache := getattr(self, "__networkx_cache__", None)) is None:
# Should we warn?
Expand All @@ -229,6 +231,32 @@ def _cudagraph(self, val, *, clear_cpu=True):
for key in self._nx_attrs:
self.__dict__[key] = None

def _get_cudagraph(self, *, edge_data=False, node_data=False):
    """Get a valid cached ``CudaGraph``, optionally with edge or node data.

    Returns None if no valid graph is found.

    Parameters
    ----------
    edge_data : bool, default False
        Whether to return a CudaGraph with edge data.
    node_data : bool, default False
        Whether to return a CudaGraph with node data.
    """
    nx_cache = getattr(self, "__networkx_cache__", None)
    if nx_cache is None or _CANT_CONVERT_TO_GPU in nx_cache:
        return None
    backend_cache = nx_cache.get("backends", {}).get("cugraph", {})
    # The canonical, fully-converted CudaGraph satisfies every request.
    if _CACHE_KEY in backend_cache:
        return backend_cache[_CACHE_KEY]
    # Otherwise accept any cached graph that carries at least the data asked
    # for (a graph WITH edge/node data is valid when none was requested).
    candidates = (
        graph
        for key, graph in backend_cache.items()
        if (key[_EDGE_KEY_INDEX] is True or edge_data is False)
        and (key[_NODE_KEY_INDEX] is True or node_data is False)
    )
    return next(candidates, None)

@nx.Graph.name.setter
def name(self, s):
# Don't clear the cache when setting the name, since `.graph` is shared.
Expand Down Expand Up @@ -510,6 +538,54 @@ def from_dcsc(
**attr,
)

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.Graph or CudaGraph
__contains__ = gpu_cpu_api("__contains__")
__len__ = gpu_cpu_api("__len__")
__iter__ = gpu_cpu_api("__iter__")

@networkx_api
def clear(self) -> None:
    """Remove all nodes and edges on whichever device(s) hold the graph."""
    if self._is_on_gpu:
        gpu_graph = self._cudagraph
    else:
        gpu_graph = None
    if self._is_on_cpu:
        super().clear()
    if gpu_graph is not None:
        gpu_graph.clear()
    # Store the cleared GPU graph (or None) without wiping the CPU side.
    self._set_cudagraph(gpu_graph, clear_cpu=False)

@networkx_api
def clear_edges(self) -> None:
    """Remove all edges on whichever device(s) hold the graph."""
    if self._is_on_gpu:
        gpu_graph = self._cudagraph
    else:
        gpu_graph = None
    if self._is_on_cpu:
        super().clear_edges()
    if gpu_graph is not None:
        gpu_graph.clear_edges()
    # Store the edgeless GPU graph (or None) without wiping the CPU side.
    self._set_cudagraph(gpu_graph, clear_cpu=False)

get_edge_data = gpu_cpu_api("get_edge_data", edge_data=True)
has_edge = gpu_cpu_api("has_edge")
neighbors = gpu_cpu_api("neighbors")
has_node = gpu_cpu_api("has_node")
nbunch_iter = gpu_cpu_api("nbunch_iter")

@networkx_api
def number_of_edges(
    self, u: NodeKey | None = None, v: NodeKey | None = None
) -> int:
    """Return the number of edges, optionally those between ``u`` and ``v``."""
    if u is None and v is None:
        # Total edge count: dispatch to the cached GPU graph when possible.
        return self._number_of_edges(u, v)
    # Counting edges between a specific pair is NotImplemented by CudaGraph,
    # so always go through the plain NetworkX implementation for that case.
    nx_class = self.to_networkx_class()
    return nx_class.number_of_edges(self, u, v)

_number_of_edges = gpu_cpu_api("number_of_edges")
number_of_nodes = gpu_cpu_api("number_of_nodes")
order = gpu_cpu_api("order")
# Future work: implement more graph methods, and handle e.g. `copy`


class CudaGraph:
# Tell networkx to dispatch calls with this object to nx-cugraph
Expand Down Expand Up @@ -805,7 +881,7 @@ def to_undirected(self, as_view: bool = False) -> CudaGraph:

def _to_compat_graph(self) -> Graph:
    """Return a CPU-compatible ``Graph`` wrapper around this device graph.

    The wrapper is created empty and this ``CudaGraph`` is installed via
    ``_set_cudagraph`` (the ``_cudagraph`` property setter was removed, so
    direct assignment is no longer valid).
    """
    rv = self._to_compat_graph_class()()
    rv._set_cudagraph(self)
    return rv

# Not implemented...
Expand Down
21 changes: 21 additions & 0 deletions nx_cugraph/classes/multidigraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
__all__ = ["CudaMultiDiGraph", "MultiDiGraph"]

networkx_api = nxcg.utils.decorators.networkx_class(nx.MultiDiGraph)
gpu_cpu_api = nxcg.utils.decorators._gpu_cpu_api(nx.MultiDiGraph, __name__)


class MultiDiGraph(nx.MultiDiGraph, MultiGraph, DiGraph):
Expand Down Expand Up @@ -50,6 +51,26 @@ def to_cudagraph_class(cls) -> type[CudaMultiDiGraph]:
def to_networkx_class(cls) -> type[nx.MultiDiGraph]:
return nx.MultiDiGraph

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.MultiDiGraph or CudaMultiDiGraph
__contains__ = gpu_cpu_api("__contains__")
__len__ = gpu_cpu_api("__len__")
__iter__ = gpu_cpu_api("__iter__")
clear = DiGraph.clear
clear_edges = DiGraph.clear_edges
get_edge_data = gpu_cpu_api("get_edge_data", edge_data=True)
has_edge = gpu_cpu_api("has_edge")
neighbors = gpu_cpu_api("neighbors")
has_node = gpu_cpu_api("has_node")
nbunch_iter = gpu_cpu_api("nbunch_iter")
number_of_edges = MultiGraph.number_of_edges
number_of_nodes = gpu_cpu_api("number_of_nodes")
order = gpu_cpu_api("order")
successors = gpu_cpu_api("successors")


class CudaMultiDiGraph(CudaMultiGraph, CudaDiGraph):
is_directed = classmethod(MultiDiGraph.is_directed.__func__)
Expand Down
42 changes: 36 additions & 6 deletions nx_cugraph/classes/multigraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
__all__ = ["MultiGraph", "CudaMultiGraph"]

networkx_api = nxcg.utils.decorators.networkx_class(nx.MultiGraph)
gpu_cpu_api = nxcg.utils.decorators._gpu_cpu_api(nx.MultiGraph, __name__)


class MultiGraph(nx.MultiGraph, Graph):
Expand Down Expand Up @@ -277,6 +278,36 @@ def from_dcsc(
**attr,
)

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.MultiGraph or CudaMultiGraph
__contains__ = gpu_cpu_api("__contains__")
__len__ = gpu_cpu_api("__len__")
__iter__ = gpu_cpu_api("__iter__")
clear = Graph.clear
clear_edges = Graph.clear_edges
get_edge_data = gpu_cpu_api("get_edge_data", edge_data=True)
has_edge = gpu_cpu_api("has_edge")
neighbors = gpu_cpu_api("neighbors")
has_node = gpu_cpu_api("has_node")
nbunch_iter = gpu_cpu_api("nbunch_iter")

@networkx_api
def number_of_edges(
    self, u: NodeKey | None = None, v: NodeKey | None = None
) -> int:
    """Return the number of edges, optionally those between ``u`` and ``v``."""
    if u is not None or v is not None:
        # Pairwise edge counting is NotImplemented by CudaGraph; use the
        # NetworkX implementation directly for that case.
        return self.to_networkx_class().number_of_edges(self, u, v)
    # The full edge count can be answered by the cached GPU graph, if any.
    return self._number_of_edges(u, v)

_number_of_edges = gpu_cpu_api("number_of_edges")
number_of_nodes = gpu_cpu_api("number_of_nodes")
order = gpu_cpu_api("order")


class CudaMultiGraph(CudaGraph):
# networkx properties
Expand Down Expand Up @@ -390,14 +421,13 @@ def get_edge_data(
mask = (self.src_indices == u) & (self.dst_indices == v)
if not mask.any():
return default
if self.edge_keys is None:
if self.edge_keys is None and key is not None:
if self.edge_indices is None:
self._calculate_edge_indices()
if key is not None:
try:
mask = mask & (self.edge_indices == key)
except TypeError:
return default
try:
mask = mask & (self.edge_indices == key)
except TypeError:
return default
indices = cp.nonzero(mask)[0]
if indices.size == 0:
return default
Expand Down
50 changes: 50 additions & 0 deletions nx_cugraph/tests/test_graph_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from .testing_utils import assert_graphs_equal

CREATE_USING = [nxcg.Graph, nxcg.DiGraph, nxcg.MultiGraph, nxcg.MultiDiGraph]


def _create_Gs():
rv = []
Expand Down Expand Up @@ -65,3 +67,51 @@ def test_multidigraph_to_undirected():
Gcg = nxcg.CudaMultiDiGraph(Gnx)
with pytest.raises(NotImplementedError):
Gcg.to_undirected()


@pytest.mark.parametrize("create_using", CREATE_USING)
@pytest.mark.parametrize(
    "method",
    [
        ("__iter__", ()),
        ("__len__", ()),
        ("clear", ()),
        ("clear_edges", ()),
        ("nbunch_iter", ()),
        ("number_of_edges", ()),
        ("number_of_nodes", ()),
        ("order", ()),
        ("__contains__", (0,)),
        ("neighbors", (0,)),
        ("has_node", (0,)),
        ("successors", (0,)),
        ("get_edge_data", (0, 1)),
        ("has_edge", (0, 1)),
        ("nbunch_iter", ([0, 1],)),
    ],
)
@pytest.mark.parametrize("where", ["gpu", "cpu"])
def test_method_does_not_convert_to_cpu_or_gpu(create_using, method, where):
    """Calling a dispatched graph method must not move data between devices."""
    attr, args = method
    if attr == "successors" and not create_using.is_directed():
        # Use pytest.skip, not a bare ``return``: a bare return reports the
        # parametrization as PASSED even though nothing was exercised.
        pytest.skip("`successors` is only defined for directed graphs")
    G = nxcg.complete_graph(3, create_using=create_using)
    is_on_gpu = where == "gpu"
    is_on_cpu = where == "cpu"
    if is_on_cpu:
        # Mutating via the NetworkX API moves the graph to CPU.
        G.add_edge(10, 20)
    assert G._is_on_gpu == is_on_gpu
    assert G._is_on_cpu == is_on_cpu
    getattr(G, attr)(*args)
    assert G._is_on_gpu == is_on_gpu
    assert G._is_on_cpu == is_on_cpu
    # Also usable from the class and dispatches correctly
    func = getattr(create_using, attr)
    func(G, *args)
    assert G._is_on_gpu == is_on_gpu
    assert G._is_on_cpu == is_on_cpu
    # Basic "looks like networkx" checks
    nx_class = create_using.to_networkx_class()
    nx_func = getattr(nx_class, attr)
    assert func.__name__ == nx_func.__name__
    assert func.__module__.startswith("nx_cugraph")
28 changes: 28 additions & 0 deletions nx_cugraph/utils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,31 @@ def _default_should_run(*args, **kwargs):

def _restore_networkx_dispatched(name):
return getattr(BackendInterface, name)


def _gpu_cpu_api(nx_class, module_name):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a suggestion, feel free to ignore - I think I'd find these names a bit more self-documenting:

Suggested change
def _gpu_cpu_api(nx_class, module_name):
def create_method_dispatcher_factory(nx_class, module_name):

which then gets called like:

create_method_dispatcher = 
    create_method_dispatcher_factory(...)

which then gets used like:

a = create_method_dispatcher("a")

...but I can also see how these feel like ctors and could be named with nouns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cycled through a few names for these myself. The current @_gpu_cpu_api matches the style we already have with @networkx_api. Naming decorators can be descriptive, so I think they both read perfectly fine. If we were assigning the values to variables and not using decorators, then a factory-like name like your suggestion would probably be more appropriate.

def _gpu_cpu_graph_method(attr, *, edge_data=False, node_data=False):
"""Dispatch property to NetworkX or CudaGraph based on cache.

For example, this will use any cached CudaGraph for ``len(G)``, which
prevents creating NetworkX data structures.
"""
nx_func = getattr(nx_class, attr)

def inner(self, *args, **kwargs):
rlratzel marked this conversation as resolved.
Show resolved Hide resolved
cuda_graph = self._get_cudagraph(edge_data=edge_data, node_data=node_data)
if cuda_graph is None:
return nx_func(self, *args, **kwargs)
return getattr(cuda_graph, attr)(*args, **kwargs)

inner.__name__ = nx_func.__name__
inner.__doc__ = nx_func.__doc__
inner.__qualname__ = nx_func.__qualname__
inner.__defaults__ = nx_func.__defaults__
inner.__kwdefaults__ = nx_func.__kwdefaults__
inner.__module__ = module_name
inner.__dict__.update(nx_func.__dict__)
inner.__wrapped__ = nx_func
return inner

return _gpu_cpu_graph_method
Loading