From 011ac55794e8ea9602e59ea86936ee15c1e954e7 Mon Sep 17 00:00:00 2001 From: "Karl N. Kappler" Date: Wed, 17 Jan 2024 15:31:53 -0800 Subject: [PATCH] Update FC Logic - add a property to TFK returns a boolean if all_fcs_already_exist - check all_fcs_already_exist at start of process_mth5 - if all_fcs_already_exist, do not build fc layers - move fc_level checking into TFK, out of process_mth5 - remove mode kwarg from tfk.initialize_mth5s(), get this info from tfk.get_mth5_file_open_mode() - kernel_dataset initializes "fc" column to None, rather than False - boolean state is set during check_if_fcs_already_exist --- aurora/pipelines/process_mth5.py | 38 ++++++++++++-------- aurora/pipelines/transfer_function_kernel.py | 35 +++++++++++++++--- aurora/transfer_function/kernel_dataset.py | 2 +- 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/aurora/pipelines/process_mth5.py b/aurora/pipelines/process_mth5.py index 3879670e..564597f1 100644 --- a/aurora/pipelines/process_mth5.py +++ b/aurora/pipelines/process_mth5.py @@ -235,6 +235,27 @@ def load_stft_obj_from_mth5(i_dec_level, row, run_obj): def save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj): + """ + + + Note #1: Logic for building FC layers: + If the processing config decimation_level.save_fcs_type = "h5" and fc_levels_already_exist is False, then open + in append mode, else open in read mode. We should support a flag: force_rebuild_fcs, normally False. This flag + is only needed when save_fcs_type=="h5". If True, then we open in append mode, regarless of fc_levels_already_exist + The task of setting mode="a", mode="r" can be handled by tfk (maybe in tfk.validate()) + + Parameters + ---------- + dec_level_config + row + run_obj + stft_obj + + Returns + ------- + + """ + if not dec_level_config.save_fcs: msg = "Skip saving FCs. dec_level_config.save_fc = " msg = f"{msg} {dec_level_config.save_fcs}" @@ -279,13 +300,6 @@ def process_mth5( This is the main method used to transform a processing_config, and a kernel_dataset into a transfer function estimate. - Note 1: Logic for building FC layers: - If the processing config decimation_level.save_fcs_type = "h5" and fc_levels_already_exist is False, then open - in append mode, else open in read mode. We should support a flag: force_rebuild_fcs, normally False. This flag - is only needed when save_fcs_type=="h5". If True, then we open in append mode, regarless of fc_levels_already_exist - The task of setting mode="a", mode="r" can be handled by tfk (maybe in tfk.validate()) - - Parameters ---------- config: mt_metadata.transfer_functions.processing.aurora.Processing or path to json @@ -316,15 +330,8 @@ def process_mth5( tfk.make_processing_summary() tfk.show_processing_summary() tfk.validate() - tfk.check_if_fc_levels_already_exist() - # Check if FC Levels Exist Then Initialize mth5s - # See Note #1 - if tfk.config.decimations[0].save_fcs: - mth5_mode = "a" - else: - mth5_mode = "r" - tfk.initialize_mth5s(mode=mth5_mode) + tfk.initialize_mth5s() msg = ( f"Processing config indicates {len(tfk.config.decimations)} " @@ -334,6 +341,7 @@ def process_mth5( tf_dict = {} for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()): + # if not tfk.all_fcs_already_exist(): tfk.update_dataset_df(i_dec_level) dec_level_config = tfk.apply_clock_zero(dec_level_config) diff --git a/aurora/pipelines/transfer_function_kernel.py b/aurora/pipelines/transfer_function_kernel.py index 83ed9c21..4c8ae3df 100644 --- a/aurora/pipelines/transfer_function_kernel.py +++ b/aurora/pipelines/transfer_function_kernel.py @@ -57,7 +57,7 @@ def mth5_objs(self): self.initialize_mth5s() return self._mth5_objs - def initialize_mth5s(self, mode="r"): + def initialize_mth5s(self): """ returns a dict of open mth5 objects, keyed by station_id @@ -70,7 +70,7 @@ def initialize_mth5s(self, mode="r"): local station id : mth5.mth5.MTH5 remote station id: mth5.mth5.MTH5 """ - + mode = self.get_mth5_file_open_mode() local_mth5_obj = initialize_mth5( self.config.stations.local.mth5_path, mode=mode ) @@ -149,7 +149,17 @@ def apply_clock_zero(self, dec_level_config): dec_level_config.window.clock_zero = str(self.dataset_df.start.min()) return dec_level_config - def check_if_fc_levels_already_exist(self): + @property + def all_fcs_already_exist(self): + if self.kernel_dataset.df["fc"].isna().any(): + self.check_if_fcs_already_exist() + + # these should all be booleans now + assert not self.kernel_dataset.df["fc"].isna().any() + + return self.kernel_dataset.df.fc.all() + + def check_if_fcs_already_exist(self): """ Fills out the "fc" column of dataset dataframe with True/False. @@ -374,7 +384,8 @@ def validate(self): self.validate_processing() self.validate_decimation_scheme_and_dataset_compatability() if self.memory_warning(): - raise Exception + raise Exception("Job requires too much memory") + self.validate_save_fc_settings() def valid_decimations(self): """ @@ -393,6 +404,22 @@ def valid_decimations(self): logger.info(msg) return dec_levels + def validate_save_fc_settings(self): + if self.all_fcs_already_exist: + msg = "FC Layer already exists -- forcing processing config save_fcs=False" + logger.info(msg) + for dec_level_config in self.config.decimations: + # if dec_level_config.save_fcs: + dec_level_config.save_fcs = False + + def get_mth5_file_open_mode(self): + if self.all_fcs_already_exist: + return "r" + elif self.config.decimations[0].save_fcs: + return "a" + else: + return "r" + def is_valid_dataset(self, row, i_dec): """ Given a row from the RunSummary, answers: diff --git a/aurora/transfer_function/kernel_dataset.py b/aurora/transfer_function/kernel_dataset.py index e997a150..0f427ba3 100644 --- a/aurora/transfer_function/kernel_dataset.py +++ b/aurora/transfer_function/kernel_dataset.py @@ -181,7 +181,7 @@ def from_run_summary(self, run_summary, local_station_id, remote_station_id=None raise ValueError(msg) else: self._add_duration_column() - self.df["fc"] = False + self.df["fc"] = None @property def mini_summary(self):