diff --git a/aurora/transfer_function/kernel_dataset.py b/aurora/transfer_function/kernel_dataset.py
index 4ea21fe0..ff4c353c 100644
--- a/aurora/transfer_function/kernel_dataset.py
+++ b/aurora/transfer_function/kernel_dataset.py
@@ -3,51 +3,54 @@
 Each mth5 has a mth5_obj.channel_summary dataframe which tells what data are
 available. Here we use a compressed view of this df with one line per
 acquisition run. I've been
-calling that a "run_summary". That object could be moved to mth5, so that each mth5
-has a mth5_obj.run_summary dataframe. As of Mar 29, 2023 a RunSummary is available at the
-station level in mth5, but the aurora version is still being used. This should be merged if possible
-so that aurora uses the built-in mth5 method.
+calling that a "run_summary".

 The run_summary provides options for the local and possibly remote reference stations.
-    Candidates for local station are the unique value in the station column.
-    *It maybe that we need to groupby survey & station, for now I am considering station
-    names to be unique.
+Candidates for local station are the unique values in the station column.

-    For any given candidate station, there are some integer n runs available.
-    This yields 2**n - 1 possible combinations that can be processed, neglecting any
-    flagging of time intervals within any run, or any joining of runs.
-    (There are actually 2**n, but we ignore the empty set, so -1)
+For any given candidate station, there are some integer n runs available.
+This yields 2**n - 1 possible combinations that can be processed, neglecting any
+flagging of time intervals within any run, or any joining of runs.
+(There are actually 2**n, but we ignore the empty set, so -1)

-    Intuition suggests default ought to be to process n runs in n+1 configurations:
-    {all runs} + each run individually. This will give a bulk answer, and bad runs can
-    be flagged by comparing them. After an initial processing, the tfs can be reviewed
-    and the problematic runs can be addressed.
+Intuition suggests the default ought to be to process n runs in n+1 configurations:
+{all runs} + each run individually. This will give a bulk answer, and bad runs can
+be flagged by comparing them. After an initial processing, the tfs can be reviewed
+and the problematic runs can be addressed.

 The user can interact with the run_summary_df, selecting sub dataframes via querying,
 and in future maybe via some GUI (or a spreadsheet).

 The process looks like this:
-0. Start with a list of mth5s
-1. Extract channel_summaries from each mth5 and join them vertically
-2. Compress to a run_summay
-3. Stare at the run_summary_df & Select a station "S" to process
-4. Given "S"", select a non-empty set of runs for that station
-5. Select a remote reference "RR", (this is allowed to be empty)
-6. Extract the sub-dataframe corresponding to you local_station acquistion_runs,
-and the remote station acquition runs
-7. If the remote is non-empty,
-a) Drop the runs (rows) associated with the remote that DO NOT intersect with local
-b) restrict the start/end times of the remote runs that DO intersect with the
-local so that overlap is complete.
-c) restrict start/end times of the local runs so that they DO intersect with remote
-8. This is now a TFKernel Dataset Definition (ish). Initialize a default
-processing object and pass it this df:
-cc = ConfigCreator()
-p = cc.create_from_kernel_dataset(kernel_dataset, emtf_band_file=emtf_band_setup_file)
-9. Edit the Processing Config appropriately,
-
-ToDo: Consider supporting a default value for 'channel_scale_factors' that is None,
+    0. Start with a list of mth5s
+    1. Extract channel_summaries from each mth5 and join them vertically
+    2. Compress to a run_summary
+    3. Stare at the run_summary_df & select a station "S" to process
+    4. Select a non-empty set of runs for station "S"
+    5. Select a remote reference station "RR" (this is allowed to be empty)
+    6. Extract the sub-dataframe corresponding to acquisition_runs from "S" and "RR"
+    7. If the remote is non-empty:
+       - Drop the runs (rows) associated with RR that DO NOT intersect with S
+       - Restrict start/end times of RR runs that intersect with S so overlap is complete
+       - Restrict start/end times of S runs so that they DO intersect with RR
+    8. This is now a TFKernel Dataset Definition (ish). Initialize a default processing
+       object and pass it this df:
+       ```
+       >>> cc = ConfigCreator()
+       >>> p = cc.create_from_kernel_dataset(kernel_dataset)
+       # Optionally pass emtf_band_file=emtf_band_setup_file
+       ```
+    9. Edit the Processing Config appropriately.
+
+TODO: Consider supporting a default value for 'channel_scale_factors' that is None.
+
+TODO: As of March 2023 a RunSummary is available at the station level in mth5, but
+    the aurora version is still being used. This should be merged if possible so that
+    aurora uses the built-in mth5 method.
+
+TODO: Might need to groupby survey & station, for now I am considering station
+    names to be unique.
+
 """
 import copy
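The step list in the revised docstring compresses the whole processing setup into one path. A minimal end-to-end sketch, under stated assumptions: the `RunSummary.from_mth5s` call and the `ConfigCreator` import path follow aurora's usual usage, and the file paths and station names ("S", "RR") are placeholders.

```python
# Hedged sketch of steps 0-9 above; paths, station names, and import
# locations are assumptions, not verified against this branch.
from aurora.config.config_creator import ConfigCreator
from aurora.pipelines.run_summary import RunSummary
from aurora.transfer_function.kernel_dataset import KernelDataset

# Steps 0-2: build a run summary from a list of mth5 files
run_summary = RunSummary()
run_summary.from_mth5s(["local.h5", "remote.h5"])

# Steps 3-7: choose local station "S" and remote "RR"; from_run_summary
# trims the runs of both stations to their simultaneous intervals
kernel_dataset = KernelDataset()
kernel_dataset.from_run_summary(run_summary, "S", "RR")

# Step 8: create a default processing config from the kernel dataset
cc = ConfigCreator()
p = cc.create_from_kernel_dataset(kernel_dataset)

# Step 9: edit p (the Processing config) before running the pipeline
```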
@@ -74,11 +77,10 @@ class KernelDataset:
     This class is closely related to (may actually be an extension of) RunSummary

     The main idea is to specify one or two stations, and a list of acquisition "runs"
-    that can be merged into a "processing run".
-    Each acquistion run can be further divided into non-overlapping chunks by specifying
-    time-intervals associated with that acquistion run. An empty iterable of
-    time-intervals associated with a run is interpretted as the interval
-    corresponding to the entire run.
+    that can be merged into a "processing run". Each acquisition run can be further
+    divided into non-overlapping chunks by specifying time-intervals associated with
+    that acquisition run. An empty iterable of time-intervals associated with a run
+    is interpreted as the interval corresponding to the entire run.

     The time intervals can be used for several purposes but primarily:
     To specify contiguous chunks of data for:
@@ -86,7 +88,7 @@ class KernelDataset:
     2. binding together into xarray time series, for eventual gap fill (and then STFT)
     3. managing and analyse the availability of reference time series

-    The basic data strucutre can be represented as a table or as a tree:
+    The basic data structure can be represented as a table or as a tree:
     Station <-- run <-- [Intervals],

     This is described in issue #118 https://github.com/simpeg/aurora/issues/118
@@ -113,6 +115,16 @@ class KernelDataset:
     """

     def __init__(self, **kwargs):
+        """
+
+        Parameters
+        ----------
+        kwargs: dict
+            Checked-for keys are ["local_station_id", "remote_station_id", "df"].
+            These can usually be ignored; the values are normally initialized
+            via the from_run_summary() method.
+
+        """
         self.df = kwargs.get("df")
         self.local_station_id = kwargs.get("local_station_id")
         self.remote_station_id = kwargs.get("remote_station_id")
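The "table or tree" remark in the class docstring above is easy to picture with a toy dataframe. This is only an illustration: the column names are assumed for the sketch, not the exact run_summary schema.

```python
# Toy illustration of the Station <-- run <-- [Intervals] structure;
# column names here are illustrative, not the exact run_summary schema.
import pandas as pd

df = pd.DataFrame(
    {
        "station_id": ["S", "S", "RR"],
        "run_id": ["001", "002", "001"],
        "start": pd.to_datetime(["2023-01-01", "2023-01-03", "2023-01-02"]),
        "end": pd.to_datetime(["2023-01-02", "2023-01-04", "2023-01-05"]),
    }
)

# The same rows viewed as a tree: station -> run -> [interval]
tree = {
    station: {r.run_id: [(r.start, r.end)] for r in g.itertuples()}
    for station, g in df.groupby("station_id")
}
```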
@@ -144,9 +156,6 @@ def from_run_summary(self, run_summary, local_station_id, remote_station_id=None
         remote_station_id: string
             Label of the remote reference station

-        Returns
-        -------
-
         """
         self.local_station_id = local_station_id
         self.remote_station_id = remote_station_id
@@ -177,7 +186,7 @@ def from_run_summary(self, run_summary, local_station_id, remote_station_id=None
                 f"Local {local_station_id} and remote {remote_station_id} do not overlap, "
                 f"Remote reference processing not a valid option"
             )
-            logger.warning(msg)
+            logger.error(msg)
             raise ValueError(msg)
         else:
             self._add_duration_column()
@@ -215,6 +224,9 @@ def _add_duration_column(self):
         self.df["duration"] = durations
         return

+    def _update_duration_column(self):
+        self._add_duration_column()
+
     def drop_runs_shorter_than(self, duration, units="s"):
         """
         This needs to have duration refreshed before hand
@@ -228,7 +240,8 @@ def drop_runs_shorter_than(self, duration, units="s"):

         """
         if units != "s":
-            raise NotImplementedError
+            msg = "Expected units are seconds: units='s'"
+            raise NotImplementedError(msg)
         if "duration" not in self.df.columns:
             self._add_duration_column()
         drop_cond = self.df.duration < duration
@@ -237,7 +250,18 @@ def select_station_runs(self, station_runs_dict, keep_or_drop):
-        df = select_station_runs(self.df, station_runs_dict, keep_or_drop)
+        """
+
+        Parameters
+        ----------
+        station_runs_dict: dict
+            Keys are station ids; values are lists of run ids to keep or drop
+        keep_or_drop: str
+            Either "keep" or "drop"; see _select_station_runs
+
+        """
+        df = _select_station_runs(self.df, station_runs_dict, keep_or_drop)
         self.df = df
         return
@@ -259,7 +283,7 @@ def restrict_run_intervals_to_simultaneous(self):
         """
         For each run in local_station_id we check if it has overlap with other runs

-        There is room for optimiztion here
+        There is room for optimization here

         Note that you can wind up splitting runs here. For example, in that case
         where local is running continuously, but remote is intermittent. Then the local
@@ -489,9 +513,11 @@ def restrict_to_station_list(df, station_ids, inplace=True):

     Parameters
     ----------
+    df: pd.DataFrame
+        A run summary dataframe
     station_ids: str or list of strings
         These are the station ids to keep, normally local and remote
-    overwrite: bool
+    inplace: bool
         If True, self.df is overwritten with the reduced dataframe

     Returns
     -------
@@ -510,22 +536,15 @@ def restrict_to_station_list(df, station_ids, inplace=True):
     return df


-def select_station_runs(
+def _select_station_runs(
     df,
     station_runs_dict,
     keep_or_drop,
     overwrite=True,
 ):
     """
-    Drops all rows where station_id==station_id, and run_id is NOT in the provided
-    list of keep_run_ids. Operates on a deepcopy df if inplace=False
-    Uncommon use case the way this is coded, because it will restrict to a single
-    station processing case. Better to use drop runs, or a dict-style input
-
-    Note1: Logic of keep/drop
-    keep where cond1 is false
-    keep where cond1 & cond2 both true
-    drop where cond1 is true but cond2 is false
+    Partition the rows of df based on the contents of station_runs_dict and return
+    one of the two partitions (based on the value of keep_or_drop).

     Parameters
     ----------
@@ -533,14 +552,15 @@
         Keys are string ids of the stations to keep
         Values are lists of string labels for run_ids to keep
     keep_or_drop: str
-        If "keep": returns df with only the station_rus specified in station_runs_dict
+        If "keep": returns df with only the station-runs specified in station_runs_dict
         If "drop": returns df with station_runs_dict excised
     overwrite: bool
         If True, self.df is overwritten with the reduced dataframe

     Returns
     -------
-    reduced dataframe with only run_ids provided removed.
+    df: pd.DataFrame
+        Reduced dataframe with the specified station-runs kept or dropped.
     """
     if not overwrite:
@@ -593,7 +613,7 @@ def overlap(t1start, t1end, t2start, t2end):

     Parameters
     ----------
-    t1start
+    t1start:
     t1end
     t2start
     t2end
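The keep/drop partition that the rewritten `_select_station_runs` docstring summarizes was spelled out by the deleted Note1 (keep where cond1 is false; keep where cond1 & cond2 both true; drop where cond1 is true but cond2 is false). A sketch of that logic under the Note1 reading, not the code in kernel_dataset.py:

```python
# cond1 = "station is mentioned in the dict", cond2 = "run is listed for it".
import pandas as pd

def select_station_runs_sketch(df, station_runs_dict, keep_or_drop):
    mentioned = pd.Series(False, index=df.index)  # cond1
    listed = pd.Series(False, index=df.index)     # cond1 & cond2
    for station_id, run_ids in station_runs_dict.items():
        is_station = df.station_id == station_id
        mentioned |= is_station
        listed |= is_station & df.run_id.isin(run_ids)
    if keep_or_drop == "keep":
        # rows of unmentioned stations survive, plus the listed station-runs
        return df[~mentioned | listed]
    # "drop": excise only the listed station-runs
    return df[~listed]
```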
diff --git a/tests/transfer_function/test_kernel_dataset.py b/tests/transfer_function/test_kernel_dataset.py
index e27282f7..cf65c652 100644
--- a/tests/transfer_function/test_kernel_dataset.py
+++ b/tests/transfer_function/test_kernel_dataset.py
@@ -1,9 +1,10 @@
-import logging
-import pathlib
+import pandas as pd
 import unittest

 from aurora.pipelines.run_summary import RunSummary
 from aurora.test_utils.synthetic.make_mth5_from_asc import create_test12rr_h5
+from aurora.transfer_function.kernel_dataset import intervals_overlap
+from aurora.transfer_function.kernel_dataset import overlap
 from aurora.transfer_function.kernel_dataset import KernelDataset


@@ -54,6 +55,62 @@ def test_clone(self):
         # add more checks


+class TestOverlapFunctions(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        """
+        Pick some time intervals and check that the overlap logic is correct.
+        """
+        # a day-long interval
+        cls.ti1_start = pd.Timestamp(1980, 1, 1, 12, 30, 0)
+        cls.ti1_end = pd.Timestamp(1980, 1, 2, 12, 30, 0)
+        cls.shift_1_hours = 5  # hours
+        cls.shift_2_hours = 25  # hours
+
+        # shift the interval forward, leaving it overlapping
+        cls.ti2_start = cls.ti1_start + pd.Timedelta(hours=cls.shift_1_hours)
+        cls.ti2_end = cls.ti1_end + pd.Timedelta(hours=cls.shift_1_hours)
+
+        # shift the interval forward, non-overlapping
+        cls.ti3_start = cls.ti1_start + pd.Timedelta(hours=cls.shift_2_hours)
+        cls.ti3_end = cls.ti1_end + pd.Timedelta(hours=cls.shift_2_hours)
+
+    def test_overlaps_boolean(self):
+        self.assertTrue(
+            intervals_overlap(
+                self.ti1_start, self.ti1_end, self.ti2_start, self.ti2_end
+            )
+        )
+
+        self.assertFalse(
+            intervals_overlap(
+                self.ti1_start, self.ti1_end, self.ti3_start, self.ti3_end
+            )
+        )
+
+    def test_overlap_returns_interval(self):
+        """
+        TODO: there are four cases being handled ---
+        add a subtest for each of the four
+        """
+        # This test corresponds to the second line in the if/elif logic.
+        tmp = overlap(self.ti1_start, self.ti1_end, self.ti2_start, self.ti2_end)
+        self.assertTrue(
+            tmp[0] == self.ti1_start + pd.Timedelta(hours=self.shift_1_hours)
+        )
+        self.assertTrue(tmp[1] == self.ti1_end)
+
+        # TODO: to test the first line, we need t1 to completely enclose t2
+
+
 def main():
     unittest.main()
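For reference while reading these tests: minimal sketches of the two helpers they exercise, inferred from the assertions above rather than copied from kernel_dataset.py. The real `overlap` reportedly branches over four cases; max/min collapses them when the intervals do overlap.

```python
# Sketches inferred from the test expectations; endpoint conventions are a
# guess, since the tests only probe clearly overlapping and clearly disjoint
# intervals.
def intervals_overlap_sketch(t1start, t1end, t2start, t2end) -> bool:
    # True when [t1start, t1end] and [t2start, t2end] share at least one instant
    return (t1start <= t2end) and (t2start <= t1end)

def overlap_sketch(t1start, t1end, t2start, t2end):
    # Intersection of two overlapping intervals
    return max(t1start, t2start), min(t1end, t2end)

# Consistent with test_overlap_returns_interval: shifting a day-long interval
# forward by 5 hours gives an intersection of (ti1_start + 5h, ti1_end).
```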