diff --git a/py/picca/delta_extraction/data_catalogues/sdss_data.py b/py/picca/delta_extraction/data_catalogues/sdss_data.py index 540ed4eac..c341e54f9 100644 --- a/py/picca/delta_extraction/data_catalogues/sdss_data.py +++ b/py/picca/delta_extraction/data_catalogues/sdss_data.py @@ -2,6 +2,7 @@ import os import logging import time +from multiprocessing import Pool import numpy as np import fitsio @@ -112,84 +113,106 @@ def __parse_config(self, config): config["wave solution"] = "log" - def read_from_spec(self, catalogue): - """Read the spectra and formats its data as Forest instances. + def _read_single_spec(self, catalog, i_row): + """Read a single spectrum from a catalog row, + formats its data as a Forest instance. Arguments --------- - catalogue: astropy.table.Table + catalog: astropy.table.Table Table with the DRQ catalogue + + i_row: int + Row entry + """ + row = catalog[i_row] + thingid = row['THING_ID'] + plate = row["PLATE"] + mjd = row["MJD"] + fiberid = row["FIBERID"] + + filename = (f"{self.input_directory}/{plate}/spec-{plate}-{mjd}-" + f"{fiberid:04d}.fits") + try: + hdul = fitsio.FITS(filename) + except IOError: + self.logger.warning(f"Error reading {filename}. Ignoring file") + return None + self.logger.progress(f"Read {filename}") + + try: + log_lambda = np.array(hdul[1]["loglam"][:], dtype=np.float64) + flux = np.array(hdul[1]["flux"][:], dtype=np.float64) + ivar = (np.array(hdul[1]["ivar"][:], dtype=np.float64) * + hdul[1]["and_mask"][:] == 0) + except OSError: + self.logger.warning(f"Error reading HDU for {filename}. Ignoring file") + return None + + if self.analysis_type == "BAO 3D": + forest = SdssForest( + **{ + "log_lambda": log_lambda, + "flux": flux, + "ivar": ivar, + "thingid": thingid, + "ra": row["RA"], + "dec": row["DEC"], + "z": row["Z"], + "plate": plate, + "mjd": mjd, + "fiberid": fiberid + }) + elif self.analysis_type == "PK 1D": + # compute difference between exposure + exposures_diff = exp_diff(hdul, log_lambda) + # compute spectral resolution + wdisp = hdul[1]["wdisp"][:] + reso = spectral_resolution(wdisp, True, fiberid, log_lambda) + + forest = SdssPk1dForest( + **{ + "log_lambda": log_lambda, + "flux": flux, + "ivar": ivar, + "thingid": thingid, + "ra": row["RA"], + "dec": row["DEC"], + "z": row["Z"], + "plate": plate, + "mjd": mjd, + "fiberid": fiberid, + "exposures_diff": exposures_diff, + "reso": reso, + "reso_pix": wdisp + }) + else: + raise DataError("An unknown analysis_type was specified," + " currently PK 1D and BAO 3D are supported.") + + forest.rebin() + return (thingid, forest) + + def read_from_spec(self, catalog): + """Read the spectra and formats its data as Forest instances. + + Arguments + --------- + catalog: astropy.table.Table + Table with the DRQ catalog """ - self.logger.progress(f"Reading {len(catalogue)} objects") + self.logger.progress(f"Reading {len(catalog)} objects") forests_by_thingid = {} #-- Loop over unique objects - for row in catalogue: - thingid = row['THING_ID'] - plate = row["PLATE"] - mjd = row["MJD"] - fiberid = row["FIBERID"] - - filename = (f"{self.input_directory}/{plate}/spec-{plate}-{mjd}-" - f"{fiberid:04d}.fits") - try: - hdul = fitsio.FITS(filename) - except IOError: - self.logger.warning(f"Error reading {filename}. Ignoring file") - continue - self.logger.progress(f"Read {filename}") + pool = Pool(self.num_processors) + output_spec = pool.starmap( + self._read_single_spec, [[catalog, i] for i in range(len(catalog))]) - try: - log_lambda = np.array(hdul[1]["loglam"][:], dtype=np.float64) - flux = np.array(hdul[1]["flux"][:], dtype=np.float64) - ivar = (np.array(hdul[1]["ivar"][:], dtype=np.float64) * - hdul[1]["and_mask"][:] == 0) - except OSError: - self.logger.warning(f"Error reading HDU for {filename}. Ignoring file") - continue - - if self.analysis_type == "BAO 3D": - forest = SdssForest( - **{ - "log_lambda": log_lambda, - "flux": flux, - "ivar": ivar, - "thingid": thingid, - "ra": row["RA"], - "dec": row["DEC"], - "z": row["Z"], - "plate": plate, - "mjd": mjd, - "fiberid": fiberid - }) - elif self.analysis_type == "PK 1D": - # compute difference between exposure - exposures_diff = exp_diff(hdul, log_lambda) - # compute spectral resolution - wdisp = hdul[1]["wdisp"][:] - reso = spectral_resolution(wdisp, True, fiberid, log_lambda) - - forest = SdssPk1dForest( - **{ - "log_lambda": log_lambda, - "flux": flux, - "ivar": ivar, - "thingid": thingid, - "ra": row["RA"], - "dec": row["DEC"], - "z": row["Z"], - "plate": plate, - "mjd": mjd, - "fiberid": fiberid, - "exposures_diff": exposures_diff, - "reso": reso, - "reso_pix": wdisp - }) - else: - raise DataError("An unknown analysis_type was specified," - " currently PK 1D and BAO 3D are supported.") + # remove ignored files: + output_spec = [x for x in output_spec if x is not None] - forest.rebin() + for (thingid, forest) in output_spec: if thingid in forests_by_thingid: forests_by_thingid[thingid].coadd(forest) else: