Skip to content

Commit

Permalink
Added comments. Removed stubs of RLparallel. Ready to rebase and crea…
Browse files Browse the repository at this point in the history
…te pull request
  • Loading branch information
avalluvan committed Dec 13, 2024
1 parent 14c34fd commit 860b5ce
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
32 changes: 26 additions & 6 deletions cosipy/image_deconvolution/RichardsonLucy.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,33 @@ def __init__(self, initial_model:Histogram, dataset:list, mask, parameter, comm=
else:
os.makedirs(self.save_results_directory)

# Specific to parallel implementation:
# 1. Assume numproc is known by the process that invoked `run_deconvolution()`
# 2. All processes have loaded event data, background, and created
# initial model (from model properties) independently
self.parallel = False
if comm is not None:
self.comm = comm
self.parallel = True
if self.comm.Get_size() > 1:
self.parallel = True
logger.info('Image Deconvolution set to run in parallel mode')
# Specific to parallel implementation:
# 1. Assume numproc is known by the process that invoked `run_deconvolution()`
# 2. All processes have loaded event data, background, and initial model independently

def initialization(self):
"""
initialization before performing image deconvolution
"""

# Parallel
if self.parallel:
self.numtasks = self.comm.Get_size()
self.taskid = self.comm.Get_rank()

# clear results
# Master
if (not self.parallel) or (self.parallel and (self.taskid == MASTER)):
# Clear results
self.results.clear()

# All
# clear counter
self.iteration_count = 0

Expand All @@ -108,9 +111,10 @@ def initialization(self):
# calculate exposure map
self.summed_exposure_map = self.calc_summed_exposure_map()

# Parallel
if self.parallel:
'''
Synchronization Barrier 0
Synchronization Barrier 0 (performed only once)
'''
total_exposure_map = np.empty_like(self.summed_exposure_map, dtype=np.float64)

Expand All @@ -120,6 +124,7 @@ def initialization(self):
# Reshape the received buffer back into the original array shape
self.summed_exposure_map[:] = total_exposure_map

# All
# mask setting
if self.mask is None and np.any(self.summed_exposure_map.contents == 0):
self.mask = Histogram(self.model.axes, contents = self.summed_exposure_map.contents > 0)
Expand Down Expand Up @@ -148,13 +153,16 @@ def Estep(self):
E-step (but it will be skipped).
"""

# All
# expected count histograms
expectation_list_slice = self.calc_expectation_list(model = self.model, dict_bkg_norm = self.dict_bkg_norm)
logger.info("The expected count histograms were calculated with the initial model map.")

# Serial
if not self.parallel:
self.expectation_list = expectation_list_slice # If single process, then full = slice

# Parallel
elif self.parallel:
'''
Synchronization Barrier 1
Expand Down Expand Up @@ -203,14 +211,17 @@ def Mstep(self):
M-step in RL algorithm.
"""

# All
ratio_list = [ data.event / expectation for data, expectation in zip(self.dataset, self.expectation_list) ]

# delta model
C_slice = self.calc_summed_T_product(ratio_list)

# Serial
if not self.parallel:
sum_T_product = C_slice

# Parallel
elif self.parallel:
'''
Synchronization Barrier 2
Expand All @@ -231,6 +242,7 @@ def Mstep(self):
# Gather all arrays into recvbuf
self.comm.Gatherv(C_slice.contents.value.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE]) # For multiple MPI processes, full = [slice1, ... sliceN]

# Master
if self.taskid == MASTER:
# Reshape the received buffer back into the original 2D array shape
C = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape((-1,) + C_slice.contents.shape[1:]) for i in range(self.numtasks) ], axis=0)
Expand All @@ -250,12 +262,14 @@ def Mstep(self):
# C_slice (only slice operated on by current node) --> sum_T_product (all )
sum_T_product = Histogram(Axes(axes), contents=C, unit=C_slice.unit) # TODO: Could maybe be simplified using Histogram.slice[]

# Master
if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)):
self.delta_model = self.model * (sum_T_product/self.summed_exposure_map - 1)

if self.mask is not None:
self.delta_model = self.delta_model.mask_pixels(self.mask)

# All
# background normalization optimization
if self.do_bkg_norm_optimization:
for key in self.dict_bkg_norm.keys():
Expand Down Expand Up @@ -286,6 +300,7 @@ def post_processing(self):
- acceleration of RL algirithm: the normalization of delta map is increased as long as the updated image has no non-negative components.
"""

# Master
if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)):
self.processed_delta_model = copy.deepcopy(self.delta_model)

Expand All @@ -310,6 +325,7 @@ def post_processing(self):
self.loglikelihood_list = self.calc_loglikelihood_list(self.expectation_list)
logger.debug("The loglikelihood list was updated with the new expected count histograms.")

# Parallel
if self.parallel:
'''
Synchronization Barrier 3
Expand Down Expand Up @@ -348,6 +364,7 @@ def register_result(self):
- loglikelihood: log-likelihood
"""

# Master
if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)):
this_result = {"iteration": self.iteration_count,
"model": copy.deepcopy(self.model),
Expand All @@ -374,6 +391,7 @@ def check_stopping_criteria(self):
bool
"""

# All
if self.iteration_count < self.iteration_max:
return False
return True
Expand All @@ -383,6 +401,7 @@ def finalization(self):
finalization after running the image deconvolution
"""

# Master
if (not self.parallel) or ((self.parallel) and (self.taskid == MASTER)):
if self.save_results == True:
logger.info('Saving results in {self.save_results_directory}')
Expand Down Expand Up @@ -427,6 +446,7 @@ def calc_alpha(self, delta_model, model):
Acceleration parameter
"""

# Master: Only invoked by master process
diff = -1 * (model / delta_model).contents

diff[(diff <= 0) | (delta_model.contents == 0)] = np.inf
Expand Down
28 changes: 19 additions & 9 deletions cosipy/image_deconvolution/dataIFWithParallelSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,23 @@ class DataIFWithParallel(ImageDeconvolutionDataInterfaceBase):

def __init__(self, event_filename, bkg_filename, drm_filename, name = None, comm = None):

ImageDeconvolutionDataInterfaceBase.__init__(self, name)

self._MPI_init(event_filename, bkg_filename, drm_filename, comm)

# None if using Galactic CDS, required if using local CDS
self._coordsys_conv_matrix = None

# Calculate exposure map
self._calc_exposure_map()

def _MPI_load_data(self, event_filename, bkg_filename, drm_filename, comm):

numtasks = comm.Get_size()
taskid = comm.Get_rank()

print(f'TaskID = {taskid}, Number of tasks = {numtasks}')

ImageDeconvolutionDataInterfaceBase.__init__(self, name)

# Calculate the indices in Rij that the process has to parse. My hunch is that calculating these scalars individually will be faster than the MPI send broadcast overhead.
self.averow = NUMROWS // numtasks
self.extra_rows = NUMROWS % numtasks
Expand All @@ -138,6 +148,8 @@ def __init__(self, event_filename, bkg_filename, drm_filename, name = None, comm
self._image_response = load_response_matrix(comm, self.start_col, self.end_col, filename=drm_filename)
self._image_response_T = load_response_matrix_transpose(comm, self.start_row, self.end_row, filename=drm_filename)

def _MPI_set_aux_data(self):

# Set variable _model_axes
# Derived from Parent class (ImageDeconvolutionDataInterfaceBase)
axes = [self._image_response.axes['NuLambda'], self._image_response.axes['Ei']]
Expand Down Expand Up @@ -175,18 +187,16 @@ def __init__(self, event_filename, bkg_filename, drm_filename, name = None, comm
## Create bkg_model_slice Histogram
self._bkg_models_slice = {}
for key in self._bkg_models:
# if self._bkg_models[key].is_sparse:
# self._bkg_models[key] = self._bkg_models[key].to_dense()
bkg_model = self._bkg_models[key]
self._summed_bkg_models[key] = np.sum(bkg_model)
self._bkg_models_slice[key] = bkg_model.slice[:, :, self.start_col:self.end_col]

# None if using Galactic CDS, required if using local CDS
self._coordsys_conv_matrix = None
def _MPI_init(self, event_filename, bkg_filename, drm_filename, comm):

self._MPI_load_data(event_filename, bkg_filename, drm_filename, comm)

self._MPI_set_aux_data()

# Calculate exposure map
self._calc_exposure_map()

def _calc_exposure_map(self):
"""
Calculate exposure_map, which is an intermidiate matrix used in RL algorithm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ model_definition:
unit: "cm-2 s-1 sr-1" # do not change it as for now

deconvolution:
algorithm: "RLparallel" # Choose from RL, RLsimple and RLparallel
algorithm: "RL" # Choose from RL and RLsimple

parameter:
iteration_max: 50
Expand Down

0 comments on commit 860b5ce

Please sign in to comment.