-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmpiworker
executable file
·28 lines (25 loc) · 1000 Bytes
/
mpiworker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
import sys
from mpi4py import MPI
from neurogenesis.util import Logger, PrintColors
from neurogenesis.cluster import MPITags, Cluster, TaskQueue
from subprocess import call
def main():
    """Run the worker loop for this MPI process.

    Merges the parent intercommunicator into a single communicator,
    announces the worker's rank, and waits on a barrier. It then blocks on
    incoming messages from any source with any tag and dispatches on the
    tag of each received message:

    * ``MPITags.WORK`` -- run ``task.executable_path`` through the shell,
      store the exit code and this worker's rank on the task, and send the
      task to ``Cluster.MASTER_CONTROLLER`` tagged ``MPITags.FEEDBACK``.
    * ``MPITags.DIE`` -- log the shutdown and exit the process with
      status 0.
    * anything else -- log an error and keep listening.
    """
    comm = MPI.Comm.Get_parent().Merge()
    print("started worker %s" % (comm.Get_rank()))
    comm.Barrier()

    while True:
        # A fresh Status per receive so the tag always belongs to this message.
        status = MPI.Status()
        task = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        tag = status.Get_tag()

        if tag == MPITags.WORK:
            # The command is handed to the shell as-is; the exit code is
            # reported back on the task object itself.
            task.last_exit_code = call(task.executable_path, shell=True)
            task.last_executed_on_rank = comm.Get_rank()
            comm.send(task, Cluster.MASTER_CONTROLLER, MPITags.FEEDBACK)
            continue

        if tag == MPITags.DIE:
            Logger.debug("rank %s exiting" % (comm.Get_rank()))
            sys.exit(0)

        Logger.error("Unknown TAG received!")
# Start the worker loop only when this file is executed as a script.
if __name__ == "__main__":
    main()