From 63527db49b1c6a901c95a6861df2bd2653e2c181 Mon Sep 17 00:00:00 2001 From: avalluvan <62253557+avalluvan@users.noreply.github.com> Date: Sun, 15 Dec 2024 12:08:17 -0800 Subject: [PATCH] Migrated RichardsonLucy slicing and message passing to DataIFWithParallel Three instances (all pertaining to saving results) remain in RichardsonLucy class. --- cosipy/image_deconvolution/RichardsonLucy.py | 232 +++--------------- .../dataIFWithParallelSupport.py | 114 ++++++++- 2 files changed, 144 insertions(+), 202 deletions(-) diff --git a/cosipy/image_deconvolution/RichardsonLucy.py b/cosipy/image_deconvolution/RichardsonLucy.py index d3f31236..1c22e021 100644 --- a/cosipy/image_deconvolution/RichardsonLucy.py +++ b/cosipy/image_deconvolution/RichardsonLucy.py @@ -1,5 +1,4 @@ import os -from pathlib import Path import copy import logging @@ -11,14 +10,10 @@ import pandas as pd import astropy.units as u from astropy.io import fits -from mpi4py import MPI -from histpy import Histogram, HealpixAxis, Axes +from histpy import Histogram from .deconvolution_algorithm_base import DeconvolutionAlgorithmBase -# Define MPI variable -MASTER = 0 # Indicates master process - class RichardsonLucy(DeconvolutionAlgorithmBase): """ A class for the RichardsonLucy algorithm. @@ -75,33 +70,23 @@ def __init__(self, initial_model:Histogram, dataset:list, mask, parameter, comm= else: os.makedirs(self.save_results_directory) - # Specific to parallel implementation: - # 1. Assume numproc is known by the process that invoked `run_deconvolution()` - # 2. All processes have loaded event data, background, and created - # initial model (from model properties) independently - self.parallel = False - if comm is not None: - self.comm = comm - if self.comm.Get_size() > 1: - self.parallel = True - logger.info('Image Deconvolution set to run in parallel mode') + if comm is None: + self.MASTER = True + elif comm.Get_rank() == 0: + self.MASTER = True + else: + self.MASTER = False def initialization(self): """ initialization before performing image deconvolution """ - # Parallel - if self.parallel: - self.numtasks = self.comm.Get_size() - self.taskid = self.comm.Get_rank() - # Master - if (not self.parallel) or (self.parallel and (self.taskid == MASTER)): + if self.MASTER: # Clear results self.results.clear() - # All # clear counter self.iteration_count = 0 @@ -111,20 +96,6 @@ def initialization(self): # calculate exposure map self.summed_exposure_map = self.calc_summed_exposure_map() - # Parallel - if self.parallel: - ''' - Synchronization Barrier 0 (performed only once) - ''' - total_exposure_map = np.empty_like(self.summed_exposure_map, dtype=np.float64) - - # Gather all arrays into recvbuf - self.comm.Allreduce(self.summed_exposure_map.contents, total_exposure_map, op=MPI.SUM) # For multiple MPI processes, full = [slice1, ... sliceN] - - # Reshape the received buffer back into the original array shape - self.summed_exposure_map[:] = total_exposure_map - - # All # mask setting if self.mask is None and np.any(self.summed_exposure_map.contents == 0): self.mask = Histogram(self.model.axes, contents = self.summed_exposure_map.contents > 0) @@ -153,56 +124,10 @@ def Estep(self): E-step (but it will be skipped). 
""" - # All # expected count histograms - expectation_list_slice = self.calc_expectation_list(model = self.model, dict_bkg_norm = self.dict_bkg_norm) + self.expectation_list = self.calc_expectation_list(model = self.model, dict_bkg_norm = self.dict_bkg_norm) logger.info("The expected count histograms were calculated with the initial model map.") - # Serial - if not self.parallel: - self.expectation_list = expectation_list_slice # If single process, then full = slice - - # Parallel - elif self.parallel: - ''' - Synchronization Barrier 1 - ''' - - self.expectation_list = [] - for data, epsilon_slice in zip(self.dataset, expectation_list_slice): - # Gather the sizes of local arrays from all processes - local_size = np.array([epsilon_slice.contents.size], dtype=np.int32) - all_sizes = np.zeros(self.numtasks, dtype=np.int32) - self.comm.Allgather(local_size, all_sizes) - - # Calculate displacements - displacements = np.insert(np.cumsum(all_sizes), 0, 0)[0:-1] - - # Create a buffer to receive the gathered data - total_size = int(np.sum(all_sizes)) - recvbuf = np.empty(total_size, dtype=np.float64) # Receive buffer - - # Gather all arrays into recvbuf - self.comm.Allgatherv(epsilon_slice.contents.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE]) # For multiple MPI processes, full = [slice1, ... sliceN] - - # Reshape the received buffer back into the original 3D array shape - epsilon = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape((-1,) + epsilon_slice.contents.shape[1:]) for i in range(self.numtasks) ], axis=-1) - - # Create Histogram that will be appended to self.expectation_list - axes = [] - for axis in data.event.axes: - if axis.label == 'PsiChi': - axes.append(HealpixAxis(edges = axis.edges, - label = axis.label, - scale = axis._scale, - coordsys = axis._coordsys, - nside = axis.nside)) - else: - axes.append(axis) - - # Add to list that manages multiple datasets - self.expectation_list.append(Histogram(Axes(axes), contents=epsilon, unit=data.event.unit)) # TODO: Could maybe be simplified using Histogram.slice[] - # At the end of this function, all processes should have a complete `self.expectation_list` # to proceed to the Mstep function @@ -211,65 +136,15 @@ def Mstep(self): M-step in RL algorithm. """ - # All ratio_list = [ data.event / expectation for data, expectation in zip(self.dataset, self.expectation_list) ] # delta model - C_slice = self.calc_summed_T_product(ratio_list) - - # Serial - if not self.parallel: - sum_T_product = C_slice - - # Parallel - elif self.parallel: - ''' - Synchronization Barrier 2 - ''' - - # Gather the sizes of local arrays from all processes - local_size = np.array([C_slice.contents.size], dtype=np.int32) - all_sizes = np.zeros(self.numtasks, dtype=np.int32) - self.comm.Allgather(local_size, all_sizes) - - # Calculate displacements - displacements = np.insert(np.cumsum(all_sizes), 0, 0)[0:-1] - - # Create a buffer to receive the gathered data - total_size = int(np.sum(all_sizes)) - recvbuf = np.empty(total_size, dtype=np.float64) # Receive buffer - - # Gather all arrays into recvbuf - self.comm.Gatherv(C_slice.contents.value.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE]) # For multiple MPI processes, full = [slice1, ... 
sliceN] - - # Master - if self.taskid == MASTER: - # Reshape the received buffer back into the original 2D array shape - C = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape((-1,) + C_slice.contents.shape[1:]) for i in range(self.numtasks) ], axis=0) - - # Create Histogram object for sum_T_product - axes = [] - for axis in self.model.axes: - if axis.label == 'lb': - axes.append(HealpixAxis(edges = axis.edges, - label = axis.label, - scale = axis._scale, - coordsys = axis._coordsys, - nside = axis.nside)) - else: - axes.append(axis) - - # C_slice (only slice operated on by current node) --> sum_T_product (all ) - sum_T_product = Histogram(Axes(axes), contents=C, unit=C_slice.unit) # TODO: Could maybe be simplified using Histogram.slice[] - - # Master - if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)): - self.delta_model = self.model * (sum_T_product/self.summed_exposure_map - 1) + sum_T_product = self.calc_summed_T_product(ratio_list) + self.delta_model = self.model * (sum_T_product/self.summed_exposure_map - 1) - if self.mask is not None: - self.delta_model = self.delta_model.mask_pixels(self.mask) + if self.mask is not None: + self.delta_model = self.delta_model.mask_pixels(self.mask) - # All # background normalization optimization if self.do_bkg_norm_optimization: for key in self.dict_bkg_norm.keys(): @@ -286,11 +161,10 @@ def Mstep(self): self.dict_bkg_norm[key] = bkg_norm - # Alternately, let MASTER node calculate it and broadcast the value - # self.comm.bcast(self.dict_bkg_norm[key], root=MASTER) # This synchronization barrier is not required during the final iteration - - # At the end of this function, just the MASTER MPI process needs to have a full - # copy of delta_model + # At the end of this function, all the nodes will have a full + # copy of delta_model. Although this is not necessary, this is + # the easiest way without editing RichardsonLucy.py. + # The same applies for self.dict_bkg_norm def post_processing(self): """ @@ -300,57 +174,33 @@ def post_processing(self): - acceleration of RL algirithm: the normalization of delta map is increased as long as the updated image has no non-negative components. 
""" - # Master - if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)): - self.processed_delta_model = copy.deepcopy(self.delta_model) + # All + self.processed_delta_model = copy.deepcopy(self.delta_model) - if self.do_response_weighting: - self.processed_delta_model[:] *= self.response_weighting_filter + if self.do_response_weighting: + self.processed_delta_model[:] *= self.response_weighting_filter - if self.do_smoothing: - self.processed_delta_model = self.processed_delta_model.smoothing(fwhm = self.smoothing_fwhm) - - if self.do_acceleration: - self.alpha = self.calc_alpha(self.processed_delta_model, self.model) - else: - self.alpha = 1.0 - - self.model = self.model + self.processed_delta_model * self.alpha - self.model[:] = np.where(self.model.contents < self.minimum_flux, self.minimum_flux, self.model.contents) - - if self.mask is not None: - self.model = self.model.mask_pixels(self.mask) - - # update loglikelihood_list - self.loglikelihood_list = self.calc_loglikelihood_list(self.expectation_list) - logger.debug("The loglikelihood list was updated with the new expected count histograms.") - - # Parallel - if self.parallel: - ''' - Synchronization Barrier 3 - ''' - # Initialize new variable as MPI only sends values and not units - if self.taskid == MASTER: - buffer = self.model.contents.value - else: - buffer = np.empty(self.model.contents.shape, dtype=np.float64) + if self.do_smoothing: + self.processed_delta_model = self.processed_delta_model.smoothing(fwhm = self.smoothing_fwhm) + + if self.do_acceleration: + self.alpha = self.calc_alpha(self.processed_delta_model, self.model) + else: + self.alpha = 1.0 - self.comm.Bcast([buffer, MPI.DOUBLE], root=MASTER) # This synchronization barrier is not required during the final iteration + self.model = self.model + self.processed_delta_model * self.alpha + self.model[:] = np.where(self.model.contents < self.minimum_flux, self.minimum_flux, self.model.contents) - if self.taskid > MASTER: - # Reconstruct ModelBase object for self.model - new_model = self.model.__class__(nside = self.model.axes['lb'].nside, # self.model.__class__ will return the Class of which `model` is an object - energy_edges = self.model.axes['Ei'].edges, - scheme = self.model.axes['lb']._scheme, - coordsys = self.model.axes['lb'].coordsys, - unit = self.model.unit) - - new_model[:] = buffer * self.model.unit - self.model = new_model + if self.mask is not None: + self.model = self.model.mask_pixels(self.mask) + + # update loglikelihood_list + self.loglikelihood_list = self.calc_loglikelihood_list(self.expectation_list) + logger.debug("The loglikelihood list was updated with the new expected count histograms.") # At the end of this function, all MPI processes needs to have a full - # copy of updated model. + # copy of updated model. 
They calculate it from delta_model (which is + # distributed by MPI.Bcast) independently def register_result(self): """ @@ -365,7 +215,7 @@ def register_result(self): """ # Master - if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)): + if self.MASTER: this_result = {"iteration": self.iteration_count, "model": copy.deepcopy(self.model), "delta_model": copy.deepcopy(self.delta_model), @@ -391,7 +241,6 @@ def check_stopping_criteria(self): bool """ - # All if self.iteration_count < self.iteration_max: return False return True @@ -402,7 +251,7 @@ def finalization(self): """ # Master - if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)): + if self.MASTER: if self.save_results == True: logger.info('Saving results in {self.save_results_directory}') @@ -446,7 +295,6 @@ def calc_alpha(self, delta_model, model): Acceleration parameter """ - # Master: Only invoked by master process diff = -1 * (model / delta_model).contents diff[(diff <= 0) | (delta_model.contents == 0)] = np.inf diff --git a/cosipy/image_deconvolution/dataIFWithParallelSupport.py b/cosipy/image_deconvolution/dataIFWithParallelSupport.py index 987036a5..4723abd1 100644 --- a/cosipy/image_deconvolution/dataIFWithParallelSupport.py +++ b/cosipy/image_deconvolution/dataIFWithParallelSupport.py @@ -6,6 +6,7 @@ import numpy as np import astropy.units as u +from mpi4py import MPI import h5py from histpy import Histogram, Axes, Axis, HealpixAxis @@ -17,6 +18,7 @@ # and will be supported at a later release NUMROWS = 3072 NUMCOLS = 3072 +MASTER = 0 # Define data paths DRM_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/data') @@ -109,6 +111,19 @@ def __init__(self, event_filename, bkg_filename, drm_filename, name = None, comm ImageDeconvolutionDataInterfaceBase.__init__(self, name) + # Specific to parallel implementation: + # 1. Assume numproc is known by the process that invoked `run_deconvolution()` + # 2. 
All processes have loaded event data, background, and created + # initial model (from model properties) independently + self.parallel = False + if comm is not None: + self.numtasks = comm.Get_size() + self.taskid = comm.Get_rank() + self._comm = comm + if self.numtasks > 1: + self.parallel = True + logger.info('Image Deconvolution set to run in parallel mode') + self._MPI_init(event_filename, bkg_filename, drm_filename, comm) # None if using Galactic CDS, required if using local CDS @@ -119,8 +134,8 @@ def __init__(self, event_filename, bkg_filename, drm_filename, name = None, comm def _MPI_load_data(self, event_filename, bkg_filename, drm_filename, comm): - numtasks = comm.Get_size() - taskid = comm.Get_rank() + numtasks = self.numtasks + taskid = self.taskid print(f'TaskID = {taskid}, Number of tasks = {numtasks}') @@ -148,6 +163,9 @@ def _MPI_load_data(self, event_filename, bkg_filename, drm_filename, comm): self._image_response = load_response_matrix(comm, self.start_col, self.end_col, filename=drm_filename) self._image_response_T = load_response_matrix_transpose(comm, self.start_row, self.end_row, filename=drm_filename) + self.col_size = 1 # TODO: This can change for more sophisticated model space contents + self.row_size = np.prod(self.event.contents.shape[:-1]) + def _MPI_set_aux_data(self): # Set variable _model_axes @@ -216,6 +234,18 @@ def _calc_exposure_map(self): # [NuLambda, Ei, Em, Phi, PsiChi] -> [NuLambda, Ei] # [lb, NuLambda] x [NuLambda, Ei] -> [lb, Ei] + if self.parallel: + ''' + Synchronization Barrier 0 (performed only once) + ''' + total_exposure_map = np.empty_like(self.exposure_map.contents, dtype=np.float64) + + # Gather all arrays into recvbuf + self._comm.Allreduce(self.exposure_map.contents, total_exposure_map, op=MPI.SUM) # For multiple MPI processes, full = [slice1, ... sliceN] + + # Reshape the received buffer back into the original array shape + self.exposure_map[:] = total_exposure_map + logger.info("Finished...") def calc_expectation(self, model, dict_bkg_norm = None, almost_zero = 1e-12): @@ -246,10 +276,10 @@ def calc_expectation(self, model, dict_bkg_norm = None, almost_zero = 1e-12): # However it is likely that it will have such an axis in the future in order to consider background variability depending on time and pointign direction etc. # Then, the implementation here will not work. Thus, keep in mind that we need to modify it once the response format is fixed. - expectation = Histogram(self._data_axes_slice) + expectation_slice = Histogram(self._data_axes_slice) if self._coordsys_conv_matrix is None: - expectation[:] = np.tensordot( model.contents, self._image_response.contents, axes = ([0,1],[0,1])) * model.axes['lb'].pixarea() + expectation_slice[:] = np.tensordot( model.contents, self._image_response.contents, axes = ([0,1],[0,1])) * model.axes['lb'].pixarea() # ['lb', 'Ei'] x [NuLambda(lb), Ei, Em, Phi, PsiChi] -> [Em, Phi, PsiChi] else: map_rotated = np.tensordot(self._coordsys_conv_matrix.contents, model.contents, axes = ([1], [0])) @@ -258,15 +288,53 @@ def calc_expectation(self, model, dict_bkg_norm = None, almost_zero = 1e-12): map_rotated *= model.axes['lb'].pixarea() # data.coordsys_conv_matrix.contents is sparse, so the unit should be restored. 
            # the unit of map_rotated is 1/cm2 ( = s * 1/cm2/s/sr * sr)

-        expectation[:] = np.tensordot( map_rotated, self._image_response.contents, axes = ([1,2], [0,1]))
+        expectation_slice[:] = np.tensordot( map_rotated, self._image_response.contents, axes = ([1,2], [0,1]))
             # [Time/ScAtt, NuLambda, Ei] x [NuLambda, Ei, Em, Phi, PsiChi] -> [Time/ScAtt, Em, Phi, PsiChi]

         if dict_bkg_norm is not None:
             for key in self.keys_bkg_models():
-                expectation += self.bkg_model_slice(key) * dict_bkg_norm[key]
+                expectation_slice += self.bkg_model_slice(key) * dict_bkg_norm[key]
+
+        expectation_slice += almost_zero
+
+        # Parallel
+        if self.parallel:
+            '''
+            Synchronization Barrier 1
+            '''
+
+            # Gather the sizes of local arrays from all processes
+            local_size = np.array([expectation_slice.contents.size], dtype=np.int32)
+            all_sizes = np.zeros(self.numtasks, dtype=np.int32)
+            self._comm.Allgather(local_size, all_sizes)
+
+            # Calculate displacements
+            displacements = np.insert(np.cumsum(all_sizes), 0, 0)[0:-1]
+
+            # Create a buffer to receive the gathered data
+            total_size = int(np.sum(all_sizes))
+            recvbuf = np.empty(total_size, dtype=np.float64)              # Receive buffer

-        expectation += almost_zero
+            # Gather all arrays into recvbuf
+            self._comm.Allgatherv(expectation_slice.contents.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE])    # For multiple MPI processes, full = [slice1, ... sliceN]
+
+            # Reshape the received buffer back into the original 3D array shape
+            epsilon = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape((-1,) + expectation_slice.contents.shape[1:]) for i in range(self.numtasks) ], axis=-1)
+
+            # Reconstruct the full data-space histogram from the gathered slices
+            expectation = Histogram(self.event.axes, contents=epsilon, unit=self.event.unit)

+        # Serial
+        else:
+            expectation = expectation_slice    # If single process, then full = slice
+
         return expectation

     def calc_T_product(self, dataspace_histogram):
@@ -292,19 +360,45 @@ def calc_T_product(self, dataspace_histogram):
         if dataspace_histogram.unit is not None:
             hist_unit *= dataspace_histogram.unit

-        hist = Histogram(self._model_axes_slice, unit = hist_unit)
+        hist_slice = Histogram(self._model_axes_slice, unit = hist_unit)

         if self._coordsys_conv_matrix is None:
-            hist[:] = np.tensordot(dataspace_histogram.contents, self._image_response_T.contents, axes = ([0,1,2], [2,3,4])) * self.model_axes['lb'].pixarea()
+            hist_slice[:] = np.tensordot(dataspace_histogram.contents, self._image_response_T.contents, axes = ([0,1,2], [2,3,4])) * self.model_axes['lb'].pixarea()
             # [Em, Phi, PsiChi] x [NuLambda (lb), Ei, Em, Phi, PsiChi] -> [NuLambda (lb), Ei]
         else:
             _ = np.tensordot(dataspace_histogram.contents, self._image_response_T.contents, axes = ([1,2,3], [2,3,4]))
               # [Time/ScAtt, Em, Phi, PsiChi] x [NuLambda, Ei, Em, Phi, PsiChi] -> [Time/ScAtt, NuLambda, Ei]

-            hist[:] = np.tensordot(self._coordsys_conv_matrix.contents, _, axes = ([0,2], [0,1])) \
+            hist_slice[:] = np.tensordot(self._coordsys_conv_matrix.contents, _, axes = ([0,2], [0,1])) \
                       * _.unit * self._coordsys_conv_matrix.unit * self.model_axes['lb'].pixarea()
               # [Time/ScAtt, lb, NuLambda] x [Time/ScAtt, NuLambda, Ei] -> [lb, Ei]
               # note that 
coordsys_conv_matrix is sparse, so the unit should be recovered separately. + # Parallel + if self.parallel: + ''' + Synchronization Barrier 2 + ''' + + # Calculate all_sizes and displacements + all_sizes = [(self.avecol) * self.col_size] * (self.numtasks-1) + [(self.avecol + self.extra_cols) * self.col_size] + displacements = np.insert(np.cumsum(all_sizes), 0, 0)[0:-1] + + # Create a receive buffer to receive the gathered data + recvbuf = np.empty(int(np.sum(all_sizes)), dtype=np.float64) + + # Gather all arrays into recvbuf + self._comm.Allgatherv(hist_slice.contents.value.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE]) # For multiple MPI processes, full = [slice1, ... sliceN] + + # Reshape the received buffer back into the original 2D array shape + C = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape((-1,) + hist_slice.contents.shape[1:]) for i in range(self.numtasks) ], axis=0) + + # hist_slice (only slice operated on by current node) --> sum_T_product (all) + hist = Histogram(self.model_axes, contents=C, unit=hist_slice.unit) + + # Serial + else: + hist = hist_slice + return hist def calc_bkg_model_product(self, key, dataspace_histogram):
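
The message-passing pattern behind Synchronization Barriers 1 and 2 — gather unequal-sized per-rank slices with Allgatherv and reassemble the full array on every process — can be exercised in isolation. The following is a minimal standalone sketch, not the DataIFWithParallel implementation: it uses plain NumPy arrays in place of histpy Histograms, and it assumes a row split in which the last rank carries the remainder rows, mirroring the averow/extra_rows convention the patch appears to rely on.

# Minimal standalone sketch of the Allgatherv pattern used in Synchronization
# Barriers 1 and 2 (plain NumPy arrays instead of histpy Histograms; the row
# split assumed here gives the remainder rows to the last rank).
# Run with e.g.:  mpiexec -n 4 python sketch_allgatherv.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
numtasks = comm.Get_size()
taskid = comm.Get_rank()

NUMROWS, NUMCOLS = 10, 4                                   # toy problem size
averow, extra_rows = divmod(NUMROWS, numtasks)
nrows = averow + (extra_rows if taskid == numtasks - 1 else 0)
local_slice = np.full((nrows, NUMCOLS), float(taskid))     # this rank's slice

# Every rank announces the size of its flattened slice ...
local_size = np.array([local_slice.size], dtype=np.int32)
all_sizes = np.zeros(numtasks, dtype=np.int32)
comm.Allgather(local_size, all_sizes)

# ... and the variable-length slices are concatenated into one receive buffer.
displacements = np.insert(np.cumsum(all_sizes), 0, 0)[:-1]
recvbuf = np.empty(int(np.sum(all_sizes)), dtype=np.float64)
comm.Allgatherv(local_slice.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE])

# Reshape each contribution and stack along the sliced axis, so that every
# process ends up with the full array (full = [slice1, ..., sliceN]).
full = np.concatenate(
    [recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape(-1, NUMCOLS)
     for i in range(numtasks)],
    axis=0)
assert full.shape == (NUMROWS, NUMCOLS)

Using Allgatherv rather than Gatherv plus a broadcast is what lets the reworked data interface hand every rank a complete expectation histogram and T-product, so RichardsonLucy no longer needs the Gatherv/Bcast barriers that this patch removes.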