Skip to content

Commit

Permalink
Add bfs_edges, bfs_tree, generic_bfs_edges
Browse files Browse the repository at this point in the history
  • Loading branch information
eriknw committed Dec 1, 2023
1 parent da1c3a1 commit 601f8b3
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 26 deletions.
6 changes: 6 additions & 0 deletions python/nx-cugraph/_nx_cugraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
"ancestors",
"barbell_graph",
"betweenness_centrality",
"bfs_edges",
"bfs_predecessors",
"bfs_successors",
"bfs_tree",
"bull_graph",
"caveman_graph",
"chvatal_graph",
Expand All @@ -57,6 +59,7 @@
"from_pandas_edgelist",
"from_scipy_sparse_array",
"frucht_graph",
"generic_bfs_edges",
"heawood_graph",
"hits",
"house_graph",
Expand Down Expand Up @@ -103,11 +106,14 @@
"extra_docstrings": {
# BEGIN: extra_docstrings
"betweenness_centrality": "`weight` parameter is not yet supported.",
"bfs_edges": "`sort_neighbors` parameter is not yet supported.",
"bfs_predecessors": "`sort_neighbors` parameter is not yet supported.",
"bfs_successors": "`sort_neighbors` parameter is not yet supported.",
"bfs_tree": "`sort_neighbors` parameter is not yet supported.",
"edge_betweenness_centrality": "`weight` parameter is not yet supported.",
"eigenvector_centrality": "`nstart` parameter is not used, but it is checked for validity.",
"from_pandas_edgelist": "cudf.DataFrame inputs also supported.",
"generic_bfs_edges": "`neighbors` parameter is not yet supported.",
"k_truss": (
"Currently raises `NotImplementedError` for graphs with more than one connected\n"
"component when k >= 3. We expect to fix this soon."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,153 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import repeat

import cupy as cp
import networkx as nx
import pylibcugraph as plc

import nx_cugraph as nxcg
from nx_cugraph.convert import _to_graph
from nx_cugraph.utils import _groupby, index_dtype, networkx_algorithm

__all__ = [
"bfs_edges",
"bfs_tree",
"bfs_predecessors",
"bfs_successors",
"generic_bfs_edges",
]


@networkx_algorithm
def generic_bfs_edges(G, source, neighbors=None, depth_limit=None, sort_neighbors=None):
    """`neighbors` parameter is not yet supported."""
    # `neighbors` and `sort_neighbors` are rejected by `_can_run` below, so the
    # default neighbor iteration applies and we can delegate to `bfs_edges`.
    # BUG FIX: the original called `bfs_edges(source, depth_limit=depth_limit)`,
    # passing the source node where the graph is expected; `G` must be first.
    return bfs_edges(G, source, depth_limit=depth_limit)


@generic_bfs_edges._can_run
def _(G, source, neighbors=None, depth_limit=None, sort_neighbors=None):
    # Only runnable on GPU when the caller does not customize neighbor iteration.
    return neighbors is None and sort_neighbors is None


@networkx_algorithm
def bfs_edges(G, source, reverse=False, depth_limit=None, sort_neighbors=None):
    """`sort_neighbors` parameter is not yet supported."""
    # DRY warning: see also bfs_predecessors and bfs_tree
    G = _to_graph(G)
    if source not in G:
        hash(source)  # To raise TypeError if appropriate
        raise nx.NetworkXError(
            f"The node {source} is not in the {G.__class__.__name__.lower()}."
        )
    if depth_limit is not None and depth_limit < 1:
        # Nothing is reachable within fewer than one hop; yield no edges.
        return

    key_to_id = G.key_to_id
    src_index = key_to_id[source] if key_to_id is not None else source
    distances, predecessors, node_ids = plc.bfs(
        handle=plc.ResourceHandle(),
        graph=G._get_plc_graph(switch_indices=reverse),
        sources=cp.array([src_index], dtype=index_dtype),
        direction_optimizing=False,
        depth_limit=-1 if depth_limit is None else depth_limit,
        compute_predecessors=True,
        do_expensive_check=False,
    )
    # Keep only nodes that were actually reached (the source and unreachable
    # nodes have predecessor < 0).
    reached = predecessors >= 0
    distances = distances[reached]
    predecessors = predecessors[reached]
    node_ids = node_ids[reached]
    # Group child ids by (distance, parent id); iterating the sorted keys
    # emits edges in breadth-first order.
    groups = _groupby([distances, predecessors], node_ids)
    id_to_key = G.id_to_key
    for group_key in sorted(groups):
        parent_id = group_key[1]
        parent = parent_id if id_to_key is None else id_to_key[parent_id]
        for child in G._nodeiter_to_iter(groups[group_key].tolist()):
            yield (parent, child)


@bfs_edges._can_run
def _(G, source, reverse=False, depth_limit=None, sort_neighbors=None):
    # The GPU implementation cannot honor a custom neighbor ordering.
    supported = sort_neighbors is None
    return supported


@networkx_algorithm
def bfs_tree(G, source, reverse=False, depth_limit=None, sort_neighbors=None):
    """`sort_neighbors` parameter is not yet supported."""
    # DRY warning: see also bfs_edges and bfs_predecessors
    G = _to_graph(G)
    if source not in G:
        hash(source)  # To raise TypeError if appropriate
        raise nx.NetworkXError(
            f"The node {source} is not in the {G.__class__.__name__.lower()}."
        )

    def _source_only_tree():
        # The trivial result: a graph containing only the source node and no
        # edges. Returned when the depth limit forbids any traversal or when
        # BFS reaches nothing beyond the source.
        return nxcg.DiGraph.from_coo(
            1,
            cp.array([], dtype=index_dtype),
            cp.array([], dtype=index_dtype),
            id_to_key=[source],
        )

    if depth_limit is not None and depth_limit < 1:
        return _source_only_tree()

    src_index = source if G.key_to_id is None else G.key_to_id[source]
    distances, predecessors, node_ids = plc.bfs(
        handle=plc.ResourceHandle(),
        graph=G._get_plc_graph(switch_indices=reverse),
        sources=cp.array([src_index], dtype=index_dtype),
        direction_optimizing=False,
        depth_limit=-1 if depth_limit is None else depth_limit,
        compute_predecessors=True,
        do_expensive_check=False,
    )
    # Nodes that were reached have predecessor >= 0; the source and
    # unreachable nodes are dropped.
    mask = predecessors >= 0
    predecessors = predecessors[mask]
    if predecessors.size == 0:
        return _source_only_tree()
    node_ids = node_ids[mask]
    # TODO: create renumbering helper function(s)
    unique_node_ids = cp.unique(cp.hstack((predecessors, node_ids)))
    # Renumber edges: map each old node id to its position in the sorted array
    # of surviving ids. (An alternative is a scatter through a length-|V|
    # mapper array: mapper[unique_node_ids] = arange(...); mapper[ids].)
    src_indices = cp.searchsorted(unique_node_ids, predecessors)
    dst_indices = cp.searchsorted(unique_node_ids, node_ids)
    # Renumber nodes: build the key -> new contiguous index mapping for the
    # result graph, translating ids back to keys when the input graph has them.
    if (id_to_key := G.id_to_key) is not None:
        key_to_id = {
            id_to_key[old_index]: new_index
            for new_index, old_index in enumerate(unique_node_ids.tolist())
        }
    else:
        key_to_id = {
            old_index: new_index
            for new_index, old_index in enumerate(unique_node_ids.tolist())
        }
    return nxcg.DiGraph.from_coo(
        unique_node_ids.size,
        src_indices,
        dst_indices,
        key_to_id=key_to_id,
    )


@bfs_tree._can_run
def _(G, source, reverse=False, depth_limit=None, sort_neighbors=None):
    # The GPU implementation cannot honor a custom neighbor ordering.
    supported = sort_neighbors is None
    return supported


@networkx_algorithm
def bfs_successors(G, source, depth_limit=None, sort_neighbors=None):
"""`sort_neighbors` parameter is not yet supported."""
Expand Down Expand Up @@ -50,11 +184,11 @@ def bfs_successors(G, source, depth_limit=None, sort_neighbors=None):
distances = distances[mask]
predecessors = predecessors[mask]
node_ids = node_ids[mask]
groups = _groupby(distances, [predecessors, node_ids])
groups = _groupby([distances, predecessors], node_ids)
id_to_key = G.id_to_key
for key in range(1, len(groups) + 1):
parent_ids, children_ids = groups[key]
parent_id = parent_ids[0].tolist()
for key in sorted(groups):
children_ids = groups[key]
parent_id = key[1]
parent = id_to_key[parent_id] if id_to_key is not None else parent_id
children = G._nodearray_to_list(children_ids)
yield (parent, children)
Expand All @@ -68,6 +202,7 @@ def _(G, source, depth_limit=None, sort_neighbors=None):
@networkx_algorithm
def bfs_predecessors(G, source, depth_limit=None, sort_neighbors=None):
"""`sort_neighbors` parameter is not yet supported."""
# DRY warning: see also bfs_edges and bfs_tree
G = _to_graph(G)
if source not in G:
hash(source) # To raise TypeError if appropriate
Expand All @@ -91,12 +226,15 @@ def bfs_predecessors(G, source, depth_limit=None, sort_neighbors=None):
distances = distances[mask]
predecessors = predecessors[mask]
node_ids = node_ids[mask]
groups = _groupby(distances, [predecessors, node_ids])
for key in range(1, len(groups) + 1):
parent_ids, children_ids = groups[key]
groups = _groupby([distances, predecessors], node_ids)
id_to_key = G.id_to_key
for key in sorted(groups):
children_ids = groups[key]
parent_id = key[1]
parent = id_to_key[parent_id] if id_to_key is not None else parent_id
yield from zip(
G._nodeiter_to_iter(children_ids.tolist()),
G._nodeiter_to_iter(parent_ids.tolist()),
repeat(parent, children_ids.size),
)


Expand Down
1 change: 1 addition & 0 deletions python/nx-cugraph/nx_cugraph/convert_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def from_pandas_edgelist(
graph_class, inplace = _create_using_class(create_using)
src_array = df[source].to_numpy()
dst_array = df[target].to_numpy()
# TODO: create renumbering helper function(s)
# Renumber step 0: node keys
nodes = np.unique(np.concatenate([src_array, dst_array]))
N = nodes.size
Expand Down
52 changes: 34 additions & 18 deletions python/nx-cugraph/nx_cugraph/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ def pairwise(it):


def _groupby(
groups: cp.ndarray,
groups: cp.ndarray | list[cp.ndarray],
values: cp.ndarray | list[cp.ndarray],
groups_are_canonical: bool = False,
) -> dict[int, cp.ndarray]:
"""Perform a groupby operation given an array of group IDs and array of values.
Parameters
----------
groups : cp.ndarray
Array that holds the group IDs.
groups : cp.ndarray or list of cp.ndarray
Array or list of arrays that holds the group IDs.
values : cp.ndarray or list of cp.ndarray
Array or list of arrays of values to be grouped according to groups.
Must be the same size as groups array.
Expand All @@ -78,27 +78,43 @@ def _groupby(
-------
dict with group IDs as keys and cp.ndarray as values.
"""
if groups.size == 0:
return {}
sort_indices = cp.argsort(groups)
sorted_groups = groups[sort_indices]
if not isinstance(values, list):
sorted_values = values[sort_indices]
if isinstance(groups, list):
if groups_are_canonical:
raise ValueError(
"`groups_are_canonical=True` is not allowed when `groups` is a list."
)
if len(groups) == 0 or (size := groups[0].size) == 0:
return {}
sort_indices = cp.lexsort(cp.vstack(groups[::-1]))
sorted_groups = cp.vstack([group[sort_indices] for group in groups])
prepend = sorted_groups[:, 0].max() + 1
changed = cp.abs(cp.diff(sorted_groups, prepend=prepend)).sum(axis=0)
changed[0] = 1
left_bounds = cp.nonzero(changed)[0]
else:
if (size := groups.size) == 0:
return {}
sort_indices = cp.argsort(groups)
sorted_groups = groups[sort_indices]
prepend = 1 if groups_are_canonical else sorted_groups[0] + 1
left_bounds = cp.nonzero(cp.diff(sorted_groups, prepend=prepend))[0]
if isinstance(values, list):
sorted_values = [vals[sort_indices] for vals in values]
prepend = 1 if groups_are_canonical else sorted_groups[0] + 1
left_bounds = cp.nonzero(cp.diff(sorted_groups, prepend=prepend))[0]
boundaries = pairwise(itertools.chain(left_bounds.tolist(), [groups.size]))
else:
sorted_values = values[sort_indices]
boundaries = pairwise(itertools.chain(left_bounds.tolist(), [size]))
if groups_are_canonical:
it = enumerate(boundaries)
elif isinstance(groups, list):
it = zip(map(tuple, sorted_groups.T[left_bounds].tolist()), boundaries)
else:
it = zip(sorted_groups[left_bounds].tolist(), boundaries)
if not isinstance(values, list):
return {group: sorted_values[start:end] for group, (start, end) in it}
return {
group: [sorted_vals[start:end] for sorted_vals in sorted_values]
for group, (start, end) in it
}
if isinstance(values, list):
return {
group: [sorted_vals[start:end] for sorted_vals in sorted_values]
for group, (start, end) in it
}
return {group: sorted_values[start:end] for group, (start, end) in it}


def _seed_to_int(seed: int | Random | None) -> int:
Expand Down

0 comments on commit 601f8b3

Please sign in to comment.