diff --git a/cosipy/image_deconvolution/RLparallelscript.py b/cosipy/image_deconvolution/RLparallelscript.py new file mode 100644 index 00000000..1277dbb4 --- /dev/null +++ b/cosipy/image_deconvolution/RLparallelscript.py @@ -0,0 +1,375 @@ +import os +from pathlib import Path +import logging +import argparse + +# logging setup +logger = logging.getLogger(__name__) + +# argparse setup +parser = argparse.ArgumentParser() +parser.add_argument("--numrows", type=int, dest='numrows', help="Number of rows in the response matrix") +parser.add_argument("--numcols", type=int, dest='numcols', help="Number of columns in the response matrix") +parser.add_argument("--base_dir", type=str, dest='base_dir', help="Current working directory and where configuration file is assumed to lie") +parser.add_argument("--config_file", type=str, dest='config_file', help="Name of configuration file (assumed to lie in CWD)") +args = parser.parse_args() + +# Import third party libraries +import numpy as np +from mpi4py import MPI +import h5py +from yayc import Configurator + +# Load configuration file +config = Configurator.open(f'{args.base_dir}/{args.config_file}') + +# Number of elements in data space (ROWS) and model space (COLS) +NUMROWS = args.numrows # TODO: Ideally, for row-major form to exploit caching, NUMROWS must be smaller than NUMCOLS +NUMCOLS = args.numcols + +# Define MPI and iteration misc variables +MASTER = 0 # Indicates master process +MAXITER = config.get('deconvolution:parameter:iteration_max', 10) + +# FILE_DIR = Path(os.path.dirname(os.path.abspath(__file__))) +BASE_DIR = Path(args.base_dir) +RESULTS_DIR = BASE_DIR / config.get('deconvolution:parameter:save_results_directory', './results') + +''' +Response matrix +''' +def load_response_matrix(comm, start_row, end_row, filename='response_matrix.h5'): + with h5py.File(BASE_DIR / filename, "r", driver="mpio", comm=comm) as f1: + dataset = f1["response_matrix"] + R = dataset[start_row:end_row, :] + return R + +''' +Response matrix transpose +''' +def load_response_matrix_transpose(comm, start_col, end_col, filename='response_matrix.h5'): + with h5py.File(BASE_DIR / filename, "r", driver="mpio", comm=comm) as f1: + dataset = f1["response_matrix"] + RT = dataset[:, start_col:end_col] + return RT + +''' +Response matrix summed along axis=i +''' +def load_axis0_summed_response_matrix(filename='response_matrix.h5'): + with h5py.File(BASE_DIR / filename, "r") as f1: + dataset = f1["response_vector"] + Rj = dataset[:] + return Rj + +''' +Sky model +''' +def initial_sky_model(model_init_val=[1e-4]): + M0 = np.ones(NUMCOLS, dtype=np.float64) * float(model_init_val[0]) # Initial guess according to image_deconvolution.py. TODO: Make this more general than element 0 + return M0 + +''' +Background model +''' +def load_bg_model(filename='bg.csv'): + bg = np.loadtxt(filename) + return bg + +''' +Observed data +''' +def load_event_data(filename='event.csv'): + event = np.loadtxt(filename) + return event + +def register_result(iter, M, delta): + """ + The values below are stored at the end of each iteration. + - iteration: iteration number + - model: updated image + - delta_model: delta map after M-step + - processed_delta_model: delta map after post-processing + - alpha: acceleration parameter in RL algirithm + - background_normalization: optimized background normalization + - loglikelihood: log-likelihood + """ + + this_result = {"iteration": iter, + "model": M, + "delta_model": delta, + # "processed_delta_model": copy.deepcopy(self.processed_delta_model), TODO: The RL parallel implementation does not currently support smooth convergence through weighting, background normalization, or likelihood calculation + # "background_normalization": copy.deepcopy(self.dict_bkg_norm), + # "alpha": self.alpha, + # "loglikelihood": copy.deepcopy(self.loglikelihood_list) + } + + # # show intermediate results + # logger.info(f' alpha: {this_result["alpha"]}') + # logger.info(f' background_normalization: {this_result["background_normalization"]}') + # logger.info(f' loglikelihood: {this_result["loglikelihood"]}') + + return this_result + +def save_results(results): + ''' + NOTE: Copied from RichardsonLucy.py + ''' + logger.info('Saving results in {RESULTS_DIR}') + # model + for this_result in results: + iteration_count = this_result["iteration"] + # this_result["model"].write(f"{RESULTS_DIR}/model_itr{iteration_count}.hdf5", overwrite = True) # TODO: numpy arrays do not support write_to_hdf5 as a method. Need to ensure rest of code is modified to support cosipy.image_deconvolution.allskyimage.AllSkyImageModel + # this_result["delta_model"].write(f"{RESULTS_DIR}/delta_model_itr{iteration_count}.hdf5", overwrite = True) + # this_result["processed_delta_model"].write(f"{RESULTS_DIR}/processed_delta_model_itr{iteration_count}.hdf5", overwrite = True) TODO: processed_delta_model here is not different from delta_model + np.savetxt(f'{RESULTS_DIR}/model_itr{iteration_count}.csv', this_result['model'], delimiter=',') + np.savetxt(f'{RESULTS_DIR}/delta_model_itr{iteration_count}.csv', this_result['delta_model'], delimiter=',') + + # TODO: The following will be enabled once the respective calculations are incorporated + # #fits + # primary_hdu = fits.PrimaryHDU() + + # col_iteration = fits.Column(name='iteration', array=[float(result['iteration']) for result in self.results], format='K') + # col_alpha = fits.Column(name='alpha', array=[float(result['alpha']) for result in self.results], format='D') + # cols_bkg_norm = [fits.Column(name=key, array=[float(result['background_normalization'][key]) for result in self.results], format='D') + # for key in self.dict_bkg_norm.keys()] + # cols_loglikelihood = [fits.Column(name=f"{self.dataset[i].name}", array=[float(result['loglikelihood'][i]) for result in self.results], format='D') + # for i in range(len(self.dataset))] + + # table_alpha = fits.BinTableHDU.from_columns([col_iteration, col_alpha]) + # table_alpha.name = "alpha" + + # table_bkg_norm = fits.BinTableHDU.from_columns([col_iteration] + cols_bkg_norm) + # table_bkg_norm.name = "bkg_norm" + + # table_loglikelihood = fits.BinTableHDU.from_columns([col_iteration] + cols_loglikelihood) + # table_loglikelihood.name = "loglikelihood" + + # hdul = fits.HDUList([primary_hdu, table_alpha, table_bkg_norm, table_loglikelihood]) + # hdul.writeto(f'{RESULTS_DIR}/results.fits', overwrite=True) + +def main(): + # Set up MPI + comm = MPI.COMM_WORLD + numtasks = comm.Get_size() + taskid = comm.Get_rank() + + # Calculate the indices in Rij that the process has to parse. My hunch is that calculating these scalars individually will be faster than the MPI send broadcast overhead. + averow = NUMROWS // numtasks + extra_rows = NUMROWS % numtasks + start_row = taskid * averow + end_row = (taskid + 1) * averow if taskid < (numtasks - 1) else NUMROWS + + # Calculate the indices in Rji, i.e., Rij transpose, that the process has to parse. + avecol = NUMCOLS // numtasks + extra_cols = NUMCOLS % numtasks + start_col = taskid * avecol + end_col = (taskid + 1) * avecol if taskid < (numtasks - 1) else NUMCOLS + + # Initialise vectors required by all processes + epsilon = np.zeros(NUMROWS) # All gatherv-ed. Explicit variable declaration. + epsilon_fudge = 1e-12 # To prevent divide-by-zero and underflow errors. Value taken from `almost_zero = 1e-12` in dataIF_COSI_DC2.py + + # Initialise epsilon_slice and C_slice. Explicit variable declarations. + epsilon_slice = np.zeros(end_row - start_row) + C_slice = np.zeros(end_col - start_col) + + # Load R and RT into memory (single time if response matrix doesn't + # change with time) + R = load_response_matrix(comm, start_row, end_row) + RT = load_response_matrix_transpose(comm, start_col, end_col) + + # Loaded and broadcasted by master. + M = np.empty(NUMCOLS, dtype=np.float64) + d = np.empty(NUMROWS, dtype=np.float64) + bg = np.zeros(NUMROWS) + +# ****************************** MPI ****************************** + +# **************************** Part I ***************************** + + '''*************** Master ***************''' + + if taskid == MASTER: + + # Pretty print definitions + linebreak_stars = '**********************' + linebreak_dashes = '----------------------' + + # Log input information (Only master node does this) + save_results_flag = config.get('deconvolution:parameter:save_results', False) # Extract from config file + logger.info(linebreak_stars) + logger.info(f'Number of elements in data space: {NUMROWS}') + logger.info(f'Number of elements in model space: {NUMCOLS}') + logger.info(f'Base directory: {BASE_DIR}') + if save_results_flag == True: + logger.info(f'Results directory (if save_results flag is set to True): {RESULTS_DIR}') + logger.info(f'Configuration filename: {args.config_file}') + logger.info(f'Master node: {MASTER}') + logger.info(f'Maximum number of RL iterations: {MAXITER}') + + # Load Rj vector (response matrix summed along axis=i) + Rj = load_axis0_summed_response_matrix() + + # Generate initial sky model from configuration file + M = initial_sky_model(model_init_val=config.get('model_definition:initialization:parameter:value', [1e-4])) + + # Load event data and background model (intermediate files created in RichardsonLucyParallel.py) + bg = load_bg_model() + d = load_event_data() + + # Sanity check: print d + print() + print('Observed data-space d vector:') + print(d) + ## Pretty print + print() + print(linebreak_stars) + + # Initialise C vector. Only master requires full length. Explicit variable declaration. + C = np.empty(NUMCOLS, dtype=np.float64) + + # Initialise update delta vector. Explicit variable declaration. + delta = np.empty(NUMCOLS, dtype=np.float64) + + # Initialise list for results. See function register_result() for list elements. + results = [] + + '''*************** Worker ***************''' + + if taskid > MASTER: + # Only separate if... clause for NON-MASTER processes. + # Initialise C vector to None. Only master requires full length. + C = None + + # Broadcast d vector + comm.Bcast([d, MPI.DOUBLE], root=MASTER) + + # Scatter bg vector to epsilon_BG + comm.Bcast([bg, MPI.DOUBLE], root=MASTER) + # comm.Scatter(bg, [epsilon_BG, recvcounts, displacements, MPI.DOUBLE]) + + # print(f"TaskID {taskid}, gathered broadcast") + + # Sanity check: print epsilon + # if taskid == MASTER: + # print('epsilon_BG') + # print(bg) + # print() + +# **************************** Part IIa ***************************** + + '''***************** Begin Iterative Segment *****************''' + # Set up initial values for iterating variables. + # Exit if: + ## 1. Max iterations are reached + # for iter in tqdm(range(MAXITER)): + for iter in range(MAXITER): + + '''*************** Master ***************''' + if taskid == MASTER: + # Pretty print - starting + print(f"Starting iteration {iter + 1}") + # logger.info(f"## Iteration {self.iteration_count}/{self.iteration_max} ##") + # logger.info("<< E-step >>") + + + # Calculate epsilon vector and all gatherv + + '''**************** All *****************''' + + '''Synchronization Barrier 1''' + # Broadcast M vector + comm.Bcast([M, MPI.DOUBLE], root=MASTER) + + # Calculate epsilon slice + epsilon_BG = bg[start_row:end_row] # TODO: Change the way epsilon_BG is loaded. Make it taskID dependent through MPI.Scatter for example. Use `recvcounts` + epsilon_slice = np.dot(R, M) + epsilon_BG + epsilon_fudge # TODO: For a more general implementation, see calc_expectation() in dataIF_COSI_DC2.py + + '''Synchronization Barrier 2''' + # All vector gather epsilon slices + recvcounts = [averow] * (numtasks-1) + [averow + extra_rows] + displacements = np.arange(numtasks) * averow + comm.Allgatherv(epsilon_slice, [epsilon, recvcounts, displacements, MPI.DOUBLE]) + + # Sanity check: print epsilon + # if taskid == MASTER: + # print('epsilon') + # print(epsilon) + # print(epsilon.min(), epsilon.max()) + # print() + +# **************************** Part IIb ***************************** + + # Calculate C vector and gatherv + + '''**************** All *****************''' + + # Calculate C slice + C_slice = np.dot(RT.T, d/epsilon) + + '''Synchronization Barrier 3''' + # All vector gather C slices + recvcounts = [avecol] * (numtasks-1) + [avecol + extra_cols] + displacements = np.arange(numtasks) * avecol + comm.Gatherv(C_slice, [C, recvcounts, displacements, MPI.DOUBLE], root=MASTER) + +# **************************** Part IIc ***************************** + + # Iterative update of model-space M vector + + if taskid == MASTER: + + # logger.info("<< M-step >>") + + # Sanity check: print C + # print('C') + # print(C) + # print(C.min(), C.max()) + # print() + + delta = C / Rj - 1 + M = M + delta * M # Allows for optimization features presented in Siegert et al. 2020 + + # Sanity check: print M + # print('M') + # print(np.round(M, 5)) + # print(np.round(M.max(), 5)) + + # Sanity check: print delta + # print('delta') + # print(delta) + + # Pretty print - completion + print(f"Done") + print(linebreak_dashes) + + # Save iteration + if save_results_flag == True: + results.append(register_result(iter, M, delta)) + + '''****************** End Iterative Segment ******************''' + + # Print converged M + if taskid == MASTER: + # logger.info("<< Registering Result >>") + print('Converged M vector:') + print(np.round(M, 5)) + print(np.round(M.max(), 5)) + print(np.sum(M)) + print() + + if save_results_flag == True: + save_results(results) + + # MAXITER + if iter == (MAXITER - 1): + print(f'Reached maximum iterations = {MAXITER}') + print(linebreak_stars) + print() + + # MPI Shutdown + MPI.Finalize() + +if __name__ == "__main__": + main() diff --git a/cosipy/image_deconvolution/RichardsonLucyParallel.py b/cosipy/image_deconvolution/RichardsonLucyParallel.py new file mode 100644 index 00000000..b86186fe --- /dev/null +++ b/cosipy/image_deconvolution/RichardsonLucyParallel.py @@ -0,0 +1,154 @@ +import os +import subprocess +import logging +logger = logging.getLogger(__name__) + +# Import third party libraries +import numpy as np +import h5py +# from histpy import Histogram + +from .deconvolution_algorithm_base import DeconvolutionAlgorithmBase + +class RichardsonLucyParallel(DeconvolutionAlgorithmBase): + """ + NOTE: Comments copied from RichardsonLucy.py + A class for a parallel implementation of the Richardson- + Lucy algorithm. + + An example of parameter is as follows. + + iteration_max: 100 + minimum_flux: + value: 0.0 + unit: "cm-2 s-1 sr-1" + background_normalization_optimization: True + """ + + def __init__(self, initial_model, dataset, mask, parameter): + """ + NOTE: Copied from RichardsonLucy.py + """ + + DeconvolutionAlgorithmBase.__init__(self, initial_model, dataset, mask, parameter) + + # TODO: these RL algorithm improvements are yet to be implemented/utilized in this file + # self.do_acceleration = parameter.get('acceleration', False) + + # self.alpha_max = parameter.get('alpha_max', 1.0) + + # self.do_response_weighting = parameter.get('response_weighting', False) + # if self.do_response_weighting: + # self.response_weighting_index = parameter.get('response_weighting_index', 0.5) + + # self.do_smoothing = parameter.get('smoothing', False) + # if self.do_smoothing: + # self.smoothing_fwhm = parameter['smoothing_FWHM']['value'] * u.Unit(parameter['smoothing_FWHM']['unit']) + # logger.info(f"Gaussian filter with FWHM of {self.smoothing_fwhm} will be applied to delta images ...") + + self.do_bkg_norm_optimization = parameter.get('background_normalization_optimization', False) + if self.do_bkg_norm_optimization: + self.dict_bkg_norm_range = parameter.get('background_normalization_range', {key: [0.0, 100.0] for key in self.dict_bkg_norm.keys()}) + + self.save_results = parameter.get('save_results', False) + self.save_results_directory = parameter.get('save_results_directory', './results') + + if self.save_results is True: + if os.path.isdir(self.save_results_directory): + logger.warning(f"A directory {self.save_results_directory} already exists. Files in {self.save_results_directory} may be overwritten. Make sure that is not a problem.") + else: + os.makedirs(self.save_results_directory) + + # Specific to parallel implementation + image_response = self.dataset[0]._image_response + self.numproc = parameter.get('numproc', 1) + self.iteration_max = parameter.get('iteration_max', 10) + self.base_dir = os.getcwd() + self.data_dir = parameter.get('data_dir', './data') # NOTE: Data should ideally be present in disk scratch space. + self.numrows = np.product(image_response.contents.shape[-3:]) # Em, Phi, PsiChi. NOTE: Change the "-3" if more general model space definitions are expected + self.numcols = np.product(image_response.contents.shape[:-3]) # Remaining columns + self.config_file = parameter.absolute_path + + def initialization(self): + """ + initialization before running the image deconvolution + """ + # Flatten and write dense bkg and events to scratch space. + self.write_intermediate_files_to_disk() + + def write_intermediate_files_to_disk(self): + # Event + event = self.dataset[0].event.contents.flatten() + np.savetxt(self.base_dir + '/event.csv', event) + + # Background + bg = np.zeros(len(event)) + bg_models = self.dataset[0]._bkg_models + for key in bg_models: + bg += bg_models[key].contents.flatten() + np.savetxt(self.base_dir + '/bg.csv', bg) + + # Response matrix + image_response = self.dataset[0]._image_response + new_shape = (self.numrows, self.numcols) + ndim = image_response.contents.ndim + with h5py.File(self.base_dir + '/response_matrix.h5', 'w') as output_file: + dset1 = output_file.create_dataset('response_matrix', data=np.transpose(image_response.contents, + np.take(np.arange(ndim), + range(ndim-3, 2*ndim-3), + mode='wrap') + ).reshape(new_shape)) # NOTE: Change the "ndim-3" if more general model space definitions are expected + logger.info(f'Shape of response matrix {dset1.shape}') + dset2 = output_file.create_dataset('response_vector', data=np.sum(dset1, axis=0)) + logger.info(f'Shape of response vector summed along axis=0 {dset2.shape}') + + def iteration(self): + """ + Performs all iterations of image deconvolution. + NOTE: Overrides implementation in deconvolution_algorithm_base.py and invokes an external script + """ + + # All arguments must be passed as type=str. Explicitly type cast boolean and number to string. + FILE_DIR = os.path.dirname(os.path.abspath(__file__)) # Path to directory containing RichardsonLucyParallel.py + logger.info(f"Subprocess call to run RLparallelscript.py at '{FILE_DIR}'") + + # RLparallelscript.py will be installed in the same directory as RichardsonLucyParallel.py + stdout = subprocess.run(args=["mpiexec", "-n", str(self.numproc), + "python", FILE_DIR + "/RLparallelscript.py", + "--numrows", str(self.numrows), + "--numcols", str(self.numcols), + "--base_dir", str(self.base_dir), + "--config_file", str(self.config_file) + ], env=os.environ) + logger.info(stdout) + + # RLparallelscript already contains check_stopping_criteria and iteration_max break condition. + # NOTE: RichardsonLucy.py currently does not support a sophisticated break condition. + return True + + def finalization(self): + """ + finalization after running the image deconvolution + """ + # Delete intermediate files + self.remove_intermediate_files_from_disk() + + def remove_intermediate_files_from_disk(self): + # Ensure that the number of deletions corresponds to the + # number of file creations in write_... function + os.remove(self.base_dir + '/event.csv') + os.remove(self.base_dir + '/bg.csv') + os.remove(self.base_dir + '/response_matrix.h5') + + def pre_processing(self): + pass + def Estep(self): + pass + def Mstep(self): + pass + def post_processing(self): + pass + def check_stopping_criteria(self): + pass + def register_result(self): + pass diff --git a/cosipy/image_deconvolution/image_deconvolution.py b/cosipy/image_deconvolution/image_deconvolution.py index 362108ac..e07c4d85 100644 --- a/cosipy/image_deconvolution/image_deconvolution.py +++ b/cosipy/image_deconvolution/image_deconvolution.py @@ -9,13 +9,14 @@ from .RichardsonLucy import RichardsonLucy from .RichardsonLucySimple import RichardsonLucySimple +from .RichardsonLucyParallel import RichardsonLucyParallel class ImageDeconvolution: """ A class to reconstruct all-sky images from COSI data based on image deconvolution methods. """ model_classes = {"AllSkyImage": AllSkyImageModel} - deconvolution_algorithm_classes = {"RL": RichardsonLucy, "RLsimple": RichardsonLucySimple} + deconvolution_algorithm_classes = {"RL": RichardsonLucy, "RLsimple": RichardsonLucySimple, "RLparallel": RichardsonLucyParallel} def __init__(self): self._dataset = None diff --git a/docs/tutorials/image_deconvolution/511keV/GalacticCDS/imagedeconvolution_parfile_gal_511keV.yml b/docs/tutorials/image_deconvolution/511keV/GalacticCDS/imagedeconvolution_parfile_gal_511keV.yml index 53e53ae7..f7e9344d 100644 --- a/docs/tutorials/image_deconvolution/511keV/GalacticCDS/imagedeconvolution_parfile_gal_511keV.yml +++ b/docs/tutorials/image_deconvolution/511keV/GalacticCDS/imagedeconvolution_parfile_gal_511keV.yml @@ -1,7 +1,9 @@ author: Hiroki Yoneda -date: 2024-06-12 +date: 2024-06-12 # Modified by Anaya to include deconvolution:parameter:numproc on 2024-08-23 + model_definition: class: "AllSkyImage" + property: coordinate: "galactic" nside: 16 @@ -10,13 +12,16 @@ model_definition: value: [509.0, 513.0] unit: "keV" unit: "cm-2 s-1 sr-1" # do not change it as for now + initialization: algorithm: "flat" # more methods, e.g., simple-backprojection, user-defined, would be implemented. parameter: value: [1e-4] #the number of these values should be the same as "the number of energy_edges - 1". unit: "cm-2 s-1 sr-1" # do not change it as for now + deconvolution: - algorithm: "RL" + algorithm: "RL" # Choose from RL, RLsimple and RLparallel + parameter: iteration_max: 10 acceleration: True @@ -30,4 +35,5 @@ deconvolution: background_normalization_optimization: True background_normalization_range: {"albedo": [0.01, 10.0]} save_results: False - save_results_directory: "./results" \ No newline at end of file + save_results_directory: "./results" # Relative file path + numproc: 6 # Number of MPI threads to spawn. Limited by number of nodes available. Only applicable for algorithm: "RLparallel" \ No newline at end of file