Skip to content

Commit

Permalink
Update FC Logic
Browse files Browse the repository at this point in the history
- add a property to TFK (all_fcs_already_exist) that returns a boolean indicating whether all FCs already exist
- check all_fcs_already_exist at start of process_mth5
- if all_fcs_already_exist, do not build fc layers
- move fc_level checking into TFK, out of process_mth5
- remove mode kwarg from tfk.initialize_mth5s(), get this info from tfk.get_mth5_file_open_mode()
- kernel_dataset initializes "fc" column to None, rather than False
  - boolean state is set during check_if_fcs_already_exist
  • Loading branch information
kkappler committed Jan 17, 2024
1 parent 4351708 commit 011ac55
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 20 deletions.
38 changes: 23 additions & 15 deletions aurora/pipelines/process_mth5.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,27 @@ def load_stft_obj_from_mth5(i_dec_level, row, run_obj):


def save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj):
"""
Note #1: Logic for building FC layers:
If the processing config decimation_level.save_fcs_type = "h5" and fc_levels_already_exist is False, then open
in append mode, else open in read mode. We should support a flag: force_rebuild_fcs, normally False. This flag
is only needed when save_fcs_type=="h5". If True, then we open in append mode, regardless of fc_levels_already_exist
The task of setting mode="a", mode="r" can be handled by tfk (maybe in tfk.validate())
Parameters
----------
dec_level_config
row
run_obj
stft_obj
Returns
-------
"""

if not dec_level_config.save_fcs:
msg = "Skip saving FCs. dec_level_config.save_fc = "
msg = f"{msg} {dec_level_config.save_fcs}"
Expand Down Expand Up @@ -279,13 +300,6 @@ def process_mth5(
This is the main method used to transform a processing_config,
and a kernel_dataset into a transfer function estimate.
Note 1: Logic for building FC layers:
If the processing config decimation_level.save_fcs_type = "h5" and fc_levels_already_exist is False, then open
in append mode, else open in read mode. We should support a flag: force_rebuild_fcs, normally False. This flag
is only needed when save_fcs_type=="h5". If True, then we open in append mode, regardless of fc_levels_already_exist
The task of setting mode="a", mode="r" can be handled by tfk (maybe in tfk.validate())
Parameters
----------
config: mt_metadata.transfer_functions.processing.aurora.Processing or path to json
Expand Down Expand Up @@ -316,15 +330,8 @@ def process_mth5(
tfk.make_processing_summary()
tfk.show_processing_summary()
tfk.validate()
tfk.check_if_fc_levels_already_exist()
# Check if FC Levels Exist Then Initialize mth5s

# See Note #1
if tfk.config.decimations[0].save_fcs:
mth5_mode = "a"
else:
mth5_mode = "r"
tfk.initialize_mth5s(mode=mth5_mode)
tfk.initialize_mth5s()

msg = (
f"Processing config indicates {len(tfk.config.decimations)} "
Expand All @@ -334,6 +341,7 @@ def process_mth5(
tf_dict = {}

for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()):
# if not tfk.all_fcs_already_exist():
tfk.update_dataset_df(i_dec_level)
dec_level_config = tfk.apply_clock_zero(dec_level_config)

Expand Down
35 changes: 31 additions & 4 deletions aurora/pipelines/transfer_function_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def mth5_objs(self):
self.initialize_mth5s()
return self._mth5_objs

def initialize_mth5s(self, mode="r"):
def initialize_mth5s(self):
"""
returns a dict of open mth5 objects, keyed by station_id
Expand All @@ -70,7 +70,7 @@ def initialize_mth5s(self, mode="r"):
local station id : mth5.mth5.MTH5
remote station id: mth5.mth5.MTH5
"""

mode = self.get_mth5_file_open_mode()
local_mth5_obj = initialize_mth5(
self.config.stations.local.mth5_path, mode=mode
)
Expand Down Expand Up @@ -149,7 +149,17 @@ def apply_clock_zero(self, dec_level_config):
dec_level_config.window.clock_zero = str(self.dataset_df.start.min())
return dec_level_config

def check_if_fc_levels_already_exist(self):
@property
def all_fcs_already_exist(self):
    """
    Report whether the "fc" column of the kernel dataset is True everywhere.

    If any entry of the "fc" column is still missing (None/NaN),
    check_if_fcs_already_exist() is called first so that the column gets
    filled with True/False before it is evaluated.

    Returns
    -------
    bool
        True if every entry of the "fc" column is truthy, False otherwise.

    Raises
    ------
    AssertionError
        If the "fc" column still contains missing values after
        check_if_fcs_already_exist() has run.
    """
    if self.kernel_dataset.df["fc"].isna().any():
        self.check_if_fcs_already_exist()

    # Re-read the column after the check, in case it was replaced.
    fc_column = self.kernel_dataset.df["fc"]

    # These should all be booleans now. Raised explicitly instead of using
    # an assert statement so the check is not stripped under ``python -O``.
    if fc_column.isna().any():
        msg = 'kernel_dataset "fc" column still has missing values'
        raise AssertionError(msg)

    # Cast so callers always get a builtin bool, whatever the column dtype.
    return bool(fc_column.all())

def check_if_fcs_already_exist(self):
"""
Fills out the "fc" column of dataset dataframe with True/False.
Expand Down Expand Up @@ -374,7 +384,8 @@ def validate(self):
self.validate_processing()
self.validate_decimation_scheme_and_dataset_compatability()
if self.memory_warning():
raise Exception
raise Exception("Job requires too much memory")
self.validate_save_fc_settings()

def valid_decimations(self):
"""
Expand All @@ -393,6 +404,22 @@ def valid_decimations(self):
logger.info(msg)
return dec_levels

def validate_save_fc_settings(self):
    """
    Switch off FC saving when the FCs are already present.

    When all_fcs_already_exist is True, an info message is logged and
    save_fcs is set to False on every decimation level of the processing
    config. Otherwise the config is left untouched.
    """
    if not self.all_fcs_already_exist:
        return

    logger.info("FC Layer already exists -- forcing processing config save_fcs=False")
    for decimation in self.config.decimations:
        decimation.save_fcs = False

def get_mth5_file_open_mode(self):
    """
    Choose the mode in which the mth5 files are opened.

    Returns
    -------
    str
        "a" (append) when not all FCs exist yet and the first decimation
        level of the processing config has save_fcs set; "r" (read) in
        every other case.
    """
    # all_fcs_already_exist is evaluated first; the config is only consulted
    # when some FCs are still missing.
    needs_append = (
        not self.all_fcs_already_exist and self.config.decimations[0].save_fcs
    )
    return "a" if needs_append else "r"

def is_valid_dataset(self, row, i_dec):
"""
Given a row from the RunSummary, answers:
Expand Down
2 changes: 1 addition & 1 deletion aurora/transfer_function/kernel_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def from_run_summary(self, run_summary, local_station_id, remote_station_id=None
raise ValueError(msg)
else:
self._add_duration_column()
self.df["fc"] = False
self.df["fc"] = None

@property
def mini_summary(self):
Expand Down

0 comments on commit 011ac55

Please sign in to comment.