Richardson Lucy Parallelization V2 #274

Open: avalluvan wants to merge 50 commits into base: develop

Changes shown are from 44 of the 50 commits.

Commits (50, all by avalluvan)
79109f3 (Aug 5, 2024) - Added new option "RLparallel" to ImageDeconvolution.deconvolution_alg…
c918ab5 (Aug 5, 2024) - Renamed RLparallel.py to RichardsonLucyParallel.py and adapted code s…
015f648 (Aug 5, 2024) - Installed mpi4py to cosipy venv and added it to skeleton
f259d36 (Aug 12, 2024) - Created subprocess call. Ported MPI to separate script RLparallelscri…
4cde4dd (Aug 13, 2024) - Added code to register results if save_results_flag is set to True
e98fb79 (Aug 14, 2024) - End-to-end working script version commit
cd45e44 (Aug 14, 2024) - Unclear what files are causing conflicts
ef279f4 (Aug 27, 2024) - Add modified files and new RL parallel script
871c9a8 (Aug 27, 2024) - Another tiny change
27c8afe (Aug 29, 2024) - Modified configuration file for RLparallel
f079988 (Oct 18, 2024) - Merge branch 'cositools:develop' into develop
f044589 (Oct 18, 2024) - git fetch changes unclear. Performing a commit to avoid data loss.
3036a7a (Oct 18, 2024) - This merge may result in data losses. Copies of files have been creat…
35b8293 (Oct 24, 2024) - Create LMDR.ipynb and add response creation attempt with h5py.
d04f3c3 (Oct 25, 2024) - Created general code for multidimensional interpolation
ac47f23 (Oct 30, 2024) - Attempted listmode response on the fly generation
c257676 (Nov 3, 2024) - Add ListModeResponse.py
7de27fd (Nov 6, 2024) - Used histpy functions to dramatically simplify the codebase
4c23188 (Nov 7, 2024) - Modified code to accept len(dr.axes) or len(dr.axes)-1 number of inputs.
71085fd (Nov 9, 2024) - Added comments to get_point_source_response()
dc35034 (Nov 11, 2024) - Tested new interpolation scheme on LMDR.ipynb
f8bbaa4 (Nov 11, 2024) - Adhoc fix for interpolated psr calculation with deepcopy
cdcb9b0 (Nov 11, 2024) - Simplified __getitem__ and get_point_source_response()
aaff648 (Nov 11, 2024) - Ported ListModeResponse interpolation functionalities to DetectorResp…
caa2d44 (Nov 11, 2024) - Removed ListModeResponse.py. All developed features incorporated in D…
2c80caa (Nov 11, 2024) - Merge pull request #1 from avalluvan/feature/general_response
dafd4f8 (Nov 21, 2024) - Added code to compress empty axes, i.e., those with axis shape = (1,)
de31c0a (Dec 10, 2024) - Create new file RichardsonLucyWithParallelSupport.py. Stash changes t…
ebdf1ca (Dec 10, 2024) - Skeleton for RLWithParellelSupport.py. Need to update MPI.DOUBLE with…
9aeba42 (Dec 10, 2024) - Looks like RLWithParallelSupport.py is complete. Need to create data …
b1d61a4 (Dec 11, 2024) - Everything up to run_deconvolution works with new data interface WPS
61afc80 (Dec 11, 2024) - Dry run with single MPI process works with RichardsonLucy and dataIFW…
75b0b81 (Dec 11, 2024) - RichardsonLucyWPS.py works with both dataIF_DC2 and dataIFWPS for sin…
5d5eb3e (Dec 12, 2024) - Working set up for small numproc MPI runs
59f9ce8 (Dec 12, 2024) - Syntactically correct. Algorithm has some discrepancies
406cf9a (Dec 12, 2024) - Archive old version of RLparallelscript.py and RichardsonLucyParallel.py
2eb7e25 (Dec 13, 2024) - Single and multi process produce same, exact outputs!
1cd0d2c (Dec 13, 2024) - Replaced old RichardsonLucy.py with new RichardsonLucy.py that now su…
1f4eeed (Dec 13, 2024) - Minor bug in image_deconvolution now that RichardsonLucyParallel.py h…
14c34fd (Dec 13, 2024) - Removed unnecessary imports in new RichardsonLucy.py
860b5ce (Dec 13, 2024) - Added comments. Removed stubs of RLparallel. Ready to rebase and crea…
63527db (Dec 15, 2024) - Migrated RichardsonLucy slicing and message passing to DataIFWithPara…
82fc4f8 (Dec 15, 2024) - Merge branch 'develop' into feature/RLparallel
c085c17 (Dec 15, 2024) - Bug fix in reshaping and concatenating epsilon and C slices
279f6f2 (Dec 16, 2024) - Replacing superfluous edits with files from cositools/develop
04eb674 (Dec 16, 2024) - Undo changes to dataIF_COSI_DC2.py and allskyimage.py
b99ff6a (Dec 16, 2024) - Undoing changes to RichardsonLucy.py Estep structure
cddaa24 (Dec 16, 2024) - Incorporated review suggestions about placement of parallel_computati…
7a2f65c (Dec 16, 2024) - Rename dataIFWithParallel.py to dataIF_Parallel.py. Undo changes to i…
17aae47 (Dec 16, 2024) - Calculate parallel and master_node flags in RLparallelscript.py
61 changes: 61 additions & 0 deletions cosipy/image_deconvolution/RLparallelscript.py
@@ -0,0 +1,61 @@
from pathlib import Path

import logging
logger = logging.getLogger(__name__)

from mpi4py import MPI
from histpy import Histogram

from cosipy.response import FullDetectorResponse
from cosipy.image_deconvolution import ImageDeconvolution, DataIFWithParallel, DataIF_COSI_DC2

# Define MPI variables
MASTER = 0 # Indicates master process
DRM_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/data')
DATA_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/image_deconvolution/511keV/GalacticCDS')

def main():
'''
Main script to create a parallel execution-compatible
dataset using DataIFWithParallel and call ImageDeconvolution
'''

# Set up MPI
comm = MPI.COMM_WORLD

# Create dataset
dataset = DataIFWithParallel(name = '511keV',
event_filename = '511keV_dc2_galactic_event.hdf5',
bkg_filename = '511keV_dc2_galactic_bkg.hdf5',
drm_filename = 'psr_gal_511_DC2.h5',
comm = comm) # Convert dataset to a list of datasets before passing to RichardsonLucy class

# bkg = Histogram.open(DATA_DIR / '511keV_dc2_galactic_bkg.hdf5')
# event = Histogram.open(DATA_DIR / '511keV_dc2_galactic_event.hdf5')
# image_response = Histogram.open(DRM_DIR / 'psr_gal_511_DC2.h5')
# dataset = DataIF_COSI_DC2.load(name = "511keV", # Create a dataset compatible with ImageDeconvolution: name (unique identifier), event data, background model, response, coordinate system conversion matrix (if detector response is not in galactic coordinates)
# event_binned_data = event.project(['Em', 'Phi', 'PsiChi']),
# dict_bkg_binned_data = {"total": bkg.project(['Em', 'Phi', 'PsiChi'])},
# rsp = image_response)

# Create image deconvolution object
image_deconvolution = ImageDeconvolution()

# set data_interface to image_deconvolution
image_deconvolution.set_dataset([dataset])

# set a parameter file for the image deconvolution
parameter_filepath = DATA_DIR / 'imagedeconvolution_parfile_gal_511keV.yml'
image_deconvolution.read_parameterfile(parameter_filepath)

# Initialize model
image_deconvolution.initialize(comm=comm)

# Execute deconvolution
image_deconvolution.run_deconvolution()

# MPI Shutdown
MPI.Finalize()

if __name__ == "__main__":
main()
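Note that if this script is started directly with plain `python`, `comm.Get_size()` is 1 and the run degenerates to serial mode; actual parallelism requires an MPI launcher. A hypothetical invocation (the rank count is illustrative, not a configuration tested in this PR):

# mpiexec -n 4 python RLparallelscript.py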
142 changes: 85 additions & 57 deletions cosipy/image_deconvolution/RichardsonLucy.py
@@ -1,11 +1,15 @@
import os
import copy
import numpy as np
import astropy.units as u
import astropy.io.fits as fits
import logging

# logging setup
logger = logging.getLogger(__name__)

# Import third party libraries
import numpy as np
import pandas as pd
import astropy.units as u
from astropy.io import fits
from histpy import Histogram

from .deconvolution_algorithm_base import DeconvolutionAlgorithmBase
@@ -36,7 +40,7 @@ class RichardsonLucy(DeconvolutionAlgorithmBase):

"""

def __init__(self, initial_model, dataset, mask, parameter):
def __init__(self, initial_model:Histogram, dataset:list, mask, parameter, comm=None):

DeconvolutionAlgorithmBase.__init__(self, initial_model, dataset, mask, parameter)

@@ -66,16 +70,26 @@ def __init__(self, initial_model, dataset, mask, parameter):
else:
os.makedirs(self.save_results_directory)

Contributor:
I understand that RL needs to know if it is performed on the master node and needs this kind of parameter. I would suggest preparing two parameters alternatively, something like

  • self.parallel_computation = True / False
  • self.master_node = True / False

I want to prepare a parameter that explicitly tells if the computation is in parallel or not. I will add some suggestions regarding these changes at other lines.

Author:
One of the ideas we discussed in a previous meeting was to let the program directly infer whether it was being run in serial or parallel mode. In fact, the suggested flag variables were what I used in the initial V2 pull request code. Do you recommend making this modification, i.e., inferring self.parallel_computation in image_deconvolution.py or in RichardsonLucy.py? The issue with inferring this in the image deconvolution class is: what happens when we have multiple input datasets? --> [dataset1, dataset2, ...], each dataset will have its own "sub_comm" object.
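A minimal sketch of the reviewer's two-flag proposal (the attribute names follow the suggestion above and are not part of the merged code; `comm` is an optional mpi4py communicator, as in this PR's __init__):

# Hedged sketch: derive both flags from the communicator alone.
if comm is None:
    self.parallel_computation = False   # serial run
    self.master_node = True             # the lone process acts as master
else:
    self.parallel_computation = comm.Get_size() > 1
    self.master_node = comm.Get_rank() == 0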

if comm is None:
self.MASTER = True
elif comm.Get_rank() == 0:
self.MASTER = True
else:
self.MASTER = False

def initialization(self):
"""
initialization before running the image deconvolution
initialization before performing image deconvolution
"""

# Master
if self.MASTER:
# Clear results
self.results.clear()

# clear counter
self.iteration_count = 0

# clear results
self.results.clear()

# copy model
self.model = copy.deepcopy(self.initial_model)

@@ -93,10 +107,6 @@ def initialization(self):
self.response_weighting_filter = (self.summed_exposure_map.contents / np.max(self.summed_exposure_map.contents))**self.response_weighting_index
logger.info("The response weighting filter was calculated.")

# expected count histograms
self.expectation_list = self.calc_expectation_list(model = self.initial_model, dict_bkg_norm = self.dict_bkg_norm)
logger.info("The expected count histograms were calculated with the initial model map.")

Contributor:
Is it possible to keep these lines? To use the updated model for the likelihood calculation, I wanted to perform the expected count calculation at the post-processing and initialization steps and skip it in the Estep.

Author:
I can undo these changes. Do you plan on moving this to Estep() in the future / removing Estep() altogether?

# calculate summed background models for M-step
if self.do_bkg_norm_optimization:
self.dict_summed_bkg_model = {}
@@ -112,9 +122,14 @@ def pre_processing(self):
def Estep(self):
"""
E-step (but it will be skipped).
Note that self.expectation_list is updated in self.post_processing().
"""
pass

# expected count histograms
self.expectation_list = self.calc_expectation_list(model = self.model, dict_bkg_norm = self.dict_bkg_norm)
logger.info("The expected count histograms were calculated with the initial model map.")

# At the end of this function, all processes should have a complete `self.expectation_list`
# to proceed to the Mstep function
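One way to satisfy that invariant is an allgather over the per-rank slices; a minimal standalone sketch (the slice size and the equal partitioning are assumptions, and per the commit history the PR's real exchange lives in DataIFWithParallel):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
slice_size = 1024                               # illustrative, not from the PR
local = np.zeros(slice_size)                    # this rank's expectation slice
full = np.empty(slice_size * comm.Get_size())   # receives every rank's slice
comm.Allgather(local, full)                     # every rank now holds the full array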

def Mstep(self):
"""
@@ -129,7 +144,7 @@ def Mstep(self):

if self.mask is not None:
self.delta_model = self.delta_model.mask_pixels(self.mask)

# background normalization optimization
if self.do_bkg_norm_optimization:
for key in self.dict_bkg_norm.keys():
@@ -146,6 +161,11 @@

self.dict_bkg_norm[key] = bkg_norm

# At the end of this function, all the nodes will have a full
# copy of delta_model. Although this is not necessary, this is
# the easiest way without editing RichardsonLucy.py.
# The same applies for self.dict_bkg_norm

def post_processing(self):
"""
Here three processes will be performed.
@@ -154,6 +174,7 @@
- acceleration of RL algorithm: the normalization of the delta map is increased as long as the updated image has no negative components.
"""

# All
self.processed_delta_model = copy.deepcopy(self.delta_model)

if self.do_response_weighting:
@@ -172,15 +193,15 @@

if self.mask is not None:
self.model = self.model.mask_pixels(self.mask)

# update expectation_list
self.expectation_list = self.calc_expectation_list(self.model, dict_bkg_norm = self.dict_bkg_norm)
logger.debug("The expected count histograms were updated with the new model map.")

# update loglikelihood_list
self.loglikelihood_list = self.calc_loglikelihood_list(self.expectation_list)
logger.debug("The loglikelihood list was updated with the new expected count histograms.")

# At the end of this function, all MPI processes needs to have a full
# copy of updated model. They calculate it from delta_model (which is
# distributed by MPI.Bcast) independently
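The broadcast mentioned in this comment maps directly onto mpi4py's buffer interface; a minimal sketch, with the shape and contents standing in for the real delta model:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
shape = (3072,)                        # illustrative model size
if comm.Get_rank() == 0:
    delta = np.random.rand(*shape)     # stands in for the master's finished update
else:
    delta = np.empty(shape)            # other ranks allocate a receive buffer
comm.Bcast(delta, root=0)              # every rank now holds the same delta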

def register_result(self):
"""
The values below are stored at the end of each iteration.
@@ -193,21 +214,23 @@
- loglikelihood: log-likelihood
"""

this_result = {"iteration": self.iteration_count,
"model": copy.deepcopy(self.model),
"delta_model": copy.deepcopy(self.delta_model),
"processed_delta_model": copy.deepcopy(self.processed_delta_model),
"background_normalization": copy.deepcopy(self.dict_bkg_norm),
"alpha": self.alpha,
"loglikelihood": copy.deepcopy(self.loglikelihood_list)}

# show intermediate results
logger.info(f' alpha: {this_result["alpha"]}')
logger.info(f' background_normalization: {this_result["background_normalization"]}')
logger.info(f' loglikelihood: {this_result["loglikelihood"]}')
# Master
if self.MASTER:
this_result = {"iteration": self.iteration_count,
"model": copy.deepcopy(self.model),
"delta_model": copy.deepcopy(self.delta_model),
"processed_delta_model": copy.deepcopy(self.processed_delta_model),
"background_normalization": copy.deepcopy(self.dict_bkg_norm),
"alpha": self.alpha,
"loglikelihood": copy.deepcopy(self.loglikelihood_list)}

# show intermediate results
logger.info(f' alpha: {this_result["alpha"]}')
logger.info(f' background_normalization: {this_result["background_normalization"]}')
logger.info(f' loglikelihood: {this_result["loglikelihood"]}')

# register this_result in self.results
self.results.append(this_result)
# register this_result in self.results
self.results.append(this_result)

def check_stopping_criteria(self):
"""
@@ -217,6 +240,7 @@
-------
bool
"""

if self.iteration_count < self.iteration_max:
return False
return True
@@ -225,38 +249,41 @@
"""
finalization after running the image deconvolution
"""
if self.save_results == True:
logger.info(f'Saving results in {self.save_results_directory}')

# model
for this_result in self.results:
iteration_count = this_result["iteration"]
# Master
if self.MASTER:
if self.save_results == True:
logger.info(f'Saving results in {self.save_results_directory}')

# model
for this_result in self.results:
iteration_count = this_result["iteration"]

this_result["model"].write(f"{self.save_results_directory}/model_itr{iteration_count}.hdf5", overwrite = True)
this_result["delta_model"].write(f"{self.save_results_directory}/delta_model_itr{iteration_count}.hdf5", overwrite = True)
this_result["processed_delta_model"].write(f"{self.save_results_directory}/processed_delta_model_itr{iteration_count}.hdf5", overwrite = True)
this_result["model"].write(f"{self.save_results_directory}/model_itr{iteration_count}_2.hdf5", overwrite = True)
this_result["delta_model"].write(f"{self.save_results_directory}/delta_model_itr{iteration_count}_2.hdf5", overwrite = True)
this_result["processed_delta_model"].write(f"{self.save_results_directory}/processed_delta_model_itr{iteration_count}_2.hdf5", overwrite = True)

#fits
primary_hdu = fits.PrimaryHDU()
#fits
primary_hdu = fits.PrimaryHDU()

col_iteration = fits.Column(name='iteration', array=[float(result['iteration']) for result in self.results], format='K')
col_alpha = fits.Column(name='alpha', array=[float(result['alpha']) for result in self.results], format='D')
cols_bkg_norm = [fits.Column(name=key, array=[float(result['background_normalization'][key]) for result in self.results], format='D')
for key in self.dict_bkg_norm.keys()]
cols_loglikelihood = [fits.Column(name=f"{self.dataset[i].name}", array=[float(result['loglikelihood'][i]) for result in self.results], format='D')
for i in range(len(self.dataset))]
col_iteration = fits.Column(name='iteration', array=[float(result['iteration']) for result in self.results], format='K')
col_alpha = fits.Column(name='alpha', array=[float(result['alpha']) for result in self.results], format='D')
cols_bkg_norm = [fits.Column(name=key, array=[float(result['background_normalization'][key]) for result in self.results], format='D')
for key in self.dict_bkg_norm.keys()]
cols_loglikelihood = [fits.Column(name=f"{self.dataset[i].name}", array=[float(result['loglikelihood'][i]) for result in self.results], format='D')
for i in range(len(self.dataset))]

table_alpha = fits.BinTableHDU.from_columns([col_iteration, col_alpha])
table_alpha.name = "alpha"
table_alpha = fits.BinTableHDU.from_columns([col_iteration, col_alpha])
table_alpha.name = "alpha"

table_bkg_norm = fits.BinTableHDU.from_columns([col_iteration] + cols_bkg_norm)
table_bkg_norm.name = "bkg_norm"
table_bkg_norm = fits.BinTableHDU.from_columns([col_iteration] + cols_bkg_norm)
table_bkg_norm.name = "bkg_norm"

table_loglikelihood = fits.BinTableHDU.from_columns([col_iteration] + cols_loglikelihood)
table_loglikelihood.name = "loglikelihood"
table_loglikelihood = fits.BinTableHDU.from_columns([col_iteration] + cols_loglikelihood)
table_loglikelihood.name = "loglikelihood"

hdul = fits.HDUList([primary_hdu, table_alpha, table_bkg_norm, table_loglikelihood])
hdul.writeto(f'{self.save_results_directory}/results.fits', overwrite=True)
hdul = fits.HDUList([primary_hdu, table_alpha, table_bkg_norm, table_loglikelihood])
hdul.writeto(f'{self.save_results_directory}/results.fits', overwrite=True)
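The named extensions make the saved tables easy to inspect afterwards; a usage sketch (the path is illustrative and assumes the save directory used above):

from astropy.io import fits

with fits.open('results.fits') as hdul:
    table = hdul['loglikelihood'].data   # one row per iteration
    print(table['iteration'])            # columns are addressable by name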

def calc_alpha(self, delta_model, model):
"""
@@ -267,6 +294,7 @@ def calc_alpha(self, delta_model, model):
float
Acceleration parameter
"""

diff = -1 * (model / delta_model).contents

diff[(diff <= 0) | (delta_model.contents == 0)] = np.inf
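The intent of these two lines is easier to see on a plain array; a worked sketch (the truncation against alpha_max is elided in this hunk, so 10.0 stands in for it):

import numpy as np

model = np.array([1.0, 2.0, 0.5])
delta = np.array([0.5, -1.0, -0.25])
ratio = -1 * (model / delta)                 # step size at which each pixel would hit zero
ratio[(ratio <= 0) | (delta == 0)] = np.inf  # growing pixels never constrain alpha
alpha = min(ratio.min(), 10.0)               # largest safe step, capped at alpha_max
assert np.all(model + alpha * delta >= 0)    # the updated model stays non-negative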
1 change: 1 addition & 0 deletions cosipy/image_deconvolution/__init__.py
@@ -2,6 +2,7 @@

from .image_deconvolution_data_interface_base import ImageDeconvolutionDataInterfaceBase
from .dataIF_COSI_DC2 import DataIF_COSI_DC2
from .dataIFWithParallelSupport import DataIFWithParallel

from .model_base import ModelBase
from .allskyimage import AllSkyImageModel
4 changes: 3 additions & 1 deletion cosipy/image_deconvolution/allskyimage.py
@@ -151,9 +151,11 @@ def set_values_from_parameters(self, parameter):
if algorithm_name == "flat":
unit = u.Unit(algorithm_parameter['unit'])
for idx, value in enumerate(algorithm_parameter['value']):
self[:,idx] = value * unit
self[:,idx] = value * unit # TODO: Why is this indexed as [:, idx] instead of [idx, :]?
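A hedged reading of that TODO, not confirmed by this diff: if AllSkyImageModel orders its axes as (sky pixel, energy bin), then self[:, idx] fills every sky pixel of energy layer idx with a single flat value, which matches the loop over one value per energy bin.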
# elif algorithm_name == ...
# ...
else:
raise ValueError(f'Model algorithm {algorithm_name} not supported')

def set_values_from_extendedmodel(self, extendedmodel):
"""