diff --git a/aurora/pipelines/process_mth5.py b/aurora/pipelines/process_mth5.py index 564597f1..1acea698 100644 --- a/aurora/pipelines/process_mth5.py +++ b/aurora/pipelines/process_mth5.py @@ -288,6 +288,60 @@ def save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj): return +def get_spectrogams(tfk, i_dec_level, units="MT"): + """ + Can be make a method of TFK + + Parameters + ---------- + tfk: TransferFunctionKernel + i_dec_level: integer + units: "MT" or "SI", likely to be deprecated + + Returns + ------- + dict of short time fourier transforms + """ + + stfts = {} + stfts["local"] = [] + stfts["remote"] = [] + + # Check first if TS processing or accessing FC Levels + for i, row in tfk.dataset_df.iterrows(): + # This iterator could be updated to iterate over row-pairs if remote is True, + # corresponding to simultaneous data + + if not tfk.is_valid_dataset(row, i_dec_level): + continue + + run_obj = row.mth5_obj.from_reference(row.run_reference) + if row.fc: + stft_obj = load_stft_obj_from_mth5(i_dec_level, row, run_obj) + stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote) + continue + + run_xrds = row["run_dataarray"].to_dataset("channel") + + # Musgraves workaround for old MT data + triage_run_id(row.run_id, run_obj) + + stft_obj = make_stft_objects( + tfk.config, + i_dec_level, + run_obj, + run_xrds, + units, + ) + + # Pack FCs into h5 + dec_level_config = tfk.config.decimations[i_dec_level] + save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj) + stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote) + + return stfts + + def process_mth5( config, tfk_dataset=None, @@ -343,42 +397,9 @@ def process_mth5( for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()): # if not tfk.all_fcs_already_exist(): tfk.update_dataset_df(i_dec_level) - dec_level_config = tfk.apply_clock_zero(dec_level_config) - - stfts = {} - stfts["local"] = [] - stfts["remote"] = [] + tfk.apply_clock_zero(dec_level_config) - # Check first if TS processing or accessing FC Levels - for i, row in tfk.dataset_df.iterrows(): - # This iterator could be updated to iterate over row-pairs if remote is True, - # corresponding to simultaneous data - - if not tfk.is_valid_dataset(row, i_dec_level): - continue - - run_obj = row.mth5_obj.from_reference(row.run_reference) - if row.fc: - stft_obj = load_stft_obj_from_mth5(i_dec_level, row, run_obj) - stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote) - continue - - run_xrds = row["run_dataarray"].to_dataset("channel") - - # Musgraves workaround for old MT data - triage_run_id(row.run_id, run_obj) - - stft_obj = make_stft_objects( - tfk.config, - i_dec_level, - run_obj, - run_xrds, - units, - ) - # Pack FCs into h5 - save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj) - - stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote) + stfts = get_spectrogams(tfk, i_dec_level, units=units) local_merged_stft_obj, remote_merged_stft_obj = merge_stfts(stfts, tfk)