Skip to content

Commit

Permalink
Add first tests for dynamic load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
IshaanDesai committed Jan 22, 2025
1 parent 239b8b3 commit f678756
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 32 deletions.
56 changes: 30 additions & 26 deletions micro_manager/adaptivity/global_adaptivity_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,23 @@ def redistribute_sims(self, micro_sims: list) -> None:

def _get_ranks_of_sims(self) -> np.ndarray:
"""
Get the ranks of the simulations.
Get the ranks of all simulations.
Returns
-------
ranks_of_sim : np.ndarray
Array of ranks on which simulations exist.
"""
# Create a map of micro simulation global IDs and the ranks on which they are
micro_sims_on_this_rank = np.zeros(self._local_number_of_sims, dtype=np.intc)
for i in range(self._local_number_of_sims):
micro_sims_on_this_rank[i] = self._rank
local_gids_to_rank = dict()
for gid in self._global_ids:
local_gids_to_rank[gid] = self._rank

ranks_of_sim = np.zeros(self._global_number_of_sims, dtype=np.intc)
ranks_maps_as_list = self._comm.allgather(local_gids_to_rank)

self._comm.Allgatherv(micro_sims_on_this_rank, ranks_of_sim)
ranks_of_sim = np.zeros(self._global_number_of_sims, dtype=np.intc)
for ranks_map in ranks_maps_as_list:
for gid, rank in ranks_map.items():
ranks_of_sim[gid] = rank

return ranks_of_sim

Expand Down Expand Up @@ -117,7 +119,7 @@ def _redistribute_active_sims(self, micro_sims: list) -> None:
counter += 1

send_map: Dict[
int, list
int, int
] = (
dict()
) # keys are global IDs of sim states to send, values are ranks to send the sims to
Expand All @@ -127,7 +129,7 @@ def _redistribute_active_sims(self, micro_sims: list) -> None:
dict()
) # keys are global IDs of sim states to receive, values are ranks to receive from

for i in range(self._global_number_of_sims):
for i in range(np.count_nonzero(self._is_sim_active)):
if current_ranks_of_active_sims[i] != new_ranks_of_active_sims[i]:
if current_ranks_of_active_sims[i] == self._rank:
send_map[i] = new_ranks_of_active_sims[i]
Expand All @@ -136,13 +138,12 @@ def _redistribute_active_sims(self, micro_sims: list) -> None:

# Asynchronous send operations
send_reqs = []
for global_id, send_ranks in send_map.items():
for send_rank in send_ranks:
tag = self._create_tag(global_id, self._rank, send_rank)
req = self._comm.isend(
micro_sims[global_id].get_state(), dest=send_rank, tag=tag
)
send_reqs.append(req)
for global_id, send_rank in send_map.items():
tag = self._create_tag(global_id, self._rank, send_rank)
req = self._comm.isend(
micro_sims[global_id].get_state(), dest=send_rank, tag=tag
)
send_reqs.append(req)

# Asynchronous receive operations
recv_reqs = []
Expand All @@ -161,6 +162,7 @@ def _redistribute_active_sims(self, micro_sims: list) -> None:
for global_id in send_map.keys():
micro_sims[global_id] = None
self._local_number_of_sims -= 1
self._global_ids.remove(global_id)

# Create micro simulations and set them to the received states
for req in recv_reqs:
Expand All @@ -171,9 +173,12 @@ def _redistribute_active_sims(self, micro_sims: list) -> None:
)
micro_sims[global_id].set_state(output)
self._local_number_of_sims += 1
self._global_ids.append(global_id)

def _redistribute_inactive_sims(self, micro_sims):
""" """
"""
...
"""
ranks_of_sims = self._get_ranks_of_sims()

current_ranks_of_active_sims = []
Expand All @@ -199,7 +204,7 @@ def _redistribute_inactive_sims(self, micro_sims):
] = current_ranks_of_active_sims[active_gid]

send_map: Dict[
int, list
int, int
] = (
dict()
) # keys are global IDs of sim states to send, values are ranks to send the sims to
Expand All @@ -209,7 +214,7 @@ def _redistribute_inactive_sims(self, micro_sims):
dict()
) # keys are global IDs of sim states to receive, values are ranks to receive from

for i in range(self._global_number_of_sims):
for i in range(np.count_nonzero(self._is_sim_active == False)):
if current_ranks_of_inactive_sims[i] != new_ranks_of_inactive_sims[i]:
if current_ranks_of_active_sims[i] == self._rank:
send_map[i] = new_ranks_of_inactive_sims[i]
Expand All @@ -218,13 +223,12 @@ def _redistribute_inactive_sims(self, micro_sims):

# Asynchronous send operations
send_reqs = []
for global_id, send_ranks in send_map.items():
for send_rank in send_ranks:
tag = self._create_tag(global_id, self._rank, send_rank)
req = self._comm.isend(
micro_sims[global_id].get_state(), dest=send_rank, tag=tag
)
send_reqs.append(req)
for global_id, send_rank in send_map.items():
tag = self._create_tag(global_id, self._rank, send_rank)
req = self._comm.isend(
micro_sims[global_id].get_state(), dest=send_rank, tag=tag
)
send_reqs.append(req)

