parallelize SDSS spectra file reading
armengau committed Feb 5, 2025
1 parent 9bfc2f6 commit 7c5df0d
Showing 1 changed file with 91 additions and 68 deletions.
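In outline, the commit moves the per-row reading code into a new helper, _read_single_spec, and has read_from_spec fan that helper out over a multiprocessing.Pool instead of looping serially. A minimal, self-contained sketch of the same fan-out pattern, with illustrative names (read_one, read_all and the toy catalog are not picca code):

from multiprocessing import Pool

def read_one(catalog, i_row):
    # Stand-in for _read_single_spec: return (key, data), or None when the
    # underlying file cannot be read.
    row = catalog[i_row]
    if row is None:
        return None
    return row["id"], row["value"]

def read_all(catalog, num_processors):
    # Fan the per-row reader out over a worker pool; failed reads are
    # filtered out afterwards, as in the diff below.
    with Pool(num_processors) as pool:
        results = pool.starmap(read_one,
                               [(catalog, i) for i in range(len(catalog))])
    return [r for r in results if r is not None]

if __name__ == "__main__":
    catalog = [{"id": 1, "value": 10.0}, None, {"id": 2, "value": 20.0}]
    print(read_all(catalog, num_processors=2))  # [(1, 10.0), (2, 20.0)]

The diff that follows has the same shape, except that the worker is the bound method self._read_single_spec and the pool size comes from self.num_processors.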
159 changes: 91 additions & 68 deletions py/picca/delta_extraction/data_catalogues/sdss_data.py
@@ -2,6 +2,7 @@
import os
import logging
import time
from multiprocessing import Pool

import numpy as np
import fitsio
@@ -112,84 +113,106 @@ def __parse_config(self, config):

config["wave solution"] = "log"

def read_from_spec(self, catalogue):
"""Read the spectra and formats its data as Forest instances.
def _read_single_spec(self, catalog, i_row):
"""Read a single spectrum from a catalog row,
formats its data as a Forest instance.
Arguments
---------
catalogue: astropy.table.Table
catalog: astropy.table.Table
Table with the DRQ catalogue
i_row: int
Row entry
"""
row = catalog[i_row]
thingid = row['THING_ID']
plate = row["PLATE"]
mjd = row["MJD"]
fiberid = row["FIBERID"]

filename = (f"{self.input_directory}/{plate}/spec-{plate}-{mjd}-"
f"{fiberid:04d}.fits")
try:
hdul = fitsio.FITS(filename)
except IOError:
self.logger.warning(f"Error reading {filename}. Ignoring file")
return None
self.logger.progress(f"Read {filename}")

try:
log_lambda = np.array(hdul[1]["loglam"][:], dtype=np.float64)
flux = np.array(hdul[1]["flux"][:], dtype=np.float64)
ivar = (np.array(hdul[1]["ivar"][:], dtype=np.float64) *
hdul[1]["and_mask"][:] == 0)
except OSError:
self.logger.warning(f"Error reading HDU for {filename}. Ignoring file")
return None

if self.analysis_type == "BAO 3D":
forest = SdssForest(
**{
"log_lambda": log_lambda,
"flux": flux,
"ivar": ivar,
"thingid": thingid,
"ra": row["RA"],
"dec": row["DEC"],
"z": row["Z"],
"plate": plate,
"mjd": mjd,
"fiberid": fiberid
})
elif self.analysis_type == "PK 1D":
# compute difference between exposures
exposures_diff = exp_diff(hdul, log_lambda)
# compute spectral resolution
wdisp = hdul[1]["wdisp"][:]
reso = spectral_resolution(wdisp, True, fiberid, log_lambda)

forest = SdssPk1dForest(
**{
"log_lambda": log_lambda,
"flux": flux,
"ivar": ivar,
"thingid": thingid,
"ra": row["RA"],
"dec": row["DEC"],
"z": row["Z"],
"plate": plate,
"mjd": mjd,
"fiberid": fiberid,
"exposures_diff": exposures_diff,
"reso": reso,
"reso_pix": wdisp
})
else:
raise DataError("An unknown analysis_type was specified,"
" currently PK 1D and BAO 3D are supported.")

forest.rebin()
return (thingid, forest)

def read_from_spec(self, catalog):
"""Read the spectra and formats its data as Forest instances.
Arguments
---------
catalog: astropy.table.Table
Table with the DRQ catalog
"""
self.logger.progress(f"Reading {len(catalogue)} objects")
self.logger.progress(f"Reading {len(catalog)} objects")

forests_by_thingid = {}
#-- Loop over unique objects
for row in catalogue:
thingid = row['THING_ID']
plate = row["PLATE"]
mjd = row["MJD"]
fiberid = row["FIBERID"]

filename = (f"{self.input_directory}/{plate}/spec-{plate}-{mjd}-"
f"{fiberid:04d}.fits")
try:
hdul = fitsio.FITS(filename)
except IOError:
self.logger.warning(f"Error reading {filename}. Ignoring file")
continue
self.logger.progress(f"Read {filename}")
pool = Pool(self.num_processors)
output_spec = pool.starmap(
self._read_single_spec, [[catalog, i] for i in range(len(catalog))])
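# starmap unpacks each [catalog, i] pair into the (catalog, i_row)
# arguments of _read_single_spec; results keep the catalog order, and rows
# whose spec files could not be opened come back as None.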

try:
log_lambda = np.array(hdul[1]["loglam"][:], dtype=np.float64)
flux = np.array(hdul[1]["flux"][:], dtype=np.float64)
ivar = (np.array(hdul[1]["ivar"][:], dtype=np.float64) *
hdul[1]["and_mask"][:] == 0)
except OSError:
self.logger.warning(f"Error reading HDU for {filename}. Ignoring file")
continue

if self.analysis_type == "BAO 3D":
forest = SdssForest(
**{
"log_lambda": log_lambda,
"flux": flux,
"ivar": ivar,
"thingid": thingid,
"ra": row["RA"],
"dec": row["DEC"],
"z": row["Z"],
"plate": plate,
"mjd": mjd,
"fiberid": fiberid
})
elif self.analysis_type == "PK 1D":
# compute difference between exposure
exposures_diff = exp_diff(hdul, log_lambda)
# compute spectral resolution
wdisp = hdul[1]["wdisp"][:]
reso = spectral_resolution(wdisp, True, fiberid, log_lambda)

forest = SdssPk1dForest(
**{
"log_lambda": log_lambda,
"flux": flux,
"ivar": ivar,
"thingid": thingid,
"ra": row["RA"],
"dec": row["DEC"],
"z": row["Z"],
"plate": plate,
"mjd": mjd,
"fiberid": fiberid,
"exposures_diff": exposures_diff,
"reso": reso,
"reso_pix": wdisp
})
else:
raise DataError("An unknown analysis_type was specified,"
" currently PK 1D and BAO 3D are supported.")
# remove ignored files:
output_spec = [x for x in output_spec if x is not None]

forest.rebin()
for (thingid, forest) in output_spec:
if thingid in forests_by_thingid:
forests_by_thingid[thingid].coadd(forest)
else:
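The end of read_from_spec (its last visible lines above) groups the returned (thingid, forest) pairs by object, so that repeated observations of the same THING_ID end up coadded into a single entry. The same grouping pattern in isolation, as a toy sketch in which group_by_key, merge, and the sample pairs are illustrative and merge stands in for the coadd call:

def group_by_key(pairs, merge):
    # Combine (key, value) pairs into a dict, merging values that share a key.
    grouped = {}
    for key, value in pairs:
        if key in grouped:
            grouped[key] = merge(grouped[key], value)
        else:
            grouped[key] = value
    return grouped

# Toy usage: merge by addition instead of coadding forests.
pairs = [(42, 1.0), (7, 2.0), (42, 3.0)]
print(group_by_key(pairs, merge=lambda a, b: a + b))  # {42: 4.0, 7: 2.0}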
