From 9f086dbb21335a11b60f8d37f2984a9d1b3f46be Mon Sep 17 00:00:00 2001
From: Srinath Vadlamani
Date: Tue, 15 Nov 2016 12:56:12 -0700
Subject: [PATCH] starting simple python parallel scripts

---
 codingTests/python/helloMpi.py     |   5 ++
 codingTests/python/matmatMpi4py.py |  95 ++++++++++++++++++++++++
 codingTests/python/numpyMulti.py   |  25 +++++++
 codingTests/python/ompNumpy.py     | 113 +++++++++++++++++++++++++++++
 4 files changed, 238 insertions(+)
 create mode 100644 codingTests/python/helloMpi.py
 create mode 100755 codingTests/python/matmatMpi4py.py
 create mode 100644 codingTests/python/numpyMulti.py
 create mode 100644 codingTests/python/ompNumpy.py

diff --git a/codingTests/python/helloMpi.py b/codingTests/python/helloMpi.py
new file mode 100644
index 0000000..130baeb
--- /dev/null
+++ b/codingTests/python/helloMpi.py
@@ -0,0 +1,5 @@
+#hello.py
+from mpi4py import MPI
+comm = MPI.COMM_WORLD
+rank = comm.Get_rank()
+print "hello world from process ", rank
diff --git a/codingTests/python/matmatMpi4py.py b/codingTests/python/matmatMpi4py.py
new file mode 100755
index 0000000..420e190
--- /dev/null
+++ b/codingTests/python/matmatMpi4py.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+from __future__ import division
+
+import numpy as np
+from mpi4py import MPI
+from time import time
+
+#=============================================================================#
+
+my_N = 3000
+my_M = 3000
+
+#=============================================================================#
+
+NORTH = 0
+SOUTH = 1
+EAST = 2
+WEST = 3
+
+
+
+def pprint(string, comm=MPI.COMM_WORLD):
+    if comm.rank == 0:
+        print(string)
+
+
+if __name__ == "__main__":
+    comm = MPI.COMM_WORLD
+
+    mpi_rows = int(np.floor(np.sqrt(comm.size)))
+    mpi_cols = comm.size // mpi_rows
+    if mpi_rows*mpi_cols > comm.size:
+        mpi_cols -= 1
+    if mpi_rows*mpi_cols > comm.size:
+        mpi_rows -= 1
+
+    pprint("Creating a %d x %d processor grid..." % (mpi_rows, mpi_cols))
+
+    ccomm = comm.Create_cart((mpi_rows, mpi_cols), periods=(True, True), reorder=True)
+
+    my_mpi_row, my_mpi_col = ccomm.Get_coords(ccomm.rank)
+    neigh = [0, 0, 0, 0]
+
+    neigh[NORTH], neigh[SOUTH] = ccomm.Shift(0, 1)
+    neigh[EAST], neigh[WEST] = ccomm.Shift(1, 1)
+
+
+    # Create matrices
+    my_A = np.random.normal(size=(my_N, my_M)).astype(np.float32)
+    my_B = np.random.normal(size=(my_N, my_M)).astype(np.float32)
+    my_C = np.zeros_like(my_A)
+
+    tile_A = my_A
+    tile_B = my_B
+    tile_A_ = np.empty_like(my_A)
+    tile_B_ = np.empty_like(my_A)
+    req = [None, None, None, None]
+
+    t0 = time()
+    for r in xrange(mpi_rows):
+        req[EAST] = ccomm.Isend(tile_A, neigh[EAST])
+        req[WEST] = ccomm.Irecv(tile_A_, neigh[WEST])
+        req[SOUTH] = ccomm.Isend(tile_B, neigh[SOUTH])
+        req[NORTH] = ccomm.Irecv(tile_B_, neigh[NORTH])
+
+        #t0 = time()
+        my_C += np.dot(tile_A, tile_B)
+        #t1 = time()
+
+        MPI.Request.Waitall(req)
+        #t2 = time()
+        #print("Time computing %6.2f %6.2f" % (t1-t0, t2-t1))
+    comm.barrier()
+    t_total = time()-t0
+
+    t0 = time()
+    np.dot(tile_A, tile_B)
+    t_serial = time()-t0
+
+    pprint(78*"=")
+    pprint("Computed (serial) %d x %d x %d in %6.2f seconds" % (my_M, my_M, my_N, t_serial))
+    pprint(" ... expecting parallel computation to take %6.2f seconds" % (mpi_rows*mpi_rows*mpi_cols*t_serial / comm.size))
+    pprint("Computed (parallel) %d x %d x %d in %6.2f seconds" % (mpi_rows*my_M, mpi_rows*my_M, mpi_cols*my_N, t_total))
+
+
+    #print "[%d] (%d,%d): %s" % (comm.rank, my_mpi_row, my_mpi_col, neigh)
+
+    comm.barrier()
+
+
+
+
+
+
diff --git a/codingTests/python/numpyMulti.py b/codingTests/python/numpyMulti.py
new file mode 100644
index 0000000..c566cea
--- /dev/null
+++ b/codingTests/python/numpyMulti.py
@@ -0,0 +1,25 @@
+import numpy as np
+import os
+from timeit import timeit
+from multiprocessing import Pool
+
+
+def mmul(matrix):
+    for i in range(100):
+        matrix = matrix * matrix
+    return matrix
+
+if __name__ == '__main__':
+
+    matrices = []
+    for i in range(4):
+        matrices.append(np.random.random_integers(100, size=(1000, 1000)))
+
+    print timeit(lambda: map(mmul, matrices), number=20)
+
+    # after importing numpy, reset the CPU affinity of the parent process so
+    # that it will use all cores
+    #os.system("taskset -p 0xff %d" % os.getpid())
+
+    pool = Pool(8)
+    print timeit(lambda: pool.map(mmul, matrices), number=20)
diff --git a/codingTests/python/ompNumpy.py b/codingTests/python/ompNumpy.py
new file mode 100644
index 0000000..ef33a8c
--- /dev/null
+++ b/codingTests/python/ompNumpy.py
@@ -0,0 +1,113 @@
+import os
+import time
+import math
+import numpy as np
+from numpy.linalg import svd as svd
+import multiprocessing
+
+
+# If numpy is compiled for OpenMP, then make sure to control
+# the number of OpenMP threads via the OMP_NUM_THREADS environment
+# variable before running this benchmark.
+
+
+MATRIX_SIZE = 1000
+MATRIX_COUNT = 16
+
+
+def rnd_matrix():
+    offset = np.random.randint(1,10)
+    stretch = 2*np.random.rand()+0.1
+    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)
+
+
+print "Creating input matrices in parent process."
+# Create input in memory. Children access this input.
+INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]
+
+
+def worker_function(result_queue, worker_index, chunk_boundary):
+    """Work on a certain chunk of the globally defined `INPUT` list.
+    """
+    result_chunk = []
+    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
+        # Perform singular value decomposition (CPU intensive).
+        u, s, v = svd(m)
+        # Build single numeric value as output.
+        output = int(np.sum(s))
+        result_chunk.append(output)
+    result_queue.put((worker_index, result_chunk))
+
+
+def work(n_workers=1):
+    def calc_chunksize(l, n):
+        """Rudimentary function to calculate the size of chunks for equal
+        distribution of a list `l` among `n` workers.
+        """
+        return int(math.ceil(len(l)/float(n)))
+
+    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
+    chunk_size = calc_chunksize(INPUT, n_workers)
+    chunk_boundaries = [
+        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]
+
+    # When n_workers and input list size are of same order of magnitude,
+    # the above method might have created fewer chunks than workers available.
+    if n_workers != len(chunk_boundaries):
+        return None
+
+    result_queue = multiprocessing.Queue()
+    # Prepare child processes.
+    children = []
+    for worker_index in xrange(n_workers):
+        children.append(
+            multiprocessing.Process(
+                target=worker_function,
+                args=(
+                    result_queue,
+                    worker_index,
+                    chunk_boundaries[worker_index],
+                    )
+                )
+            )
+
+    # Run child processes.
+    for c in children:
+        c.start()
+
+    # Create result list of length of `INPUT`. Assign results upon arrival.
+    results = [None] * len(INPUT)
+
+    # Wait for all results to arrive.
+    for _ in xrange(n_workers):
+        worker_index, result_chunk = result_queue.get(block=True)
+        chunk_boundary = chunk_boundaries[worker_index]
+        # Store the chunk of results just received to the overall result list.
+        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk
+
+    # Join child processes (clean up zombies).
+    for c in children:
+        c.join()
+    return results
+
+
+def main():
+    durations = []
+    n_children = [1, 2, 4]
+    for n in n_children:
+        print "Crunching input with %s child(ren)." % n
+        t0 = time.time()
+        result = work(n)
+        if result is None:
+            continue
+        duration = time.time() - t0
+        print "Result computed by %s child process(es): %s" % (n, result)
+        print "Duration: %.2f s" % duration
+        durations.append(duration)
+    normalized_durations = [durations[0]/d for d in durations]
+    for n, normdur in zip(n_children, normalized_durations):
+        print "%s-children speedup: %.2f" % (n, normdur)
+
+
+if __name__ == '__main__':
+    main()
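
Note on running these scripts (not part of the patch): helloMpi.py and matmatMpi4py.py are MPI programs and have to be launched through an MPI launcher, e.g. "mpiexec -n 4 python helloMpi.py", while numpyMulti.py and ompNumpy.py are plain Python 2 scripts driven by multiprocessing. As the comment at the top of ompNumpy.py says, the multiprocessing timings only make sense if the thread count of numpy's OpenMP/BLAS backend is pinned before the benchmark runs. A minimal sketch of doing that from inside a script follows; which environment variable actually applies depends on how numpy was built (OpenMP, OpenBLAS, or MKL), so the names below are the usual candidates, not a guarantee.

import os

# Cap the thread pools numpy's backend may create. OMP_NUM_THREADS is the
# variable named in ompNumpy.py; OPENBLAS_NUM_THREADS and MKL_NUM_THREADS are
# the equivalents for OpenBLAS- and MKL-linked builds.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")

import numpy as np  # import numpy only after the limits are set

With the limits in place each multiprocessing child does single-threaded matrix work, which is what the per-child speedup comparison in ompNumpy.py appears to assume.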