[FEA] add more routines to partition graph #249

Merged · 8 commits · Dec 15, 2023
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ process group config.
 - Updated Frechet Inception Distance to use Wasserstein 2-norm with improved
   stability.
 - Molecular Dynamics example.
+- Improved usage of GraphPartition, added more flexible ways of defining a partitioned graph.
 
 ### Changed
 
8 changes: 7 additions & 1 deletion modulus/models/gnn_layers/__init__.py
@@ -12,5 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .distributed_graph import DistributedGraph
+from .distributed_graph import (
+    DistributedGraph,
+    GraphPartition,
+    partition_graph_by_coordinate_bbox,
+    partition_graph_nodewise,
+    partition_graph_with_id_mapping,
+)
 from .graph import CuGraphCSC
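Alongside `DistributedGraph`, the package now also exports `GraphPartition` and three routines for building one. The bounding-box and ID-mapping variants are exercised in the updated MeshGraphNet test further down this page; `partition_graph_nodewise` is not shown there, so here is a minimal sketch of it. The argument order (global offsets, global indices, partition size, partition rank, device) is an assumption inferred by analogy with the other two routines' calls in that test, not confirmed by this diff:

import torch

from modulus.models.gnn_layers import partition_graph_nodewise

# Toy global graph in CSC form: 6 nodes, one incoming edge per node (a ring).
num_nodes = 6
offsets = torch.arange(0, num_nodes + 1, dtype=torch.int64)
indices = (torch.arange(num_nodes, dtype=torch.int64) + 1) % num_nodes

# Contiguous nodewise split across 2 ranks; this call builds the partition
# seen by rank 0. Argument order is assumed, see the note above.
graph_partition = partition_graph_nodewise(
    offsets,
    indices,
    2,         # partition size (number of ranks)
    0,         # partition rank this process owns
    "cuda:0",  # device holding the local partition
)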
578 changes: 474 additions & 104 deletions modulus/models/gnn_layers/distributed_graph.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion modulus/models/gnn_layers/graph.py
@@ -25,7 +25,7 @@
 # for Python versions < 3.11
 from typing_extensions import Self
 
-from modulus.models.gnn_layers import DistributedGraph
+from modulus.models.gnn_layers import DistributedGraph, GraphPartition
 
 try:
     from pylibcugraphops.pytorch import BipartiteCSC, StaticCSC
@@ -90,6 +90,7 @@ def __init__(
         cache_graph: bool = True,
         partition_size: Optional[int] = -1,
         partition_group_name: Optional[str] = None,
+        graph_partition: Optional[GraphPartition] = None,
     ) -> None:
         self.offsets = offsets
         self.indices = indices
@@ -121,6 +122,7 @@ def __init__(
                 self.indices,
                 partition_size,
                 partition_group_name,
+                graph_partition=graph_partition,
             )
             # overwrite graph information with local graph after distribution
             self.offsets = self.dist_graph.graph_partition.local_offsets
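The net effect of these two hunks is that `CuGraphCSC` can be handed a precomputed `GraphPartition` instead of always deriving the default nodewise split internally. A minimal sketch of the call pattern, mirroring the constructor arguments visible in this diff and in the test below; the stand-in tensors are illustrative, and in a real run the process group named "graph_partition" must already have been set up through `DistributedManager`, as the test does:

import torch

from modulus.models.gnn_layers import CuGraphCSC, partition_graph_nodewise

# Stand-ins; in practice these come from the application and its launcher.
device = "cuda:0"
world_size, rank = 2, 0
num_nodes = 6
offsets = torch.arange(0, num_nodes + 1, dtype=torch.int64)
indices = (torch.arange(num_nodes, dtype=torch.int64) + 1) % num_nodes

# Build an explicit partition up front; any of the three routines works.
# partition_graph_nodewise argument order is assumed, see the earlier note.
graph_partition = partition_graph_nodewise(offsets, indices, world_size, rank, device)

graph = CuGraphCSC(
    offsets.to(device),
    indices.to(device),
    num_nodes,  # number of source nodes
    num_nodes,  # number of destination nodes
    partition_size=world_size,
    partition_group_name="graph_partition",
    graph_partition=graph_partition,  # new keyword; None keeps the old behavior
)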
2 changes: 1 addition & 1 deletion test/distributed/test_config.py
@@ -135,7 +135,7 @@ def run_distributed_model_config(rank, model_parallel_size, verbose):
 @pytest.mark.multigpu
 def test_distributed_model_config():
     num_gpus = torch.cuda.device_count()
-    assert num_gpus == 2, "Not enough GPUs available for test"
+    assert num_gpus >= 2, "Not enough GPUs available for test"
     model_parallel_size = 2
     verbose = False  # Change to True for debug
 
2 changes: 1 addition & 1 deletion test/distributed/test_distributed_fft.py
@@ -189,7 +189,7 @@ def run_distributed_fft(rank, model_parallel_size, verbose):
 @pytest.mark.multigpu
 def test_distributed_fft():
     num_gpus = torch.cuda.device_count()
-    assert num_gpus == 2, "Not enough GPUs available for test"
+    assert num_gpus >= 2, "Not enough GPUs available for test"
     model_parallel_size = 2
     verbose = False  # Change to True for debug
 
2 changes: 1 addition & 1 deletion test/distributed/test_manager.py
@@ -268,7 +268,7 @@ def run_process_groups_from_config(rank, model_parallel_size, verbose):
 @pytest.mark.multigpu
 def test_process_groups_from_config():
     num_gpus = torch.cuda.device_count()
-    assert num_gpus == 2, "Not enough GPUs available for test"
+    assert num_gpus >= 2, "Not enough GPUs available for test"
     model_parallel_size = 2
     verbose = False  # Change to True for debug
 
2 changes: 1 addition & 1 deletion test/models/graphcast/test_graphcast_snmg.py
@@ -172,7 +172,7 @@ def run_test_distributed_graphcast(
 
 
 @pytest.mark.multigpu
-@pytest.mark.parametrize("dtype", [torch.float32, torch.float16, torch.bfloat16])
+@pytest.mark.parametrize("dtype", [torch.float32, torch.float16])
 @pytest.mark.parametrize("do_concat_trick", [False, True])
 @pytest.mark.parametrize("do_checkpointing", [False, True])
 def test_distributed_graphcast(dtype, do_concat_trick, do_checkpointing):
61 changes: 56 additions & 5 deletions test/models/meshgraphnet/test_meshgraphnet_snmg.py
@@ -26,10 +26,13 @@
 from torch.nn.parallel import DistributedDataParallel
 
 from modulus.distributed import DistributedManager
+from modulus.models.gnn_layers import (
+    partition_graph_by_coordinate_bbox,
+    partition_graph_with_id_mapping,
+)
 
 
 @import_or_fail("dgl")
-def run_test_distributed_meshgraphnet(rank, world_size, dtype):
+def run_test_distributed_meshgraphnet(rank, world_size, dtype, partition_scheme):
     from modulus.models.gnn_layers.utils import CuGraphCSC
     from modulus.models.meshgraphnet.meshgraphnet import MeshGraphNet

@@ -93,13 +96,59 @@ def run_test_distributed_meshgraphnet(rank, world_size, dtype):
         num_nodes,
         num_nodes,
     )
+
+    graph_partition = None
+
+    if partition_scheme == "nodewise":
+        pass  # nodewise is default
+
+    elif partition_scheme == "coordinate_bbox":
+        src_coordinates = torch.rand((num_nodes, 1), device=offsets.device)
+        dst_coordinates = src_coordinates
+
+        step_size = 1.0 / (world_size + 1)
+        coordinate_separators_min = [[step_size * p] for p in range(world_size)]
+        coordinate_separators_max = [[step_size * (p + 1)] for p in range(world_size)]
+
+        graph_partition = partition_graph_by_coordinate_bbox(
+            offsets,
+            indices,
+            src_coordinates,
+            dst_coordinates,
+            coordinate_separators_min,
+            coordinate_separators_max,
+            world_size,
+            manager.rank,
+            manager.device,
+        )
+
+    elif partition_scheme == "mapping":
+        mapping_src_ids_to_ranks = torch.randint(
+            0, world_size, (num_nodes,), device=offsets.device
+        )
+        mapping_dst_ids_to_ranks = mapping_src_ids_to_ranks
+
+        graph_partition = partition_graph_with_id_mapping(
+            offsets,
+            indices,
+            mapping_src_ids_to_ranks,
+            mapping_dst_ids_to_ranks,
+            world_size,
+            manager.rank,
+            manager.device,
+        )
+
+    else:
+        assert False  # only schemes above are supported
+
     graph_multi_gpu = CuGraphCSC(
         offsets.to(manager.device),
         indices.to(manager.device),
         num_nodes,
         num_nodes,
         partition_size=world_size,
         partition_group_name="graph_partition",
+        graph_partition=graph_partition,
     )
 
     nfeat_single_gpu = (
@@ -183,16 +232,18 @@ def run_test_distributed_meshgraphnet(rank, world_size, dtype):
     DistributedManager.cleanup()
 
 
+@import_or_fail("dgl")
 @pytest.mark.multigpu
-@pytest.mark.parametrize("dtype", [torch.float32, torch.float16, torch.bfloat16])
-def test_distributed_meshgraphnet(dtype):
+@pytest.mark.parametrize("partition_scheme", ["nodewise", "coordinate_bbox", "mapping"])
+@pytest.mark.parametrize("dtype", [torch.float32, torch.float16])
+def test_distributed_meshgraphnet(dtype, partition_scheme, pytestconfig):
     num_gpus = torch.cuda.device_count()
     assert num_gpus >= 2, "Not enough GPUs available for test"
     world_size = num_gpus
 
     torch.multiprocessing.spawn(
         run_test_distributed_meshgraphnet,
-        args=(world_size, dtype),
+        args=(world_size, dtype, partition_scheme),
         nprocs=world_size,
         start_method="spawn",
     )
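Taken together, the parametrized test now covers all three ways of defining a partition: the default nodewise split (`graph_partition=None`), a spatial split from per-node coordinates plus per-rank bounding boxes, and an explicit ID-to-rank mapping. The mapping scheme is the most general of the three, since the output of any external partitioner can be materialized as the two mapping tensors. A sketch under that assumption, reusing the variable names from the test above; the deterministic assignment merely stands in for a real partitioner's output:

# Hypothetical: feed an external partitioner's per-node rank assignment into
# the mapping scheme. Any integer tensor of shape (num_nodes,) with values in
# [0, world_size) works; here a round-robin stands in for e.g. a METIS result.
external_assignment = torch.arange(num_nodes) % world_size

graph_partition = partition_graph_with_id_mapping(
    offsets,
    indices,
    external_assignment,  # rank owning each source node
    external_assignment,  # rank owning each destination node (same here)
    world_size,
    manager.rank,
    manager.device,
)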