Reorganize entry point logic in process_mth5
- Update process_mth5 so that check_if_fc_levels_already_exist runs before the mth5s are initialized
- Factor logic in TFKernel so that the actual check on the mth5 object is a standalone function
- Address old Note #3 in check_if_fc_levels_already_exist (ambiguity eliminated)
kkappler committed Jan 17, 2024
1 parent 542d377 commit 53f0481
Showing 2 changed files with 103 additions and 93 deletions.
20 changes: 7 additions & 13 deletions aurora/pipelines/process_mth5.py
@@ -316,27 +316,21 @@ def process_mth5(
     tfk.make_processing_summary()
     tfk.show_processing_summary()
     tfk.validate()
+    tfk.check_if_fc_levels_already_exist()
+    # Check if FC Levels Exist Then Initialize mth5s

     # See Note #1
     if tfk.config.decimations[0].save_fcs:
         mth5_mode = "a"
     else:
         mth5_mode = "r"
     tfk.initialize_mth5s(mode=mth5_mode)
-    try:
-        tfk.check_if_fc_levels_already_exist()  # populate the "fc" column of dataset_df
-        msg = f"fc_levels_already_exist = {tfk.dataset_df['fc'].iloc[0]}"
-        logger.info(msg)
-        msg = (
-            f"Processing config indicates {len(tfk.config.decimations)} "
-            f"decimation levels"
-        )
-        logger.info(msg)
-    except:
-        msg = "WARNING -- Unable to execute check for FC Levels"
-        msg = f"{msg} Possibly FCs not present at all (file from old MTH5 version)?"
-        logger.warning(msg)

+    msg = (
+        f"Processing config indicates {len(tfk.config.decimations)} "
+        f"decimation levels"
+    )
+    logger.info(msg)
     tf_dict = {}

     for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()):
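For orientation, here is a condensed sketch (not part of the commit) of the reorganized entry point, using the names that appear in the diff: the FC check now runs before any MTH5 file is opened, and the open mode is chosen independently of it.

# Illustrative sketch only -- a simplified stand-in for the real process_mth5.
def entry_point_sketch(tfk):
    tfk.make_processing_summary()
    tfk.validate()
    tfk.check_if_fc_levels_already_exist()  # fills dataset_df["fc"]; needs no open h5
    # Open the mth5s afterwards, writable only if FCs are to be saved:
    mth5_mode = "a" if tfk.config.decimations[0].save_fcs else "r"
    tfk.initialize_mth5s(mode=mth5_mode)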
176 changes: 96 additions & 80 deletions aurora/pipelines/transfer_function_kernel.py
@@ -6,6 +6,7 @@
 from aurora.pipelines.time_series_helpers import prototype_decimate
 from mth5.utils.exceptions import MTH5Error
 from mth5.utils.helpers import initialize_mth5
+from mth5.utils.helpers import path_or_mth5_object
 from mt_metadata.transfer_functions.core import TF
 from loguru import logger

@@ -89,7 +90,7 @@ def update_dataset_df(self, i_dec_level):
"""
This function has two different modes. The first mode initializes values in the
array, and could be placed into TFKDataset.initialize_time_series_data()
The second mode, decimates. The function is kept in pipelines becasue it calls
The second mode, decimates. The function is kept in pipelines because it calls
time series operations.
Notes:
@@ -150,35 +151,25 @@ def apply_clock_zero(self, dec_level_config):

     def check_if_fc_levels_already_exist(self):
         """
-        Iterate over the processing summary_df, grouping by unique "Survey-Station-Run"s.
-        (Could also iterate over kernel_dataset.dataframe, to get the groupby).
-        If all FC Levels for a given station-run are already built, mark the RunSummary with a True in
-        the "fc" column. Otherwise its False
+        Fills out the "fc" column of the dataset dataframe with True/False.
+        If all FC levels for a given station-run are already built, mark True, otherwise False.
+
+        Iterates over the processing summary_df, grouping by unique "Survey-Station-Run"s.
+        (Could also iterate over kernel_dataset.dataframe, to get the groupby).

-        Note 1: Because decimation is a cascading operation, we avoid the case where some (valid) decimation
-        levels exist in the mth5 FC archive and others do not. The maximum granularity tolerated will be at the
-        "station-run level, so for a given run, either all relevant FCs are packed into the h5 or we treat as if none
-        of them are. Sounds harsh, but if you want to add the logic otherwise, feel free. If one wanted to support
-        variations at the decimation-level, an appropriate way to address would be to store teh decimated time series
-        in the archive as well (they would simply be runs with different sample rates, and some extra filters).
-
-        Note 2: At this point in the logic, it is established that there are FCs associated with run_id and there are
-        at least as many FC decimation levels as we require as per the processing config. The next step is to
-        assert whether it is True that the existing FCs conform to the recipe in the processing config.
-
-        Note #3: Need to update mth5_objs dict so that it is keyed by survey, then station, else might break when
-        mixing in data from other surveys (if the stations are named the same. This can be addressed in
-        the initialize_mth5s() method of TFK. When addresssing the above issue -- consider adding the mth5_obj to
-        self.dataset_df instead of keeping the dict around ..., the concern about doing this is that multiple rows
-        of the dataset_df may refernece the same h5, and I don't know if updating one row will have unintended
-        consequences.
+        Note 1: Because decimation is a cascading operation, we avoid the case where some (valid) decimation
+        levels exist in the mth5 FC archive and others do not. The maximum granularity is the
+        "station-run" level. For a given run, either all relevant FCs are in the h5 or we treat as if none
+        of them are. To support variations at the decimation level, an appropriate way to address this would
+        be to store the decimated time series in the archive as well (they would simply be runs with
+        different sample rates, and some extra filters).

-        Note #4: associated_run_sub_df may have multiple rows, even though the run id is unique.
+        Note #2: run_sub_df may have multiple rows, even though the run id is unique.
         This could happen for example when you have a long run at the local station, but multiple (say two) shorter runs
         at the reference station. In that case, the processing summary will have a separate row for the
         intersection of the long run with each of the remote runs. We ignore this for now, selecting only the first
-        element of the associated_run_sub_df, under the assumption that FCs have been created for the entire run,
+        element of the run_sub_df, under the assumption that FCs have been created for the entire run,
         or not at all. This assumption can be relaxed in future by using the time_period attribute of the FC layer.
         For now, we proceed with the all-or-none logic. That is, if a ['survey', 'station_id', 'run_id',] has FCs,
         assume that the FCs are present for the entire run. We assign the "fc" column of dataset_df to have the same
@@ -199,71 +190,37 @@ def check_if_fc_levels_already_exist(self):
             cond1 = self.dataset_df.survey == survey_id
             cond2 = self.dataset_df.station_id == station_id
             cond3 = self.dataset_df.run_id == run_id
-            associated_run_sub_df = self.dataset_df[cond1 & cond2 & cond3]
+            run_sub_df = self.dataset_df[cond1 & cond2 & cond3]

-            if len(associated_run_sub_df) > 1:
-                # See Note #4
-                msg = "Not all runs will processed as a continuous chunk "
+            if len(run_sub_df) > 1:
+                # See Note #2
+                msg = "Not all runs will process as a continuous chunk "
                 msg += "-- in future may need to loop over runlets to check for FCs"
                 logger.warning(msg)

-            dataset_df_indices = np.r_[associated_run_sub_df.index]
-            run_row = associated_run_sub_df.iloc[0]
-            row_ssr_str = f"survey: {run_row.survey}, station_id: {run_row.station_id}, run_id: {run_row.run_id}"
-
-            # See Note #3 above relating to mixing multiple surveys in a processing scheme
-            mth5_obj = self.mth5_objs[station_id]
-            survey_obj = mth5_obj.get_survey(survey_id)
-            station_obj = survey_obj.stations_group.get_station(station_id)
-            if not station_obj.fourier_coefficients_group.groups_list:
-                msg = f"Fourier coefficients not detected for {row_ssr_str}"
-                msg += "-- Fourier coefficients will be computed"
-                logger.info(msg)
-                self.dataset_df.loc[dataset_df_indices, "fc"] = False
-            else:
-                msg = (
-                    "Fourier Coefficients detected -- "
-                    "checking if they satisfy processing requirements..."
-                )
-                logger.info(msg)
-
-                # Assume FC Groups are keyed by run_id, check if there is a relevant group
-                try:
-                    fc_group = station_obj.fourier_coefficients_group.get_fc_group(
-                        run_id
-                    )
-                except MTH5Error:
-                    self.dataset_df.loc[dataset_df_indices, "fc"] = False
-                    msg = f"Run ID {run_id} not found in FC Groups, -- will need to compute FCs"
-                    logger.info(msg)
-                    continue
-
-                if (
-                    len(fc_group.groups_list)
-                    < self.processing_config.num_decimation_levels
-                ):
-                    self.dataset_df.loc[dataset_df_indices, "fc"] = False
-                    logger.info(
-                        f"Not enough FC Groups available for {row_ssr_str} -- will need to build them "
-                    )
-                    continue
-
-                # Can check time periods here if desired, but unique (survey, station, run) should make this unneeded
-                # processing_run = self.processing_config.stations.local.get_run(run_id)
-                # for tp in processing_run.time_periods:
-                #     assert tp in fc_group time periods
-
-                # See note #2
-                fcs_already_there = fc_group.supports_aurora_processing_config(
-                    self.processing_config, run_row.remote
-                )
-                self.dataset_df.loc[dataset_df_indices, "fc"] = fcs_already_there
+            dataset_df_indices = np.r_[run_sub_df.index]
+            remote = run_sub_df.remote.iloc[0]
+            mth5_path = run_sub_df.mth5_path.iloc[0]
+            fcs_present = mth5_has_fcs(
+                mth5_path, survey_id, station_id, run_id, remote, self.processing_config
+            )
+            self.dataset_df.loc[dataset_df_indices, "fc"] = fcs_present
+
+        if self.dataset_df["fc"].any():
+            if self.dataset_df["fc"].all():
+                msg = "All fc_levels already exist"
+                msg += " -- skipping time series processing is OK"
+            else:
+                msg = f"Some, but not all fc_levels already exist = {self.dataset_df['fc']}"
+        else:
+            msg = "FC levels not present"
+        logger.info(msg)

         return

     def show_processing_summary(
         self,
-        omit_columns=[
+        omit_columns=(
             "mth5_path",
             "channel_scale_factors",
             "start",
@@ -273,12 +230,12 @@ def show_processing_summary(
"num_samples_overlap",
"num_samples_advance",
"run_dataarray",
],
),
):
columns_to_show = self.processing_summary.columns
columns_to_show = [x for x in columns_to_show if x not in omit_columns]
logger.info("Processing Summary Dataframe:")
logger.info(self.processing_summary[columns_to_show].to_string())
logger.info(f"\n{self.processing_summary[columns_to_show].to_string()}")

def make_processing_summary(self):
"""
@@ -611,3 +568,62 @@ def memory_warning(self):
             return True
         else:
             return False
+
+
+# ###################################################################
+# ######## Helper Functions #########################################
+# ###################################################################
+
+
+@path_or_mth5_object
+def mth5_has_fcs(m, survey_id, station_id, run_id, remote, processing_config):
+    """
+    Checks if all FC levels needed for survey-station-run under processing_config are present.
+
+    Note #1: At this point in the logic, it is established that there are FCs associated with run_id and there are
+    at least as many FC decimation levels as we require as per the processing config. The next step is to
+    assert whether it is True that the existing FCs conform to the recipe in the processing config.
+
+    Parameters
+    ----------
+    m: mth5.mth5.MTH5 or path (resolved to an open MTH5 by the path_or_mth5_object decorator)
+    survey_id: str
+    station_id: str
+    run_id: str
+    remote: bool
+    processing_config: the processing config to check the archived FCs against
+
+    Returns
+    -------
+    bool: True if the archived FCs satisfy the processing config, otherwise False
+    """
+    row_ssr_str = f"survey: {survey_id}, station: {station_id}, run: {run_id}"
+
+    survey_obj = m.get_survey(survey_id)
+    station_obj = survey_obj.stations_group.get_station(station_id)
+    if not station_obj.fourier_coefficients_group.groups_list:
+        msg = f"Fourier coefficients not detected for {row_ssr_str}"
+        msg += " -- Fourier coefficients will be computed"
+        logger.info(msg)
+        return False
+
+    logger.info("FCs detected -- checking against processing requirements.")
+    try:
+        fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
+    except MTH5Error:
+        msg = f"Run {run_id} not found in FC Groups -- need to compute FCs"
+        logger.info(msg)
+        return False
+
+    if len(fc_group.groups_list) < processing_config.num_decimation_levels:
+        msg = f"Not enough FC Groups found for {row_ssr_str} -- will build them"
+        logger.info(msg)
+        return False
+
+    # Could check time periods here if desired, but unique (survey, station, run) should make this unneeded
+    # processing_run = processing_config.stations.local.get_run(run_id)
+    # for tp in processing_run.time_periods:
+    #     assert tp in fc_group time periods
+
+    # See Note #1
+    fcs_already_there = fc_group.supports_aurora_processing_config(
+        processing_config, remote
+    )
+    return fcs_already_there
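Because mth5_has_fcs is wrapped with path_or_mth5_object, the first argument can be either a filesystem path or an already-open MTH5 object (the call site in check_if_fc_levels_already_exist passes mth5_path). A hypothetical call, with placeholder file name and config:

# Hypothetical usage sketch -- "example.h5" and processing_config are placeholders.
from aurora.pipelines.transfer_function_kernel import mth5_has_fcs

fcs_present = mth5_has_fcs(
    "example.h5",  # a path; an open MTH5 object should work equally well
    survey_id="S1",
    station_id="MT01",
    run_id="001",
    remote=False,
    processing_config=processing_config,  # an aurora processing config, built elsewhere
)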

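The all-or-none bookkeeping described in the docstring notes reduces to a simple pattern: group the dataset dataframe by (survey, station, run) and write a single boolean per group. A self-contained toy version (hypothetical data, with a stand-in for the h5 inspection):

import pandas as pd

# Toy dataset_df: two rows share one run id, as in Note #2 (a long local run
# intersected with two shorter remote runs yields two rows for the same run).
dataset_df = pd.DataFrame(
    {
        "survey": ["S1", "S1", "S1"],
        "station_id": ["MT01", "MT01", "MT02"],
        "run_id": ["001", "001", "002"],
        "fc": False,
    }
)

def toy_mth5_has_fcs(survey_id, station_id, run_id):
    # Stand-in for the real h5 inspection; pretend only MT01 has FCs archived.
    return station_id == "MT01"

for (survey_id, station_id, run_id), group in dataset_df.groupby(
    ["survey", "station_id", "run_id"]
):
    dataset_df.loc[group.index, "fc"] = toy_mth5_has_fcs(survey_id, station_id, run_id)

print(dataset_df)  # every row of a given run carries the same True/False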