diff --git a/cosipy/image_deconvolution/RLparallelscript.py b/cosipy/image_deconvolution/RLparallelscript.py new file mode 100644 index 00000000..561a6f38 --- /dev/null +++ b/cosipy/image_deconvolution/RLparallelscript.py @@ -0,0 +1,68 @@ +from pathlib import Path + +import logging +logger = logging.getLogger(__name__) + +from mpi4py import MPI +from histpy import Histogram + +from cosipy.response import FullDetectorResponse +from cosipy.image_deconvolution import ImageDeconvolution, DataIF_Parallel, DataIF_COSI_DC2 + +# Define MPI variables +MASTER = 0 # Indicates master process +DRM_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/data') +DATA_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/image_deconvolution/511keV/GalacticCDS') + +def main(): + ''' + Main script to create a parallel execution-compatible + dataset using DataIF_Parallel and call ImageDeconvolution + ''' + + # Set up MPI + comm = MPI.COMM_WORLD + + # Create dataset + dataset = DataIF_Parallel(name = '511keV', + event_filename = '511keV_dc2_galactic_event.hdf5', + bkg_filename = '511keV_dc2_galactic_bkg.hdf5', + drm_filename = 'psr_gal_511_DC2.h5', + comm = comm) # Convert dataset to a list of datasets before passing to RichardsonLucy class + + # bkg = Histogram.open(DATA_DIR / '511keV_dc2_galactic_bkg.hdf5') + # event = Histogram.open(DATA_DIR / '511keV_dc2_galactic_event.hdf5') + # image_response = Histogram.open(DRM_DIR / 'psr_gal_511_DC2.h5') + # dataset = DataIF_COSI_DC2.load(name = "511keV", # Create a dataset compatible with ImageDeconvolution: name (unique identifier), event data, background model, response, coordinate system conversion matrix (if detector response is not in galactic coordinates) + # event_binned_data = event.project(['Em', 'Phi', 'PsiChi']), + # dict_bkg_binned_data = {"total": bkg.project(['Em', 'Phi', 'PsiChi'])}, + # rsp = image_response) + + # Create image deconvolution object + image_deconvolution = ImageDeconvolution() + + # set data_interface to image_deconvolution + image_deconvolution.set_dataset([dataset]) + + # set a parameter file for the image deconvolution + parameter_filepath = DATA_DIR / 'imagedeconvolution_parfile_gal_511keV.yml' + image_deconvolution.read_parameterfile(parameter_filepath) + + parallel_computation = True + if comm.Get_rank() == MASTER: + master_node = True + else: + master_node = False + + # Initialize model + image_deconvolution.initialize(parallel_computation = parallel_computation, + master_node = master_node) + + # Execute deconvolution + image_deconvolution.run_deconvolution() + + # MPI Shutdown + MPI.Finalize() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cosipy/image_deconvolution/RichardsonLucy.py b/cosipy/image_deconvolution/RichardsonLucy.py index 99e038d2..c790339e 100644 --- a/cosipy/image_deconvolution/RichardsonLucy.py +++ b/cosipy/image_deconvolution/RichardsonLucy.py @@ -1,11 +1,15 @@ import os import copy -import numpy as np -import astropy.units as u -import astropy.io.fits as fits import logging + +# logging setup logger = logging.getLogger(__name__) +# Import third party libraries +import numpy as np +import pandas as pd +import astropy.units as u +from astropy.io import fits from histpy import Histogram from .deconvolution_algorithm_base import DeconvolutionAlgorithmBase @@ -36,7 +40,8 @@ class RichardsonLucy(DeconvolutionAlgorithmBase): """ - def __init__(self, initial_model, dataset, mask, parameter): + def __init__(self, initial_model:Histogram, dataset:list, mask, parameter, + parallel:bool=False, MASTER:bool=False): DeconvolutionAlgorithmBase.__init__(self, initial_model, dataset, mask, parameter) @@ -66,16 +71,22 @@ def __init__(self, initial_model, dataset, mask, parameter): else: os.makedirs(self.save_results_directory) + self.parallel = parallel + self.MASTER = MASTER + def initialization(self): """ - initialization before running the image deconvolution + initialization before performing image deconvolution """ + + # Master + if (not self.parallel) or (self.MASTER): + # Clear results + self.results.clear() + # clear counter self.iteration_count = 0 - # clear results - self.results.clear() - # copy model self.model = copy.deepcopy(self.initial_model) @@ -116,6 +127,9 @@ def Estep(self): """ pass + # At the end of this function, all processes should have a complete `self.expectation_list` + # to proceed to the Mstep function + def Mstep(self): """ M-step in RL algorithm. @@ -129,7 +143,7 @@ def Mstep(self): if self.mask is not None: self.delta_model = self.delta_model.mask_pixels(self.mask) - + # background normalization optimization if self.do_bkg_norm_optimization: for key in self.dict_bkg_norm.keys(): @@ -146,6 +160,11 @@ def Mstep(self): self.dict_bkg_norm[key] = bkg_norm + # At the end of this function, all the nodes will have a full + # copy of delta_model. Although this is not necessary, this is + # the easiest way without editing RichardsonLucy.py. + # The same applies for self.dict_bkg_norm + def post_processing(self): """ Here three processes will be performed. @@ -154,6 +173,7 @@ def post_processing(self): - acceleration of RL algirithm: the normalization of delta map is increased as long as the updated image has no non-negative components. """ + # All self.processed_delta_model = copy.deepcopy(self.delta_model) if self.do_response_weighting: @@ -172,7 +192,7 @@ def post_processing(self): if self.mask is not None: self.model = self.model.mask_pixels(self.mask) - + # update expectation_list self.expectation_list = self.calc_expectation_list(self.model, dict_bkg_norm = self.dict_bkg_norm) logger.debug("The expected count histograms were updated with the new model map.") @@ -181,6 +201,10 @@ def post_processing(self): self.loglikelihood_list = self.calc_loglikelihood_list(self.expectation_list) logger.debug("The loglikelihood list was updated with the new expected count histograms.") + # At the end of this function, all MPI processes needs to have a full + # copy of updated model. They calculate it from delta_model (which is + # distributed by MPI.Bcast) independently + def register_result(self): """ The values below are stored at the end of each iteration. @@ -193,21 +217,23 @@ def register_result(self): - loglikelihood: log-likelihood """ - this_result = {"iteration": self.iteration_count, - "model": copy.deepcopy(self.model), - "delta_model": copy.deepcopy(self.delta_model), - "processed_delta_model": copy.deepcopy(self.processed_delta_model), - "background_normalization": copy.deepcopy(self.dict_bkg_norm), - "alpha": self.alpha, - "loglikelihood": copy.deepcopy(self.loglikelihood_list)} - - # show intermediate results - logger.info(f' alpha: {this_result["alpha"]}') - logger.info(f' background_normalization: {this_result["background_normalization"]}') - logger.info(f' loglikelihood: {this_result["loglikelihood"]}') + # Master + if (not self.parallel) or (self.MASTER): + this_result = {"iteration": self.iteration_count, + "model": copy.deepcopy(self.model), + "delta_model": copy.deepcopy(self.delta_model), + "processed_delta_model": copy.deepcopy(self.processed_delta_model), + "background_normalization": copy.deepcopy(self.dict_bkg_norm), + "alpha": self.alpha, + "loglikelihood": copy.deepcopy(self.loglikelihood_list)} + + # show intermediate results + logger.info(f' alpha: {this_result["alpha"]}') + logger.info(f' background_normalization: {this_result["background_normalization"]}') + logger.info(f' loglikelihood: {this_result["loglikelihood"]}') - # register this_result in self.results - self.results.append(this_result) + # register this_result in self.results + self.results.append(this_result) def check_stopping_criteria(self): """ @@ -217,6 +243,7 @@ def check_stopping_criteria(self): ------- bool """ + if self.iteration_count < self.iteration_max: return False return True @@ -225,38 +252,41 @@ def finalization(self): """ finalization after running the image deconvolution """ - if self.save_results == True: - logger.info('Saving results in {self.save_results_directory}') - # model - for this_result in self.results: - iteration_count = this_result["iteration"] + # Master + if (not self.parallel) or (self.MASTER): + if self.save_results == True: + logger.info('Saving results in {self.save_results_directory}') - this_result["model"].write(f"{self.save_results_directory}/model_itr{iteration_count}.hdf5", overwrite = True) - this_result["delta_model"].write(f"{self.save_results_directory}/delta_model_itr{iteration_count}.hdf5", overwrite = True) - this_result["processed_delta_model"].write(f"{self.save_results_directory}/processed_delta_model_itr{iteration_count}.hdf5", overwrite = True) + # model + for this_result in self.results: + iteration_count = this_result["iteration"] - #fits - primary_hdu = fits.PrimaryHDU() + this_result["model"].write(f"{self.save_results_directory}/model_itr{iteration_count}.hdf5", overwrite = True) + this_result["delta_model"].write(f"{self.save_results_directory}/delta_model_itr{iteration_count}.hdf5", overwrite = True) + this_result["processed_delta_model"].write(f"{self.save_results_directory}/processed_delta_model_itr{iteration_count}.hdf5", overwrite = True) - col_iteration = fits.Column(name='iteration', array=[float(result['iteration']) for result in self.results], format='K') - col_alpha = fits.Column(name='alpha', array=[float(result['alpha']) for result in self.results], format='D') - cols_bkg_norm = [fits.Column(name=key, array=[float(result['background_normalization'][key]) for result in self.results], format='D') - for key in self.dict_bkg_norm.keys()] - cols_loglikelihood = [fits.Column(name=f"{self.dataset[i].name}", array=[float(result['loglikelihood'][i]) for result in self.results], format='D') - for i in range(len(self.dataset))] + #fits + primary_hdu = fits.PrimaryHDU() - table_alpha = fits.BinTableHDU.from_columns([col_iteration, col_alpha]) - table_alpha.name = "alpha" + col_iteration = fits.Column(name='iteration', array=[float(result['iteration']) for result in self.results], format='K') + col_alpha = fits.Column(name='alpha', array=[float(result['alpha']) for result in self.results], format='D') + cols_bkg_norm = [fits.Column(name=key, array=[float(result['background_normalization'][key]) for result in self.results], format='D') + for key in self.dict_bkg_norm.keys()] + cols_loglikelihood = [fits.Column(name=f"{self.dataset[i].name}", array=[float(result['loglikelihood'][i]) for result in self.results], format='D') + for i in range(len(self.dataset))] - table_bkg_norm = fits.BinTableHDU.from_columns([col_iteration] + cols_bkg_norm) - table_bkg_norm.name = "bkg_norm" + table_alpha = fits.BinTableHDU.from_columns([col_iteration, col_alpha]) + table_alpha.name = "alpha" - table_loglikelihood = fits.BinTableHDU.from_columns([col_iteration] + cols_loglikelihood) - table_loglikelihood.name = "loglikelihood" + table_bkg_norm = fits.BinTableHDU.from_columns([col_iteration] + cols_bkg_norm) + table_bkg_norm.name = "bkg_norm" - hdul = fits.HDUList([primary_hdu, table_alpha, table_bkg_norm, table_loglikelihood]) - hdul.writeto(f'{self.save_results_directory}/results.fits', overwrite=True) + table_loglikelihood = fits.BinTableHDU.from_columns([col_iteration] + cols_loglikelihood) + table_loglikelihood.name = "loglikelihood" + + hdul = fits.HDUList([primary_hdu, table_alpha, table_bkg_norm, table_loglikelihood]) + hdul.writeto(f'{self.save_results_directory}/results.fits', overwrite=True) def calc_alpha(self, delta_model, model): """ @@ -267,6 +297,7 @@ def calc_alpha(self, delta_model, model): float Acceleration parameter """ + diff = -1 * (model / delta_model).contents diff[(diff <= 0) | (delta_model.contents == 0)] = np.inf diff --git a/cosipy/image_deconvolution/__init__.py b/cosipy/image_deconvolution/__init__.py index 147d3ff6..0ae97d7c 100644 --- a/cosipy/image_deconvolution/__init__.py +++ b/cosipy/image_deconvolution/__init__.py @@ -2,6 +2,7 @@ from .image_deconvolution_data_interface_base import ImageDeconvolutionDataInterfaceBase from .dataIF_COSI_DC2 import DataIF_COSI_DC2 +from .dataIF_Parallel import DataIF_Parallel from .model_base import ModelBase from .allskyimage import AllSkyImageModel diff --git a/cosipy/image_deconvolution/allskyimage.py b/cosipy/image_deconvolution/allskyimage.py index 665ad74e..43721223 100644 --- a/cosipy/image_deconvolution/allskyimage.py +++ b/cosipy/image_deconvolution/allskyimage.py @@ -154,6 +154,8 @@ def set_values_from_parameters(self, parameter): self[:,idx] = value * unit # elif algorithm_name == ... # ... + else: + raise ValueError(f'Model algorithm {algorithm_name} not supported') def set_values_from_extendedmodel(self, extendedmodel): """ diff --git a/cosipy/image_deconvolution/dataIF_Parallel.py b/cosipy/image_deconvolution/dataIF_Parallel.py new file mode 100644 index 00000000..765b6a6e --- /dev/null +++ b/cosipy/image_deconvolution/dataIF_Parallel.py @@ -0,0 +1,437 @@ +import sys +from pathlib import Path + +import logging +logger = logging.getLogger(__name__) + +import numpy as np +import astropy.units as u +from mpi4py import MPI +import h5py +from histpy import Histogram, Axes, Axis, HealpixAxis + +from cosipy.response import FullDetectorResponse +from cosipy.image_deconvolution import ImageDeconvolutionDataInterfaceBase + +# Define npix in NuLambda and PsiChi +# TODO: information is contained in FullDetectorResponse +# and will be supported at a later release +NUMROWS = 3072 +NUMCOLS = 3072 +MASTER = 0 + +# Define data paths +DRM_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/data') +DATA_DIR = Path('/Users/penguin/Documents/Grad School/Research/COSI/COSIpy/docs/tutorials/image_deconvolution/511keV/GalacticCDS') + +def load_response_matrix(comm, start_col, end_col, filename): + ''' + Response matrix + ''' + with h5py.File(DRM_DIR / filename, "r", driver="mpio", comm=comm) as f1: + dataset = f1['hist/contents'] + R = dataset[1:-1, 1:-1, 1:-1, 1:-1, start_col+1:end_col+1] + + hist_group = f1['hist'] + if 'unit' in hist_group.attrs: + unit = u.Unit(hist_group.attrs['unit']) + + # Axes + axes_group = hist_group['axes'] + + axes = [] + for axis in axes_group.values(): + label = axis.attrs['label'] + + # Get class. Backwards compatible with version + # with only Axis + axis_cls = Axis + + if '__class__' in axis.attrs: + class_module, class_name = axis.attrs['__class__'] + axis_cls = getattr(sys.modules[class_module], class_name) + axis_tmp = axis_cls._open(axis) + + if label == 'PsiChi': + axis_tmp = HealpixAxis(edges = axis_tmp.edges[start_col:end_col+1], + label = axis_tmp.label, + scale = axis_tmp._scale, + coordsys = axis_tmp._coordsys, + nside = axis_tmp.nside) + + axes += [axis_tmp] + + return Histogram(axes, contents = R, unit = unit) + +def load_response_matrix_transpose(comm, start_row, end_row, filename): + ''' + Response matrix tranpose + ''' + with h5py.File(DRM_DIR / filename, "r", driver="mpio", comm=comm) as f1: + dataset = f1['hist/contents'] + RT = dataset[start_row+1:end_row+1, 1:-1, 1:-1, 1:-1, 1:-1] + + hist_group = f1['hist'] + if 'unit' in hist_group.attrs: + unit = u.Unit(hist_group.attrs['unit']) + + # Axes + axes_group = hist_group['axes'] + + axes = [] + for axis in axes_group.values(): + label = axis.attrs['label'] + + # Get class. Backwards compatible with version + # with only Axis + axis_cls = Axis + + if '__class__' in axis.attrs: + class_module, class_name = axis.attrs['__class__'] + axis_cls = getattr(sys.modules[class_module], class_name) + axis_tmp = axis_cls._open(axis) + + if label == 'NuLambda': + axis_tmp = HealpixAxis(edges = axis_tmp.edges[start_row:end_row+1], + label = axis_tmp.label, + scale = axis_tmp._scale, + coordsys = axis_tmp._coordsys, + nside = axis_tmp.nside) + + axes += [axis_tmp] + + return Histogram(axes, contents = RT, unit = unit) + +class DataIF_Parallel(ImageDeconvolutionDataInterfaceBase): + """ + A subclass of ImageDeconvolutionDataInterfaceBase for the COSI data challenge 2. + """ + + def __init__(self, event_filename, bkg_filename, drm_filename, name = None, comm = None): + + ImageDeconvolutionDataInterfaceBase.__init__(self, name) + + # Specific to parallel implementation: + # 1. Assume numproc is known by the process that invoked `run_deconvolution()` + # 2. All processes have loaded event data, background, and created + # initial model (from model properties) independently + self.parallel = False + if comm is not None: + self.numtasks = comm.Get_size() + self.taskid = comm.Get_rank() + self._comm = comm + if self.numtasks > 1: + self.parallel = True + logger.info('Image Deconvolution set to run in parallel mode') + + self._MPI_init(event_filename, bkg_filename, drm_filename, comm) + + # None if using Galactic CDS, required if using local CDS + self._coordsys_conv_matrix = None + + # Calculate exposure map + self._calc_exposure_map() + + def _MPI_load_data(self, event_filename, bkg_filename, drm_filename, comm): + + numtasks = self.numtasks + taskid = self.taskid + + print(f'TaskID = {taskid}, Number of tasks = {numtasks}') + + # Calculate the indices in Rij that the process has to parse. My hunch is that calculating these scalars individually will be faster than the MPI send broadcast overhead. + self.averow = NUMROWS // numtasks + self.extra_rows = NUMROWS % numtasks + self.start_row = taskid * self.averow + self.end_row = (taskid + 1) * self.averow if taskid < (numtasks - 1) else NUMROWS + + # Calculate the indices in Rji, i.e., Rij transpose, that the process has to parse. + self.avecol = NUMCOLS // numtasks + self.extra_cols = NUMCOLS % numtasks + self.start_col = taskid * self.avecol + self.end_col = (taskid + 1) * self.avecol if taskid < (numtasks - 1) else NUMCOLS + + # Load event_binned_data + event = Histogram.open(DATA_DIR / event_filename) + self._event = event.project(['Em', 'Phi', 'PsiChi']).to_dense() + + # Load dict_bg_binned_data + bkg = Histogram.open(DATA_DIR / bkg_filename) + self._bkg_models = {"total": bkg.project(['Em', 'Phi', 'PsiChi']).to_dense()} + + # Load response and response transpose + self._image_response = load_response_matrix(comm, self.start_col, self.end_col, filename=drm_filename) + self._image_response_T = load_response_matrix_transpose(comm, self.start_row, self.end_row, filename=drm_filename) + + self.col_size = 1 # TODO: This can change for more sophisticated model space contents + self.row_size = np.prod(self.event.contents.shape[:-1]) + + def _MPI_set_aux_data(self): + + # Set variable _model_axes + # Derived from Parent class (ImageDeconvolutionDataInterfaceBase) + axes = [self._image_response.axes['NuLambda'], self._image_response.axes['Ei']] + axes[0].label = 'lb' + self._model_axes = Axes(axes) + ## Create model_axes_slice + axes = [] + for axis in self.model_axes: + if axis.label == 'lb': + axes.append(HealpixAxis(edges = axis.edges[self.start_row:self.end_row+1], + label = axis.label, + scale = axis._scale, + coordsys = axis._coordsys, + nside = axis.nside)) + else: + axes.append(axis) + self._model_axes_slice = Axes(axes) + + # Set variable _data_axes + # Derived from Parent class (ImageDeconvolutionDataInterfaceBase) + self._data_axes = self.event.axes + ## Create data_axes_slice + axes = [] + for axis in self.data_axes: + if axis.label == 'PsiChi': + axes.append(HealpixAxis(edges = axis.edges[self.start_col:self.end_col+1], + label = axis.label, + scale = axis._scale, + coordsys = axis._coordsys, + nside = axis.nside)) + else: + axes.append(axis) + self._data_axes_slice = Axes(axes) + + ## Create bkg_model_slice Histogram + self._bkg_models_slice = {} + for key in self._bkg_models: + bkg_model = self._bkg_models[key] + self._summed_bkg_models[key] = np.sum(bkg_model) + self._bkg_models_slice[key] = bkg_model.slice[:, :, self.start_col:self.end_col] + + def _MPI_init(self, event_filename, bkg_filename, drm_filename, comm): + + self._MPI_load_data(event_filename, bkg_filename, drm_filename, comm) + + self._MPI_set_aux_data() + + def _calc_exposure_map(self): + """ + Calculate exposure_map, which is an intermidiate matrix used in RL algorithm. + """ + + logger.info("Calculating an exposure map...") + + if self._coordsys_conv_matrix is None: + self._exposure_map = Histogram(self._model_axes, unit = self._image_response.unit * u.sr) + self._exposure_map[:] = np.sum(self._image_response.contents, axis = (2,3,4)) * self.model_axes['lb'].pixarea() + else: + self._exposure_map = Histogram(self._model_axes, unit = self._image_response.unit * self._coordsys_conv_matrix.unit * u.sr) + self._exposure_map[:] = np.tensordot(np.sum(self._coordsys_conv_matrix, axis = (0)), + np.sum(self._image_response, axis = (2,3,4)), + axes = ([1], [0]) ) * self._image_response.unit * self._coordsys_conv_matrix.unit * self.model_axes['lb'].pixarea() + # [Time/ScAtt, lb, NuLambda] -> [lb, NuLambda] + # [NuLambda, Ei, Em, Phi, PsiChi] -> [NuLambda, Ei] + # [lb, NuLambda] x [NuLambda, Ei] -> [lb, Ei] + + if self.parallel: + ''' + Synchronization Barrier 0 (performed only once) + ''' + total_exposure_map = np.empty_like(self.exposure_map.contents, dtype=np.float64) + + # Gather all arrays into recvbuf + self._comm.Allreduce(self.exposure_map.contents, total_exposure_map, op=MPI.SUM) # For multiple MPI processes, full = [slice1, ... sliceN] + + # Reshape the received buffer back into the original array shape + self.exposure_map[:] = total_exposure_map + + logger.info("Finished...") + + def calc_expectation(self, model, dict_bkg_norm = None, almost_zero = 1e-12): + """ + Calculate expected counts from a given model. + + Parameters + ---------- + model : :py:class:`cosipy.image_deconvolution.AllSkyImageModel` + Model map + dict_bkg_norm : dict, default None + background normalization for each background model, e.g, {'albedo': 0.95, 'activation': 1.05} + almost_zero : float, default 1e-12 + In order to avoid zero components in extended count histogram, a tiny offset is introduced. + It should be small enough not to effect statistics. + + Returns + ------- + :py:class:`histpy.Histogram` + Expected count histogram + + Notes + ----- + This method should be implemented in a more general class, for example, extended source response class in the future. + """ + # Currenly (2024-01-12) this method can work for both local coordinate CDS and in galactic coordinate CDS. + # This is just because in DC2 the rotate response for galactic coordinate CDS does not have an axis for time/scatt binning. + # However it is likely that it will have such an axis in the future in order to consider background variability depending on time and pointign direction etc. + # Then, the implementation here will not work. Thus, keep in mind that we need to modify it once the response format is fixed. + + expectation_slice = Histogram(self._data_axes_slice) + + if self._coordsys_conv_matrix is None: + expectation_slice[:] = np.tensordot( model.contents, self._image_response.contents, axes = ([0,1],[0,1])) * model.axes['lb'].pixarea() + # ['lb', 'Ei'] x [NuLambda(lb), Ei, Em, Phi, PsiChi] -> [Em, Phi, PsiChi] + else: + map_rotated = np.tensordot(self._coordsys_conv_matrix.contents, model.contents, axes = ([1], [0])) + # ['Time/ScAtt', 'lb', 'NuLambda'] x ['lb', 'Ei'] -> [Time/ScAtt, NuLambda, Ei] + map_rotated *= self._coordsys_conv_matrix.unit * model.unit + map_rotated *= model.axes['lb'].pixarea() + # data.coordsys_conv_matrix.contents is sparse, so the unit should be restored. + # the unit of map_rotated is 1/cm2 ( = s * 1/cm2/s/sr * sr) + expectation_slice[:] = np.tensordot( map_rotated, self._image_response.contents, axes = ([1,2], [0,1])) + # [Time/ScAtt, NuLambda, Ei] x [NuLambda, Ei, Em, Phi, PsiChi] -> [Time/ScAtt, Em, Phi, PsiChi] + + if dict_bkg_norm is not None: + for key in self.keys_bkg_models(): + expectation_slice += self.bkg_model_slice(key) * dict_bkg_norm[key] + + expectation_slice += almost_zero + + # Parallel + if self.parallel: + ''' + Synchronization Barrier 1 + ''' + + # Calculate all_sizes and displacements + all_sizes = [(self.averow) * self.row_size] * (self.numtasks-1) + [(self.averow + self.extra_rows) * self.row_size] + displacements = np.insert(np.cumsum(all_sizes), 0, 0)[0:-1] + + # Create a buffer to receive the gathered data + total_size = int(np.sum(all_sizes)) + recvbuf = np.empty(total_size, dtype=np.float64) # Receive buffer + + # Gather all arrays into recvbuf + self._comm.Allgatherv(expectation_slice.contents.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE]) # For multiple MPI processes, full = [slice1, ... sliceN] + + # Reshape the received buffer back into the original 3D array shape + epsilon = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape(expectation_slice.contents.shape[:-1] + (-1,)) for i in range(self.numtasks) ], axis=-1) + + # Add to list that manages multiple datasets + expectation = Histogram(self.event.axes, contents=epsilon, unit=self.event.unit) + + # Serial + else: + expectation = expectation_slice # If single process, then full = slice + + return expectation + + def calc_T_product(self, dataspace_histogram): + """ + Calculate the product of the input histogram with the transonse matrix of the response function. + Let R_{ij}, H_{i} be the response matrix and dataspace_histogram, respectively. + Note that i is the index for the data space, and j is for the model space. + In this method, \sum_{j} H{i} R_{ij}, namely, R^{T} H is calculated. + + Parameters + ---------- + dataspace_histogram: :py:class:`histpy.Histogram` + Its axes must be the same as self.data_axes + + Returns + ------- + :py:class:`histpy.Histogram` + The product with self.model_axes + """ + # TODO: currently, dataspace_histogram is assumed to be a dense. + + hist_unit = self.exposure_map.unit + if dataspace_histogram.unit is not None: + hist_unit *= dataspace_histogram.unit + + hist_slice = Histogram(self._model_axes_slice, unit = hist_unit) + if self._coordsys_conv_matrix is None: + hist_slice[:] = np.tensordot(dataspace_histogram.contents, self._image_response_T.contents, axes = ([0,1,2], [2,3,4])) * self.model_axes['lb'].pixarea() + # [Em, Phi, PsiChi] x [NuLambda (lb), Ei, Em, Phi, PsiChi] -> [NuLambda (lb), Ei] + else: + _ = np.tensordot(dataspace_histogram.contents, self._image_response_T.contents, axes = ([1,2,3], [2,3,4])) + # [Time/ScAtt, Em, Phi, PsiChi] x [NuLambda, Ei, Em, Phi, PsiChi] -> [Time/ScAtt, NuLambda, Ei] + + hist_slice[:] = np.tensordot(self._coordsys_conv_matrix.contents, _, axes = ([0,2], [0,1])) \ + * _.unit * self._coordsys_conv_matrix.unit * self.model_axes['lb'].pixarea() + # [Time/ScAtt, lb, NuLambda] x [Time/ScAtt, NuLambda, Ei] -> [lb, Ei] + # note that coordsys_conv_matrix is sparse, so the unit should be recovered separately. + + # Parallel + if self.parallel: + ''' + Synchronization Barrier 2 + ''' + + # Calculate all_sizes and displacements + all_sizes = [(self.avecol) * self.col_size] * (self.numtasks-1) + [(self.avecol + self.extra_cols) * self.col_size] + displacements = np.insert(np.cumsum(all_sizes), 0, 0)[0:-1] + + # Create a receive buffer to receive the gathered data + recvbuf = np.empty(int(np.sum(all_sizes)), dtype=np.float64) + + # Gather all arrays into recvbuf + self._comm.Allgatherv(hist_slice.contents.value.flatten(), [recvbuf, all_sizes, displacements, MPI.DOUBLE]) # For multiple MPI processes, full = [slice1, ... sliceN] + + # Reshape the received buffer back into the original 2D array shape + C = np.concatenate([ recvbuf[displacements[i]:displacements[i] + all_sizes[i]].reshape((-1,) + hist_slice.contents.shape[1:]) for i in range(self.numtasks) ], axis=0) + + # hist_slice (only slice operated on by current node) --> sum_T_product (all) + hist = Histogram(self.model_axes, contents=C, unit=hist_slice.unit) + + # Serial + else: + hist = hist_slice + + return hist + + def calc_bkg_model_product(self, key, dataspace_histogram): + """ + Calculate the product of the input histogram with the background model. + Let B_{i}, H_{i} be the background model and dataspace_histogram, respectively. + In this method, \sum_{i} B_{i} H_{i} is calculated. + + Parameters + ---------- + key: str + Background model name + dataspace_histogram: :py:class:`histpy.Histogram` + its axes must be the same as self.data_axes + + Returns + ------- + flaot + """ + # TODO: currently, dataspace_histogram is assumed to be a dense. + + if self._coordsys_conv_matrix is None: + + return np.tensordot(dataspace_histogram.contents, self.bkg_model(key).contents, axes = ([0,1,2], [0,1,2])) + + return np.tensordot(dataspace_histogram.contents, self.bkg_model(key).contents, axes = ([0,1,2,3], [0,1,2,3])) + + def calc_loglikelihood(self, expectation): + """ + Calculate log-likelihood from given expected counts or model/expectation. + + Parameters + ---------- + expectation : :py:class:`histpy.Histogram` + Expected count histogram. + + Returns + ------- + float + Log-likelood + """ + loglikelood = np.sum( self.event * np.log(expectation) ) - np.sum(expectation) + + return loglikelood + + def bkg_model_slice(self, key): + return self._bkg_models_slice[key] diff --git a/cosipy/image_deconvolution/deconvolution_algorithm_base.py b/cosipy/image_deconvolution/deconvolution_algorithm_base.py index 63d6a2f5..e1dc70fa 100644 --- a/cosipy/image_deconvolution/deconvolution_algorithm_base.py +++ b/cosipy/image_deconvolution/deconvolution_algorithm_base.py @@ -233,9 +233,9 @@ def calc_summed_bkg_model(self, key): return np.sum([self.dataset[i].summed_bkg_model(key) for i in indexlist]) - def calc_summed_T_product(self, dataspace_histogram_list): + def calc_summed_T_product(self, dataspace_histogram_list): # dataspace_histogram_list = ratio_list = d_i/E_i """ - For each data in the registered dataset, the product of the corresponding input histogram with the transonse of the response function is computed. + For each data in the registered dataset, the product of the corresponding input histogram with the transpose of the response function is computed. Then, this method returns the sum of all of the products. Parameters diff --git a/cosipy/image_deconvolution/image_deconvolution.py b/cosipy/image_deconvolution/image_deconvolution.py index 362108ac..804cbd00 100644 --- a/cosipy/image_deconvolution/image_deconvolution.py +++ b/cosipy/image_deconvolution/image_deconvolution.py @@ -4,6 +4,7 @@ logger = logging.getLogger(__name__) from yayc import Configurator +from pathlib import Path from .allskyimage import AllSkyImageModel @@ -25,9 +26,9 @@ def __init__(self): self._model_class = None self._deconvolution_class = None - def set_dataset(self, dataset): + def set_dataset(self, dataset: list): """ - Set dataset + Set dataset as a list. Parameters ---------- @@ -51,7 +52,7 @@ def set_mask(self, mask): self._mask = mask - def read_parameterfile(self, parameter_filepath): + def read_parameterfile(self, parameter_filepath: str | Path): """ Read parameters from a yaml file. @@ -60,7 +61,6 @@ def read_parameterfile(self, parameter_filepath): parameter_filepath : str or pathlib.Path Path of parameter file. """ - self._parameter = Configurator.open(parameter_filepath) logger.debug(f"parameter file for image deconvolution was set -> {parameter_filepath}") @@ -118,17 +118,20 @@ def results(self): """ return self._deconvolution.results - def initialize(self): + def initialize(self, parallel_computation = False, master_node = True): """ Initialize an initial model and an image deconvolution algorithm. It is mandatory to execute this method before running the image deconvolution. """ + self.parallel_computation = parallel_computation + self.master_node = master_node + logger.info("#### Initialization Starts ####") self.model_initialization() - self.register_deconvolution_algorithm() + self.register_deconvolution_algorithm() logger.info("#### Initialization Finished ####") @@ -142,9 +145,9 @@ def model_initialization(self): whether the instantiation and initialization are successfully done. """ # set self._model_class - model_name = self.parameter['model_definition']['class'] + model_name = self.parameter['model_definition']['class'] # Options include "AllSkyImage", etc. - if not model_name in self.model_classes.keys(): + if not model_name in self.model_classes.keys(): # See model_classes dictionary declared above logger.error(f'The model class "{model_name}" does not exist!') raise ValueError @@ -161,11 +164,11 @@ def model_initialization(self): # setting initial values logger.info("<< Setting initial values of the created model object >>") parameter_model_initialization = Configurator(self.parameter['model_definition']['initialization']) - self._initial_model.set_values_from_parameters(parameter_model_initialization) + self._initial_model.set_values_from_parameters(parameter_model_initialization) # Initialize M vector and save contents to self._initial_model (which has inherited type Histogram) # applying a mask to the model if needed if self.mask is not None: - self._initial_model = self._initial_model.mask_pixels(self.mask, 0) + self._initial_model = self._initial_model.mask_pixels(mask=self.mask, fill_value=0) # Use self.set_mask(mask) to set a mask # axes check if not self._check_model_response_consistency(): @@ -194,11 +197,13 @@ def register_deconvolution_algorithm(self): logger.error(f'The algorithm "{algorithm_name}" does not exist!') raise ValueError - self._deconvolution_class = self.deconvolution_algorithm_classes[algorithm_name] - self._deconvolution = self._deconvolution_class(initial_model = self.initial_model, + self._deconvolution_class = self.deconvolution_algorithm_classes[algorithm_name] # Alias to class constructor + self._deconvolution = self._deconvolution_class(initial_model = self.initial_model, # Initialize object for relevant class dataset = self.dataset, mask = self.mask, - parameter = algorithm_parameter) + parameter = algorithm_parameter, + parallel = self.parallel_computation, + MASTER = self.master_node) logger.info("---- parameters ----") logger.info(parameter_deconvolution.dump()) diff --git a/docs/tutorials/source_injector/Point_source_injector.ipynb b/docs/tutorials/source_injector/Point_source_injector.ipynb old mode 100755 new mode 100644