diff --git a/pyproject.toml b/pyproject.toml index 92116e219..2c564ba41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,8 @@ dependencies = [ #"roman_datamodels @ git+https://github.com/spacetelescope/roman_datamodels.git", "scipy >=1.14.1", # "stcal>=1.10.0,<1.11.0", - "stcal @ git+https://github.com/spacetelescope/stcal.git@main", + # "stcal @ git+https://github.com/spacetelescope/stcal.git@main", + "stcal @ git+https://github.com/mcara/stcal.git@resample-common-code2", # "stpipe >=0.7.0,<0.8.0", "stpipe @ git+https://github.com/spacetelescope/stpipe.git@main", "tweakwcs >=0.8.8", diff --git a/romancal/outlier_detection/utils.py b/romancal/outlier_detection/utils.py index 21085f389..02bccca3e 100644 --- a/romancal/outlier_detection/utils.py +++ b/romancal/outlier_detection/utils.py @@ -256,7 +256,6 @@ def detect_outliers( resamp = ResampleData( library, single=True, - blendheaders=False, # FIXME prior code provided weight_type when only wht_type is understood # both default to 'ivm' but tests that set this to something else did # not change the resampling weight type. For now, disabling it to match diff --git a/romancal/resample/gwcs_drizzle.py b/romancal/resample/gwcs_drizzle.py index 75eeb2a39..caaa1abc7 100644 --- a/romancal/resample/gwcs_drizzle.py +++ b/romancal/resample/gwcs_drizzle.py @@ -1,7 +1,7 @@ import logging import numpy as np -from drizzle import cdrizzle, util +from drizzle import cdrizzle from . import resample_utils @@ -375,7 +375,7 @@ def dodrizzle( """ # Insure that the fillval parameter gets properly interpreted for use with tdriz - if util.is_blank(str(fillval)): + if fillval.strip() == "": fillval = "INDEF" else: fillval = str(fillval) diff --git a/romancal/resample/resample.py b/romancal/resample/resample.py index c359f1593..5a2cad170 100644 --- a/romancal/resample/resample.py +++ b/romancal/resample/resample.py @@ -5,7 +5,7 @@ import numpy as np from astropy import units as u from astropy.coordinates import SkyCoord -from drizzle import cdrizzle, util +from drizzle import cdrizzle from roman_datamodels import datamodels, maker_utils, stnode from stcal.alignment.util import compute_s_region_keyword, compute_scale @@ -18,11 +18,7 @@ log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) -__all__ = ["OutputTooLargeError", "ResampleData"] - - -class OutputTooLargeError(MemoryError): - """Raised when the output is too large for in-memory instantiation""" +__all__ = ["ResampleData"] class ResampleData: @@ -55,7 +51,12 @@ def __init__( good_bits="0", pscale_ratio=1.0, pscale=None, - **kwargs, + in_memory=True, + output_wcs=None, + output_shape=None, + crpix=None, + crval=None, + rotation=None, ): """ Parameters @@ -66,19 +67,6 @@ def __init__( output : str filename for output - - kwargs : dict - Other parameters. - - .. note:: - ``output_shape`` is in the ``x, y`` order. - - .. note:: - ``in_memory`` controls whether or not the resampled - array from ``resample_many_to_many()`` - should be kept in memory or written out to disk and - deleted from memory. Default value is `True` to keep - all products in memory. """ if (input_models is None) or (len(input_models) == 0): raise ValueError( @@ -94,7 +82,7 @@ def __init__( self.fillval = fillval self.weight_type = wht_type self.good_bits = good_bits - self.in_memory = kwargs.get("in_memory", True) + self.in_memory = in_memory if "target" in input_models.asn: self.location_name = input_models.asn["target"] else: @@ -105,12 +93,6 @@ def __init__( log.info(f"Driz parameter fillval: {self.fillval}") log.info(f"Driz parameter weight_type: {self.weight_type}") - output_wcs = kwargs.get("output_wcs", None) - output_shape = kwargs.get("output_shape", None) - crpix = kwargs.get("crpix", None) - crval = kwargs.get("crval", None) - rotation = kwargs.get("rotation", None) - if pscale is not None: log.info(f"Output pixel scale: {pscale} arcsec.") pscale /= 3600.0 @@ -151,7 +133,7 @@ def __init__( models = list(self.input_models) # update meta.basic - populate_mosaic_basic(self.blank_output, models) + self.blank_output.meta.basic.product_type = "TBD" # update meta.cal_step self.blank_output.meta.cal_step = maker_utils.mk_l3_cal_step( @@ -164,13 +146,26 @@ def __init__( cal_logs = model.meta.cal_logs # removing meta.cal_logs del model.meta["cal_logs"] + # Update the output with all the component metas - populate_mosaic_individual(self.blank_output, [model]) + self.blank_output.append_individual_image_meta(model.meta) + # re-attaching cal_logs to meta model.meta.cal_logs = cal_logs # update meta data and wcs - l2_into_l3_meta(self.blank_output.meta, models[0].meta) + l2_meta = models[0].meta + self.blank_output.meta.basic.visit = l2_meta.observation.visit + self.blank_output.meta.basic.segment = l2_meta.observation.segment + self.blank_output.meta.basic["pass"] = l2_meta.observation["pass"] + self.blank_output.meta.basic.program = l2_meta.observation.program + self.blank_output.meta.basic.optical_element = ( + l2_meta.instrument.optical_element + ) + self.blank_output.meta.basic.instrument = l2_meta.instrument.name + self.blank_output.meta.coordinates = l2_meta.coordinates + self.blank_output.meta.program = l2_meta.program + self.blank_output.meta.wcs = self.output_wcs gwcs_into_l3(self.blank_output, self.output_wcs) @@ -206,7 +201,11 @@ def resample_group(self, input_models, indices): output_model.meta["resample"] = maker_utils.mk_resample() output_model.meta.basic.location_name = self.location_name - copy_asn_info_from_library(input_models, output_model) + # copy over asn information + if (asn_pool := input_models.asn.get("asn_pool", None)) is not None: + output_model.meta.asn.pool_name = asn_pool + if (asn_table_name := input_models.asn.get("table_name", None)) is not None: + output_model.meta.asn.table_name = asn_table_name with input_models: example_image = input_models.borrow(indices[0]) @@ -539,18 +538,20 @@ def update_exposure_times(self, output_model, exptime_tot): f"Mean, max exposure times: {total_exposure_time:.1f}, " f"{max_exposure_time:.1f}" ) - exposure_times = {"start": [], "end": []} + exposure_times = {"start": [], "end": [], "mid": []} with self.input_models: for indices in self.input_models.group_indices.values(): index = indices[0] model = self.input_models.borrow(index) exposure_times["start"].append(model.meta.exposure.start_time) exposure_times["end"].append(model.meta.exposure.end_time) + exposure_times["mid"].append(model.meta.exposure.mid_time.mjd) self.input_models.shelve(model, index, modify=False) # Update some basic exposure time values based on output_model output_model.meta.basic.mean_exposure_time = total_exposure_time output_model.meta.basic.time_first_mjd = min(exposure_times["start"]).mjd + output_model.meta.basic.time_mean_mjd = np.mean(exposure_times["mid"]) output_model.meta.basic.time_last_mjd = max(exposure_times["end"]).mjd output_model.meta.basic.max_exposure_time = max_exposure_time output_model.meta.resample.product_exposure_time = max_exposure_time @@ -673,7 +674,7 @@ def drizzle_arrays( """ # Insure that the fillval parameter gets properly interpreted for use with tdriz - fillval = "INDEF" if util.is_blank(str(fillval)) else str(fillval) + fillval = "INDEF" if str(fillval).strip() == "" else str(fillval) if insci.dtype > np.float32: insci = insci.astype(np.float32) @@ -731,39 +732,6 @@ def drizzle_arrays( ) -def l2_into_l3_meta(l3_meta, l2_meta): - """Update the level 3 meta with info from the level 2 meta - - Parameters - ---------- - l3_meta : dict - The meta to update. This is updated in-place - - l2_meta : stnode - The Level 2-like meta to pull from - - Notes - ----- - The list of meta that is pulled from the Level 2 meta into the Level 3 meta is as follows: - basic.visit: observation.visit - basic.segment: observation.segment - basic.pass: observation.pass - basic.program: observation.program - basic.optical_element: optical_element - basic.instrument: instrument.name - basic.telescope: telescope - program: program - """ - l3_meta.basic.visit = l2_meta.observation.visit - l3_meta.basic.segment = l2_meta.observation.segment - l3_meta.basic["pass"] = l2_meta.observation["pass"] - l3_meta.basic.program = l2_meta.observation.program - l3_meta.basic.optical_element = l2_meta.instrument.optical_element - l3_meta.basic.instrument = l2_meta.instrument.name - l3_meta.coordinates = l2_meta.coordinates - l3_meta.program = l2_meta.program - - def gwcs_into_l3(model, wcs): """Update the Level 3 wcsinfo block from a GWCS object @@ -870,97 +838,3 @@ def calc_pa(wcs, ra, dec): coord = SkyCoord(ra, dec, frame="icrs", unit="deg") return coord.position_angle(delta_coord).degree - - -def populate_mosaic_basic( - output_model: datamodels.MosaicModel, input_models: list | ModelLibrary -): - """ - Populate basic metadata fields in the output mosaic model based on input models. - - Parameters - ---------- - output_model : MosaicModel - Object to populate with basic metadata. - input_models : [List, ModelLibrary] - List of input data models from which to extract the metadata. - ModelLibrary is also supported. - - Returns - ------- - None - """ - - input_meta = [datamodel.meta for datamodel in input_models] - - # time data - output_model.meta.basic.time_first_mjd = np.min( - [x.exposure.start_time.mjd for x in input_meta] - ) - output_model.meta.basic.time_last_mjd = np.max( - [x.exposure.end_time.mjd for x in input_meta] - ) - output_model.meta.basic.time_mean_mjd = np.mean( - [x.exposure.mid_time.mjd for x in input_meta] - ) - - # observation data - output_model.meta.basic.visit = ( - input_meta[0].observation.visit - if len({x.observation.visit for x in input_meta}) == 1 - else -1 - ) - output_model.meta.basic.segment = ( - input_meta[0].observation.segment - if len({x.observation.segment for x in input_meta}) == 1 - else -1 - ) - output_model.meta.basic["pass"] = ( - input_meta[0].observation["pass"] - if len({x.observation["pass"] for x in input_meta}) == 1 - else -1 - ) - output_model.meta.basic.program = ( - input_meta[0].observation.program - if len({x.observation.program for x in input_meta}) == 1 - else -1 - ) - - # instrument data - output_model.meta.basic.optical_element = input_meta[0].instrument.optical_element - output_model.meta.basic.instrument = input_meta[0].instrument.name - - # association product type - output_model.meta.basic.product_type = "TBD" - - -def populate_mosaic_individual( - output_model: datamodels.MosaicModel, input_models: [list, ModelLibrary] -): - """ - Populate individual meta fields in the output mosaic model based on input models. - - Parameters - ---------- - output_model : MosaicModel - Object to populate with basic metadata. - input_models : [List, ModelLibrary] - List of input data models from which to extract the metadata. - ModelLibrary is also supported. - - Returns - ------- - None - """ - - input_metas = [datamodel.meta for datamodel in input_models] - for input_meta in input_metas: - output_model.append_individual_image_meta(input_meta) - - -def copy_asn_info_from_library(input_models, output_model): - # copy over asn information - if (asn_pool := input_models.asn.get("asn_pool", None)) is not None: - output_model.meta.asn.pool_name = asn_pool - if (asn_table_name := input_models.asn.get("table_name", None)) is not None: - output_model.meta.asn.table_name = asn_table_name diff --git a/romancal/resample/resample_step.py b/romancal/resample/resample_step.py index 32c418b06..15fddead8 100644 --- a/romancal/resample/resample_step.py +++ b/romancal/resample/resample_step.py @@ -7,14 +7,12 @@ import asdf import numpy as np -from astropy.extern.configobj.configobj import ConfigObj -from astropy.extern.configobj.validate import Validator from roman_datamodels import datamodels from stcal.alignment import util from ..datamodels import ModelLibrary from ..stpipe import RomanStep -from . import resample +from .resample import ResampleData if TYPE_CHECKING: from typing import ClassVar @@ -73,6 +71,13 @@ class ResampleStep(RomanStep): reference_file_types: ClassVar = [] def process(self, input): + if self.output_shape is not None: + for v in self.output_shape: + if v < 1: + raise ValueError( + f"output shape values must be >= 1: {self.output_shape}" + ) + if isinstance(input, datamodels.DataModel): input_models = ModelLibrary([input]) # set output filename from meta.filename found in the first datamodel @@ -111,37 +116,38 @@ def process(self, input): # resample can only handle 2D images, not 3D cubes, etc raise RuntimeError(f"Input {input_models[0]} is not a 2D image.") - self.wht_type = self.weight_type - self.log.info("Setting drizzle's default parameters...") - kwargs = self.set_drizzle_defaults() - # Issue a warning about the use of exptime weighting - if self.wht_type == "exptime": + if self.weight_type == "exptime": self.log.warning("Use of EXPTIME weighting will result in incorrect") self.log.warning("propagated errors in the resampled product") # Custom output WCS parameters. + self.log.info("Setting drizzle's default parameters...") # Modify get_drizpars if any of these get into reference files: - kwargs["output_shape"] = self._check_list_pars( - self.output_shape, "output_shape", min_vals=[1, 1] - ) - kwargs["output_wcs"] = self._load_custom_wcs( - self.output_wcs, kwargs["output_shape"] - ) - kwargs["crpix"] = self._check_list_pars(self.crpix, "crpix") - kwargs["crval"] = self._check_list_pars(self.crval, "crval") - kwargs["rotation"] = self.rotation - kwargs["pscale"] = self.pixel_scale - kwargs["pscale_ratio"] = self.pixel_scale_ratio - kwargs["in_memory"] = self.in_memory # Call the resampling routine - resamp = resample.ResampleData(input_models, output=output, **kwargs) + resamp = ResampleData( + input_models, + output=output, + output_shape=self.output_shape, + output_wcs=self._load_custom_wcs(self.output_wcs, self.output_shape), + crpix=self.crpix, + crval=self.crval, + rotation=self.rotation, + pscale=self.pixel_scale, + pscale_ratio=self.pixel_scale_ratio, + in_memory=self.in_memory, + good_bits=self.good_bits, + wht_type=str(self.weight_type), + fillval=str(self.fillval), + kernel=str(self.kernel), + pixfrac=self.pixfrac, + ) result = resamp.do_drizzle() with result: for i, model in enumerate(result): - self._final_updates(model, input_models, kwargs) + self._final_updates(model, input_models) result.shelve(model, i) if len(result) == 1: model = result.borrow(0) @@ -150,7 +156,7 @@ def process(self, input): return result - def _final_updates(self, model, input_models, kwargs): + def _final_updates(self, model, input_models): model.meta.cal_step["resample"] = "COMPLETE" model.meta.wcsinfo.s_region = util.compute_s_region_imaging( model.meta.wcs, model.data.shape @@ -164,60 +170,10 @@ def _final_updates(self, model, input_models, kwargs): if self.pixel_scale else self.pixel_scale_ratio ) - model.meta.resample.pixfrac = kwargs["pixfrac"] - self.update_phot_keywords(model) - model.meta.resample["good_bits"] = kwargs["good_bits"] - - @staticmethod - def _check_list_pars(vals, name, min_vals=None): - """ - Check if a specific keyword parameter is properly formatted. - - Parameters - ---------- - vals : list or tuple - A list or tuple containing a pair of values currently assigned to the - keyword parameter `name`. Both values must be either `None` or not `None`. - name : str - The name of the keyword parameter. - min_vals : list or tuple, optional - A list or tuple containing a pair of minimum values to be assigned - to `name`, by default None. - - Returns - ------- - None or list - If either `vals` is set to `None` (or both of its elements), the - returned result will be `None`. Otherwise, the returned result will be - a list containing the current values assigned to `name`. - - Raises - ------ - ValueError - This error will be raised if any of the following conditions are met: - - the number of elements of `vals` is not 2; - - the currently assigned values of `vals` are smaller than the - minimum value provided; - - one element is `None` and the other is not `None`. - """ - if vals is None: - return None - if len(vals) != 2: - raise ValueError(f"List '{name}' must have exactly two elements.") - n = sum(x is None for x in vals) - if n == 2: - return None - elif n == 0: - if ( - min_vals - and sum(x >= y for x, y in zip(vals, min_vals, strict=False)) != 2 - ): - raise ValueError( - f"'{name}' values must be larger or equal to {list(min_vals)}" - ) - return list(vals) - else: - raise ValueError(f"Both '{name}' values must be either None or not None.") + model.meta.resample.pixfrac = self.pixfrac + if model.meta.photometry.pixel_area is not None: + model.meta.photometry.pixel_area *= model.meta.resample.pixel_scale_ratio**2 + model.meta.resample["good_bits"] = self.good_bits @staticmethod def _load_custom_wcs(asdf_wcs_file, output_shape): @@ -244,47 +200,3 @@ def _load_custom_wcs(asdf_wcs_file, output_shape): ) return wcs - - def update_phot_keywords(self, model): - """Update pixel scale keywords""" - if model.meta.photometry.pixel_area is not None: - model.meta.photometry.pixel_area *= model.meta.resample.pixel_scale_ratio**2 - - def set_drizzle_defaults(self): - """Set the default parameters for drizzle.""" - configspec = self.load_spec_file() - config = ConfigObj(configspec=configspec) - if config.validate(Validator()): - kwargs = config.dict() - - if self.pixfrac is None: - self.pixfrac = 1.0 - if self.kernel is None: - self.kernel = "square" - if self.fillval is None: - self.fillval = "INDEF" - # Force definition of good bits - kwargs["good_bits"] = self.good_bits - kwargs["pixfrac"] = self.pixfrac - kwargs["kernel"] = str(self.kernel) - kwargs["fillval"] = str(self.fillval) - # self.weight_type has a default value of None - # The other instruments read this parameter from a reference file - if self.wht_type is None: - self.wht_type = "ivm" - - kwargs["wht_type"] = str(self.wht_type) - kwargs["pscale_ratio"] = self.pixel_scale_ratio - kwargs.pop("pixel_scale_ratio") - - for k, v in kwargs.items(): - if k in [ - "pixfrac", - "kernel", - "fillval", - "wht_type", - "pscale_ratio", - ]: - log.info(" using: %s=%s", k, repr(v)) - - return kwargs diff --git a/romancal/resample/resample_utils.py b/romancal/resample/resample_utils.py index 4b8c78aa0..3860e9d1d 100644 --- a/romancal/resample/resample_utils.py +++ b/romancal/resample/resample_utils.py @@ -174,50 +174,6 @@ def build_driz_weight( return result.astype(np.float32) -def build_mask(dqarr, bitvalue): - """ - Build a bit mask from an input DQ array and a bitvalue flag. - - Parameters - ---------- - dqarr : numpy.ndarray - Input DQ array. - bitvalue : str - Bitvalue flag. - - Returns - ------- - ndarray - Bit mask where 1 represents good and 0 represents bad. - - Notes - ----- - - The function interprets the bitvalue flag using the - `astropy.nddata.bitmask.interpret_bit_flags` function. - - If the bitvalue is None, the function returns a bit mask with all elements - set to 1. - - Otherwise, the function performs a bitwise AND operation between the dqarr and - the complement of the bitvalue, and then applies a logical NOT operation to - obtain the bit mask. - - The resulting bit mask is returned as a `numpy.ndarray` of dtype `numpy.uint8`. - """ - warnings.warn( - "'build_mask' has been deprecated since release 0.16.3. " - "Use functions from astropy.nddata.bitmask module instead such as " - "bitfield_to_boolean_mask().", - DeprecationWarning, - stacklevel=2, - ) - dqmask = bitfield_to_boolean_mask( - dqarr, - bitvalue, - good_mask_value=1, - dtype=np.uint8, - flag_name_map=pixel, - ) - return dqmask - - def calc_gwcs_pixmap(in_wcs, out_wcs, shape=None): """ Generate a pixel map grid using the input and output WCS. diff --git a/romancal/resample/tests/test_resample.py b/romancal/resample/tests/test_resample.py index bd782c607..543aa9f91 100644 --- a/romancal/resample/tests/test_resample.py +++ b/romancal/resample/tests/test_resample.py @@ -3,7 +3,6 @@ from astropy import coordinates as coord from astropy import units as u from astropy.modeling import models -from astropy.table import QTable from astropy.time import Time from gwcs import WCS from gwcs import coordinate_frames as cf @@ -15,8 +14,6 @@ from romancal.resample import gwcs_drizzle, resample_utils from romancal.resample.resample import ( ResampleData, - populate_mosaic_basic, - populate_mosaic_individual, ) @@ -716,341 +713,6 @@ def test_resampledata_do_drizzle_default_single_exposure_weight_array( output_models_many_to_one.shelve(many_to_one_model, 0, modify=False) -def test_populate_mosaic_basic_single_exposure(exposure_1): - """ - Test the populate_mosaic_basic function with a given exposure. - """ - input_models = ModelLibrary(exposure_1) - with input_models: - models = list(input_models) - crpix = tuple(s // 2 for s in models[0].data.shape[::-1]) - crval = models[0].meta.wcs(*crpix) - output_wcs = resample_utils.make_output_wcs( - models, - pscale_ratio=1, - pscale=0.000031, - rotation=0, - shape=None, - crpix=(0, 0), - crval=crval, - ) - - output_model = maker_utils.mk_datamodel( - datamodels.MosaicModel, shape=tuple(output_wcs.array_shape) - ) - - populate_mosaic_basic(output_model, input_models=models) - - input_meta = [datamodel.meta for datamodel in models] - - [input_models.shelve(model, i, modify=False) for i, model in enumerate(models)] - - assert output_model.meta.basic.time_first_mjd == np.min( - [x.exposure.start_time.mjd for x in input_meta] - ) - assert output_model.meta.basic.time_last_mjd == np.max( - [x.exposure.end_time.mjd for x in input_meta] - ) - assert output_model.meta.basic.time_mean_mjd == np.mean( - [x.exposure.mid_time.mjd for x in input_meta] - ) - assert output_model.meta.basic.visit == ( - input_meta[0].observation.visit - if len({x.observation.visit for x in input_meta}) == 1 - else -1 - ) - assert output_model.meta.basic.segment == ( - input_meta[0].observation.segment - if len({x.observation.segment for x in input_meta}) == 1 - else -1 - ) - assert output_model.meta.basic["pass"] == ( - input_meta[0].observation["pass"] - if len({x.observation["pass"] for x in input_meta}) == 1 - else -1 - ) - assert output_model.meta.basic.program == ( - input_meta[0].observation.program - if len({x.observation.program for x in input_meta}) == 1 - else -1 - ) - assert ( - output_model.meta.basic.optical_element - == input_meta[0].instrument.optical_element - ) - assert output_model.meta.basic.instrument == input_meta[0].instrument.name - - -@pytest.mark.parametrize( - "input_models_data, expected_output", - [ - ( - [ - ( - 59000.0, - 59000.5, - 1, - 1, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ( - 59000.5, - 59001.0, - 1, - 1, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ], - { - "time_first_mjd": 59000.0, - "time_last_mjd": 59001.0, - "time_mean_mjd": 59000.5, - "visit": 1, - "segment": 1, - "pass": 1, - "program": 12345, - "survey": "N/A", - "optical_element": "F158", - "instrument": "WFI", - }, - ), - # different visits - ( - [ - ( - 59000.0, - 59000.5, - 1, - 1, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ( - 59000.5, - 59001.0, - 2, - 1, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ], - { - "time_first_mjd": 59000.0, - "time_last_mjd": 59001.0, - "time_mean_mjd": 59000.5, - "visit": -1, - "segment": 1, - "pass": 1, - "program": 12345, - "survey": "N/A", - "optical_element": "F158", - "instrument": "WFI", - }, - ), - # different segments - ( - [ - ( - 59000.0, - 59000.5, - 1, - 1, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ( - 59000.5, - 59001.0, - 1, - 2, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ], - { - "time_first_mjd": 59000.0, - "time_last_mjd": 59001.0, - "time_mean_mjd": 59000.5, - "visit": 1, - "segment": -1, - "pass": 1, - "program": 12345, - "survey": "N/A", - "optical_element": "F158", - "instrument": "WFI", - }, - ), - # different passes - ( - [ - ( - 59000.0, - 59000.5, - 1, - 1, - 1, - 12345, - "HLS", - "F158", - "WFI", - ), - ( - 59000.5, - 59001.0, - 1, - 1, - 2, - 12345, - "EMS", - "F158", - "WFI", - ), - ], - { - "time_first_mjd": 59000.0, - "time_last_mjd": 59001.0, - "time_mean_mjd": 59000.5, - "visit": 1, - "segment": 1, - "pass": -1, - "program": 12345, - "survey": "MULTIPLE", - "optical_element": "F158", - "instrument": "WFI", - }, - ), - # different programs - ( - [ - ( - 59000.0, - 59000.5, - 1, - 1, - 1, - 12345, - "N/A", - "F158", - "WFI", - ), - ( - 59000.5, - 59001.0, - 1, - 1, - 1, - 54321, - "N/A", - "F158", - "WFI", - ), - ], - { - "time_first_mjd": 59000.0, - "time_last_mjd": 59001.0, - "time_mean_mjd": 59000.5, - "visit": 1, - "segment": 1, - "pass": 1, - "program": -1, - "survey": "N/A", - "optical_element": "F158", - "instrument": "WFI", - }, - ), - # different surveys - ( - [ - ( - 59000.0, - 59000.5, - 1, - 1, - 1, - 12345, - "HLS", - "F158", - "WFI", - ), - ( - 59000.5, - 59001.0, - 1, - 1, - 1, - 12345, - "EMS", - "F158", - "WFI", - ), - ], - { - "time_first_mjd": 59000.0, - "time_last_mjd": 59001.0, - "time_mean_mjd": 59000.5, - "visit": 1, - "segment": 1, - "pass": 1, - "program": 12345, - "survey": "MULTIPLE", - "optical_element": "F158", - "instrument": "WFI", - }, - ), - ], -) -def test_populate_mosaic_basic_different_observations( - input_models_data, expected_output -): - """Test that populate_mosaic_basic function works properly under different observational scenarios.""" - input_models = [create_mock_model(*data) for data in input_models_data] - output_wcs = resample_utils.make_output_wcs( - input_models, - pscale_ratio=1, - pscale=0.000031, - rotation=0, - shape=None, - crpix=(0, 0), - crval=(30, 45), - ) - output_model = maker_utils.mk_datamodel( - datamodels.MosaicModel, shape=tuple(output_wcs.array_shape) - ) - - # Act - populate_mosaic_basic(output_model, input_models) - - # Assert - assert output_model.meta.basic.time_first_mjd == expected_output["time_first_mjd"] - assert output_model.meta.basic.time_last_mjd == expected_output["time_last_mjd"] - assert output_model.meta.basic.time_mean_mjd == expected_output["time_mean_mjd"] - assert output_model.meta.basic.visit == expected_output["visit"] - assert output_model.meta.basic.segment == expected_output["segment"] - assert output_model.meta.basic.program == expected_output["program"] - assert output_model.meta.basic.optical_element == expected_output["optical_element"] - assert output_model.meta.basic.instrument == expected_output["instrument"] - - def test_l3_wcsinfo(multiple_exposures): """Test the population of the Level 3 wcsinfo block""" expected = maker_utils.mk_mosaic_wcsinfo( @@ -1102,25 +764,3 @@ def test_l3_wcsinfo(multiple_exposures): if key not in ["projection", "s_region"]: assert np.allclose(output_model.meta.wcsinfo[key], expected[key]) output_models.shelve(output_model, 0, modify=False) - - -def test_l3_individual_image_meta(multiple_exposures): - """Test that the individual_image_meta is being populated""" - input_models = multiple_exposures - output_model = maker_utils.mk_datamodel(datamodels.MosaicModel) - - # Act - populate_mosaic_individual(output_model, input_models) - - # Assert sizes are expected - n_inputs = len(input_models) - for value in output_model.meta.individual_image_meta.values(): - assert isinstance(value, QTable) - assert len(value) == n_inputs - - # Assert spot check on filename, which is different for each mock input - basic_table = output_model.meta.individual_image_meta.basic - for idx, input in enumerate(input_models): - assert input.meta.filename == basic_table["filename"][idx] - - assert "background" in output_model.meta.individual_image_meta diff --git a/romancal/resample/tests/test_resample_step.py b/romancal/resample/tests/test_resample_step.py index 2a479b7c3..4c95098e4 100644 --- a/romancal/resample/tests/test_resample_step.py +++ b/romancal/resample/tests/test_resample_step.py @@ -1,71 +1,21 @@ +from functools import reduce + import numpy as np import pytest from asdf import AsdfFile from astropy import coordinates as coord from astropy import units as u from astropy.modeling import models +from astropy.table import QTable +from astropy.time import Time from gwcs import WCS from gwcs import coordinate_frames as cf from roman_datamodels import datamodels, maker_utils +from romancal.datamodels import ModelLibrary from romancal.resample import ResampleStep, resample_utils -class MockModel: - def __init__(self, pixel_area, pixel_scale_ratio): - self.meta = MockMeta(pixel_area, pixel_scale_ratio) - - -class MockMeta: - def __init__(self, pixel_area, pixel_scale_ratio): - self.photometry = MockPhotometry(pixel_area) - self.resample = MockResample(pixel_scale_ratio) - - -class MockPhotometry: - def __init__(self, pixel_area): - self.pixel_area = pixel_area - - -class MockResample: - def __init__(self, pixel_scale_ratio): - self.pixel_scale_ratio = pixel_scale_ratio - - -class Mosaic: - def __init__(self, fiducial_world, pscale, shape, filename, n_images): - self.fiducial_world = fiducial_world - self.pscale = pscale - self.shape = shape - self.filename = filename - self.n_images = n_images - - def create_mosaic(self): - """ - Create a dummy L3 datamodel given the coordinates of the fiducial point, - a pixel scale, and the image shape and filename. - - Returns - ------- - datamodels.MosaicModel - An L3 MosaicModel datamodel. - """ - l3 = maker_utils.mk_level3_mosaic( - shape=self.shape, - n_images=self.n_images, - ) - # data from WFISim simulation of SCA #01 - l3.meta.filename = self.filename - l3.meta["wcs"] = create_wcs_object_without_distortion( - fiducial_world=self.fiducial_world, - pscale=self.pscale, - shape=self.shape, - ) - # Call the forward transform so its value is pulled from disk - _ = l3.meta.wcs.forward_transform - return datamodels.MosaicModel(l3) - - def create_wcs_object_without_distortion(fiducial_world, pscale, shape, **kwargs): """ Create a simple WCS object without either distortion or rotation. @@ -151,21 +101,6 @@ def _create_asdf_wcs_file(tmp_path): return _create_asdf_wcs_file -@pytest.mark.parametrize( - "vals, name, min_vals, expected", - [ - ([1, 2], "list1", None, [1, 2]), - ([None, None], "list2", None, None), - ([1, 2], "list4", [0, 0], [1, 2]), - ], -) -def test_check_list_pars_valid(vals, name, min_vals, expected): - step = ResampleStep() - - result = step._check_list_pars(vals, name, min_vals) - assert result == expected - - def test_load_custom_wcs_no_file(): step = ResampleStep() result = step._load_custom_wcs(None, (512, 512)) @@ -199,69 +134,6 @@ def test_load_custom_wcs_asdf_without_wcs_attribute(tmp_path): step._load_custom_wcs(str(file_path), (100, 100)) -@pytest.mark.parametrize( - "vals, name, min_vals, expected", - [ - ([1, 2], "test", [0, 0], [1, 2]), - ([None, None], "test", [0, 0], None), - ([0, 0], "test", [0, 0], [0, 0]), - ([1, 1], "test", [0, 0], [1, 1]), - ([0, 1], "test", [0, 0], [0, 1]), - ([1, 0], "test", [0, 0], [1, 0]), - ], -) -def test_check_list_pars(vals, name, min_vals, expected): - step = ResampleStep() - - result = step._check_list_pars(vals, name, min_vals) - assert result == expected - - -@pytest.mark.parametrize( - "vals, name, min_vals", - [ - ([None, 2], "test", [0, 0]), - ([1, None], "test", [0, 0]), - ([1], "test", [0, 0]), - ([1, 2, 3], "test", [0, 0]), - ([None, None, None], "test", [0, 0]), - ([1, 2], "test", [2, 2]), - ], -) -def test_check_list_pars_exception(vals, name, min_vals): - step = ResampleStep() - - with pytest.raises(ValueError): - step._check_list_pars(vals, name, min_vals) - - -@pytest.mark.parametrize( - """pixel_area, pixel_scale_ratio, - expected_steradians, expected_arcsecsq""", - [ - # Happy path tests - (1.0, 2.0, 4.0, 4.0), - (2.0, 0.5, 0.5, 0.5), - (0.0, 2.0, 0.0, 0.0), - (1.0, 0.0, 0.0, 0.0), - (None, 2.0, None, 4.0), - (1.0, 2.0, 4.0, None), - ], -) -def test_update_phot_keywords( - pixel_area, - pixel_scale_ratio, - expected_steradians, - expected_arcsecsq, -): - step = ResampleStep() - model = MockModel(pixel_area, pixel_scale_ratio) - - step.update_phot_keywords(model) - - assert model.meta.photometry.pixel_area == expected_steradians - - @pytest.mark.parametrize( "good_bits, dq_array, expected_output", [ @@ -370,3 +242,140 @@ def test_build_driz_weight_different_weight_type(base_image, weight_type): } np.testing.assert_array_almost_equal(expected_results.get(weight_type), result) + + +def test_individual_image_meta(base_image): + """Test that the individual_image_meta is being populated""" + input_models = ModelLibrary([base_image() for _ in range(2)]) + output_model = ResampleStep().run(input_models) + + # Assert sizes are expected + n_inputs = len(input_models) + for value in output_model.meta.individual_image_meta.values(): + assert isinstance(value, QTable) + assert len(value) == n_inputs + + # Assert spot check on filename, which is different for each mock input + basic_table = output_model.meta.individual_image_meta.basic + with input_models: + for idx, input in enumerate(input_models): + assert input.meta.filename == basic_table["filename"][idx] + input_models.shelve(input, index=idx) + + assert "background" in output_model.meta.individual_image_meta + + +@pytest.mark.parametrize( + "meta_overrides, expected_basic", + [ + ( # 2 exposures, share visit, etc + ( + { + "meta.observation.visit": 1, + "meta.observation.pass": 1, + "meta.observation.segment": 1, + "meta.observation.program": 1, + "meta.instrument.optical_element": "F158", + "meta.instrument.name": "WFI", + }, + { + "meta.observation.visit": 1, + "meta.observation.pass": 1, + "meta.observation.segment": 1, + "meta.observation.program": 1, + "meta.instrument.optical_element": "F158", + "meta.instrument.name": "WFI", + }, + ), + { + "visit": 1, + "pass": 1, + "segment": 1, + "optical_element": "F158", + "instrument": "WFI", + }, + ), + ( # 2 exposures, different metadata + ( + { + "meta.observation.visit": 1, + "meta.observation.pass": 1, + "meta.observation.segment": 1, + "meta.observation.program": 1, + "meta.instrument.optical_element": "F158", + "meta.instrument.name": "WFI", + }, + { + "meta.observation.visit": 2, + "meta.observation.pass": 2, + "meta.observation.segment": 2, + "meta.observation.program": 2, + "meta.instrument.optical_element": "F062", + "meta.instrument.name": "WFI", + }, + ), + { + "visit": 1, + "pass": 1, + "segment": 1, + "optical_element": "F158", + "instrument": "WFI", + }, + ), + ], +) +def test_populate_mosaic_basic(base_image, meta_overrides, expected_basic): + """Test that the basic mosaic metadata is being populated""" + models = [] + for i, meta_override in enumerate(meta_overrides): + model = base_image() + + model.meta.observation.observation_id = i + + model.meta.exposure.start_time = Time(59000 + i, format="mjd") + model.meta.exposure.end_time = Time(59001 + i, format="mjd") + model.meta.exposure.mid_time = Time( + (model.meta.exposure.start_time.mjd + model.meta.exposure.end_time.mjd) / 2, + format="mjd", + ) + + for key, value in meta_override.items(): + *parent_keys, child_key = key.split(".") + setattr(reduce(getattr, parent_keys, model), child_key, value) + models.append(model) + + input_models = ModelLibrary(models) + output_model = ResampleStep().run(input_models) + + assert ( + output_model.meta.basic.time_first_mjd == models[0].meta.exposure.start_time.mjd + ) + assert ( + output_model.meta.basic.time_last_mjd == models[-1].meta.exposure.end_time.mjd + ) + assert output_model.meta.basic.time_mean_mjd == np.mean( + [m.meta.exposure.mid_time.mjd for m in models] + ) + + for key, value in expected_basic.items(): + assert getattr(output_model.meta.basic, key) == value + + +@pytest.mark.parametrize( + "input_pixel_area, pixel_scale_ratio, expected_pixel_area", + [ + # (None, 1.0, None), # this cannot be tested since it causes the step to crash + (1.0, 1.0, -999999.0), + (1.0, 2.0, -999999.0 * 4.0), + ], +) +def test_pixel_area_update( + base_image, input_pixel_area, pixel_scale_ratio, expected_pixel_area +): + # if input model has a non-None pixel resample should scale it by the square of the pixel_scale_ratio + model = base_image() + model.meta.photometry.pixel_area = input_pixel_area + output_model = ResampleStep(pixel_scale_ratio=pixel_scale_ratio).run( + ModelLibrary([model]) + ) + assert output_model.meta.photometry.pixel_area == expected_pixel_area