Skip to content

Commit

Permalink
Merge branch 'feature/multislicecomm'
Browse files Browse the repository at this point in the history
  • Loading branch information
giadarol committed Apr 4, 2016
2 parents 0b5b63f + c62574c commit 44d0891
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 24 deletions.
26 changes: 26 additions & 0 deletions communication_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@
from PyHEADTAIL.particles.particles import Particles


def combine_float_buffers(list_of_buffers):
N_buffers = len(list_of_buffers)
len_buffers = np.array(map(lambda seq: float(len(seq)), list_of_buffers))
return np.concatenate([np.array([float(N_buffers)]), len_buffers]+list_of_buffers)

def split_float_buffers(megabuffer):
i_mbuf = 0

N_buffers = int(megabuffer[0])
i_mbuf += 1

len_buffers = megabuffer[i_mbuf:i_mbuf+N_buffers]
i_mbuf += N_buffers

list_of_buffers = []
for i_buf in xrange(N_buffers):
lenbuf = int(len_buffers[i_buf])
list_of_buffers.append(megabuffer[i_mbuf:i_mbuf+lenbuf])
i_mbuf += lenbuf

return list_of_buffers





def list_of_strings_2_buffer(strlist):
data = ''.join(map(lambda s:s+';', strlist))+'\n'
buf_to_send = np.atleast_1d(np.int_(np.array(map(ord, list(data)))))
Expand Down
50 changes: 29 additions & 21 deletions ring_of_CPUs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import communication_helpers as ch

class RingOfCPUs(object):
def __init__(self, sim_content, single_CPU_mode = False):
def __init__(self, sim_content, N_pieces_per_transfer=1, single_CPU_mode = False):

self.sim_content = sim_content
self.N_turns = sim_content.N_turns
self.N_pieces_per_transfer = N_pieces_per_transfer

self.sim_content.ring_of_CPUs = self

Expand Down Expand Up @@ -66,26 +67,32 @@ def run(self):
t_last_turn = time.mktime(time.localtime())
while True: #(it will be stopped with a break)
orders_from_master = []
# pop a piece
try:
piece_to_send = self.pieces_to_be_treated.pop() # pop starts for the last slices
# (it is what we want, for the HEADTAIL
# slice order convention, z = -beta*c*t)
except IndexError:
piece_to_send = None
list_of_buffers_to_send = []

for _ in xrange(self.N_pieces_per_transfer):
# pop a piece
try:
piece_to_send = self.pieces_to_be_treated.pop() # pop starts for the last slices
# (it is what we want, for the HEADTAIL
# slice order convention, z = -beta*c*t)
except IndexError:
piece_to_send = None

list_of_buffers_to_send.append(self.sim_content.piece_to_buffer(piece_to_send))

# send it to the first element of the ring and receive from the last
sendbuf = self.sim_content.piece_to_buffer(piece_to_send)
sendbuf = ch.combine_float_buffers(list_of_buffers_to_send)
if len(sendbuf) > self.N_buffer_float_size:
raise ValueError('Float buffer is too small!')
self.comm.Sendrecv(sendbuf, dest=0, sendtag=0,
recvbuf=self.buf_float, source=self.master_id-1, recvtag=self.myid)
piece_received = self.sim_content.buffer_to_piece(self.buf_float)

# treat received piece
if piece_received is not None:
self.sim_content.treat_piece(piece_received)
self.pieces_treated.append(piece_received)
list_received_pieces = map(self.sim_content.buffer_to_piece, ch.split_float_buffers(self.buf_float))

# treat received pieces
for piece_received in list_received_pieces:
if piece_received is not None:
self.sim_content.treat_piece(piece_received)
self.pieces_treated.append(piece_received)

# end of turn
if len(self.pieces_treated)==self.N_pieces:
Expand Down Expand Up @@ -128,23 +135,24 @@ def run(self):

elif self.I_am_a_worker:
# initialization
piece_to_send = None
list_of_buffers_to_send = [self.sim_content.piece_to_buffer(None)]

while True:

sendbuf = self.sim_content.piece_to_buffer(piece_to_send)
sendbuf = ch.combine_float_buffers(list_of_buffers_to_send)
if len(sendbuf) > self.N_buffer_float_size:
raise ValueError('Float buffer is too small!')
self.comm.Sendrecv(sendbuf, dest=self.right, sendtag=self.right,
recvbuf=self.buf_float, source=self.left, recvtag=self.myid)
piece_received = self.sim_content.buffer_to_piece(self.buf_float)
list_received_pieces = map(self.sim_content.buffer_to_piece, ch.split_float_buffers(self.buf_float))

# treat received piece
if piece_received is not None:
self.sim_content.treat_piece(piece_received)
for piece_received in list_received_pieces:
if piece_received is not None:
self.sim_content.treat_piece(piece_received) #the elements of the list are being mutated

# prepare for next iteration
piece_to_send = piece_received
list_of_buffers_to_send = map(self.sim_content.piece_to_buffer, list_received_pieces)

# receive orders from the master
self.comm.Bcast(self.buf_int, self.master_id)
Expand Down
2 changes: 1 addition & 1 deletion test_physics/000_test_with_ecloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
from Simulation_with_eclouds import Simulation
simulation_content = Simulation()

myCPUring = RingOfCPUs(simulation_content)
myCPUring = RingOfCPUs(simulation_content, N_pieces_per_transfer=5)

myCPUring.run()
2 changes: 1 addition & 1 deletion test_ring_with_objects/000_test_without_ecloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
from Simulation import Simulation
simulation_content = Simulation()

myCPUring = RingOfCPUs(simulation_content)
myCPUring = RingOfCPUs(simulation_content, N_pieces_per_transfer=5)

myCPUring.run()
2 changes: 1 addition & 1 deletion test_ring_with_objects/002_test_with_ecloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
from Simulation_with_eclouds import Simulation
simulation_content = Simulation()

myCPUring = RingOfCPUs(simulation_content)
myCPUring = RingOfCPUs(simulation_content, N_pieces_per_transfer=5)

myCPUring.run()

0 comments on commit 44d0891

Please sign in to comment.