Commit
starting simple Python parallel scripts
srinathv committed Nov 15, 2016
1 parent 96e374b commit 9f086db
Showing 4 changed files with 238 additions and 0 deletions.
5 changes: 5 additions & 0 deletions codingTests/python/helloMpi.py
@@ -0,0 +1,5 @@
# hello.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print("hello world from process", rank)
95 changes: 95 additions & 0 deletions codingTests/python/matmatMpi4py.py
@@ -0,0 +1,95 @@
#!/usr/bin/env python

from __future__ import division

import numpy as np
from mpi4py import MPI
from time import time

#=============================================================================#

my_N = 3000
my_M = 3000

#=============================================================================#

NORTH = 0
SOUTH = 1
EAST = 2
WEST = 3



def pprint(string, comm=MPI.COMM_WORLD):
    if comm.rank == 0:
        print(string)


if __name__ == "__main__":
    comm = MPI.COMM_WORLD

    mpi_rows = int(np.floor(np.sqrt(comm.size)))
    mpi_cols = comm.size // mpi_rows
    if mpi_rows * mpi_cols > comm.size:
        mpi_cols -= 1
    if mpi_rows * mpi_cols > comm.size:
        mpi_rows -= 1

    pprint("Creating a %d x %d processor grid..." % (mpi_rows, mpi_cols))

    ccomm = comm.Create_cart((mpi_rows, mpi_cols), periods=(True, True), reorder=True)

    my_mpi_row, my_mpi_col = ccomm.Get_coords(ccomm.rank)
    neigh = [0, 0, 0, 0]

    # Cart.Shift(direction, disp) returns a (source, dest) pair of ranks for
    # a displacement along one axis of the (periodic) process grid.
    neigh[NORTH], neigh[SOUTH] = ccomm.Shift(0, 1)
    neigh[EAST], neigh[WEST] = ccomm.Shift(1, 1)


    # Create the local tiles of A, B, and C.
    my_A = np.random.normal(size=(my_N, my_M)).astype(np.float32)
    my_B = np.random.normal(size=(my_N, my_M)).astype(np.float32)
    my_C = np.zeros_like(my_A)

    # Double buffers: compute on tile_X while tile_X_ receives the next tile.
    tile_A = my_A
    tile_B = my_B
    tile_A_ = np.empty_like(my_A)
    tile_B_ = np.empty_like(my_B)
    req = [None, None, None, None]

    t0 = time()
    for r in range(mpi_rows):
        # Pass the A tile along the row ring and the B tile along the
        # column ring of the periodic grid.
        req[EAST] = ccomm.Isend(tile_A, neigh[EAST])
        req[WEST] = ccomm.Irecv(tile_A_, neigh[WEST])
        req[SOUTH] = ccomm.Isend(tile_B, neigh[SOUTH])
        req[NORTH] = ccomm.Irecv(tile_B_, neigh[NORTH])

        # Overlap the local multiply with the communication above.
        my_C += np.dot(tile_A, tile_B)

        MPI.Request.Waitall(req)
        # Swap buffers so the next round multiplies the freshly received tiles.
        tile_A, tile_A_ = tile_A_, tile_A
        tile_B, tile_B_ = tile_B_, tile_B
    comm.barrier()
    t_total = time() - t0

    t0 = time()
    np.dot(tile_A, tile_B)
    t_serial = time() - t0

    pprint(78 * "=")
    pprint("Computed (serial) %d x %d x %d in %6.2f seconds" % (my_M, my_M, my_N, t_serial))
    pprint(" ... expecting parallel computation to take %6.2f seconds" % (mpi_rows * mpi_rows * mpi_cols * t_serial / comm.size))
    pprint("Computed (parallel) %d x %d x %d in %6.2f seconds" % (mpi_rows * my_M, mpi_rows * my_M, mpi_cols * my_N, t_total))


#print "[%d] (%d,%d): %s" % (comm.rank, my_mpi_row, my_mpi_col, neigh)

comm.barrier()
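
# A 4-rank run maps onto a 2 x 2 process grid; a minimal invocation (again
# assuming mpiexec is on PATH):
#   mpiexec -n 4 python matmatMpi4py.py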






25 changes: 25 additions & 0 deletions codingTests/python/numpyMulti.py
@@ -0,0 +1,25 @@
import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    # Repeated element-wise squaring; a simple CPU-bound workload.
    for i in range(100):
        matrix = matrix * matrix
    return matrix


if __name__ == '__main__':

    matrices = []
    for i in range(4):
        # np.random.random_integers was removed from numpy; randint(1, 101)
        # draws from the same 1..100 range.
        matrices.append(np.random.randint(1, 101, size=(1000, 1000)))

    # Serial baseline; list() forces evaluation of the lazy map on Python 3.
    print(timeit(lambda: list(map(mmul, matrices)), number=20))

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    # os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print(timeit(lambda: pool.map(mmul, matrices), number=20))
    pool.close()
    pool.join()
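
# If the parent's CPU affinity was restricted when numpy was imported (see
# the taskset remark above), the Pool workers inherit that affinity and the
# parallel run may show little or no speedup.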
113 changes: 113 additions & 0 deletions codingTests/python/ompNumpy.py
@@ -0,0 +1,113 @@
import os
import time
import math
import numpy as np
from numpy.linalg import svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.
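#
# For example (assuming a bash-like shell):
#   OMP_NUM_THREADS=1 python ompNumpy.py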


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1, 10)
    stretch = 2 * np.random.rand() + 0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print("Creating input matrices in parent process.")
# Create input in memory. Children access this input (relies on fork-style
# multiprocessing, as on Linux, so the data is shared copy-on-write).
INPUT = [rnd_matrix() for _ in range(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform singular value decomposition (CPU intensive).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output = int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l) / float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i + chunk_size) for i in range(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of the same order of magnitude,
    # the above method might have created fewer chunks than workers available.
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in range(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                )
            )
        )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in range(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received in the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print("Crunching input with %s child(ren)." % n)
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print("Result computed by %s child process(es): %s" % (n, result))
        print("Duration: %.2f s" % duration)
        durations.append(duration)
    normalized_durations = [durations[0] / d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print("%s-children speedup: %.2f" % (n, normdur))


if __name__ == '__main__':
    main()