# Asynchronous receive operations
recv_reqs = []
Expand Down
10 changes: 5 additions & 5 deletions micro_manager/micro_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def __init__(self, config_file: str) -> None:

self._is_adaptivity_on = self._config.turn_on_adaptivity()

self._is_adaptivity_with_load_balancing = (
self._config.is_adaptivity_with_load_balancing()
)

if self._is_adaptivity_on:
self._data_for_adaptivity: Dict[str, np.ndarray] = dict()

Expand All @@ -120,10 +124,6 @@ def __init__(self, config_file: str) -> None:
)
self._micro_sims_active_steps = None

self._is_adaptivity_with_load_balancing = (
self._config.is_adaptivity_with_load_balancing()
)

if self._is_adaptivity_with_load_balancing:
self._load_balancing_n = self._config.get_load_balancing_n()

Expand Down Expand Up @@ -414,7 +414,7 @@ def initialize(self) -> None:
)
)
elif self._config.get_adaptivity_type() == "global":
if self._config._adaptivity_is_load_balancing():
if self._config._is_adaptivity_with_load_balancing():
self._adaptivity_controller: GlobalAdaptivityLBCalculator = (
GlobalAdaptivityLBCalculator(
self._config,
Expand Down
1 change: 0 additions & 1 deletion tests/unit/test_adaptivity_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from mpi4py import MPI

from micro_manager.adaptivity.global_adaptivity import GlobalAdaptivityCalculator
from micro_manager.adaptivity.global_adaptivity_lb import GlobalAdaptivityLBCalculator


class TestGlobalAdaptivity(TestCase):
Expand Down
105 changes: 105 additions & 0 deletions tests/unit/test_global_adaptivity_lb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import unittest
from unittest import TestCase
from unittest.mock import MagicMock

import numpy as np
from mpi4py import MPI

from micro_manager.adaptivity.global_adaptivity_lb import GlobalAdaptivityLBCalculator


class MicroSimulation:
def __init__(self, global_id) -> None:
self._global_id = global_id
self._state = [global_id] * 3

def get_global_id(self):
return self._global_id

def set_state(self, state):
self._state = state

def get_state(self):
return self._state.copy()


class TestGlobalAdaptivityLB(TestCase):
def setUp(self):
self._comm = MPI.COMM_WORLD
self._rank = self._comm.Get_rank()
self._size = self._comm.Get_size()

self._configurator = MagicMock()
self._configurator.get_micro_file_name = MagicMock(
return_value="test_global_adaptivity_lb"
)
self._configurator.get_adaptivity_similarity_measure = MagicMock(
return_value="L1"
)
self._configurator.get_output_dir = MagicMock(return_value="output_dir")

def test_get_ranks_of_sims(self):
""" """
if self._rank == 0:
global_ids = [0, 1, 2]
expected_ranks_of_sims = [0, 0, 0, 1, 1]
elif self._rank == 1:
global_ids = [3, 4]
expected_ranks_of_sims = [0, 0, 0, 1, 1]

adaptivity_controller = GlobalAdaptivityLBCalculator(
self._configurator, 5, global_ids, rank=self._rank, comm=self._comm
)

actual_ranks_of_sims = adaptivity_controller._get_ranks_of_sims()

self.assertTrue(np.array_equal(expected_ranks_of_sims, actual_ranks_of_sims))

def test_redistribute_active_sims(self):
"""
Test load balancing functionality to redistribute active simulations.
Run this test in parallel using MPI with 2 ranks.
"""
global_number_of_sims = 5

if self._rank == 0:
global_ids = [0, 1, 2]
expected_global_ids = [0, 2]
elif self._rank == 1:
global_ids = [3, 4]
expected_global_ids = [1, 3, 4]

expected_ranks_of_sims = [0, 1, 0, 1, 1]

adaptivity_controller = GlobalAdaptivityLBCalculator(
self._configurator,
global_number_of_sims,
global_ids,
rank=self._rank,
comm=self._comm,
)

adaptivity_controller._is_sim_active = np.array(
[True, True, False, False, False]
)
adaptivity_controller._sim_is_associated_to = [-2, -2, 0, 1, 0]

micro_sims = []
for i in range(global_number_of_sims):
if i in global_ids:
micro_sims.append(MicroSimulation(i))
else:
micro_sims.append(None)

adaptivity_controller._redistribute_active_sims(micro_sims)

actual_global_ids = []
for i in range(global_number_of_sims):
if micro_sims[i] is not None:
actual_global_ids.append(micro_sims[i].get_global_id())

self.assertEqual(actual_global_ids, expected_global_ids)

actual_ranks_of_sims = adaptivity_controller._get_ranks_of_sims()

self.assertTrue(np.array_equal(expected_ranks_of_sims, actual_ranks_of_sims))

0 comments on commit f678756

Please sign in to comment.