add some docstrings/tests
kkappler committed May 18, 2024
1 parent 057f1b0 commit e6bc99e
Showing 2 changed files with 141 additions and 64 deletions.
144 changes: 82 additions & 62 deletions aurora/transfer_function/kernel_dataset.py
@@ -3,51 +3,54 @@
Each mth5 has a mth5_obj.channel_summary dataframe which tells what data are available.
Here we use a compressed view of this df with one line per acquisition run. I've been
calling that a "run_summary".
The run_summary provides options for the local and possibly remote reference stations.
Candidates for local station are the unique values in the station column.
For any given candidate station, there are some integer n runs available.
This yields 2**n - 1 possible combinations that can be processed, neglecting any
flagging of time intervals within any run, or any joining of runs.
(There are actually 2**n subsets, but we ignore the empty set, hence the -1.)
Intuition suggests the default ought to be to process n runs in n+1 configurations:
{all runs} + each run individually. This will give a bulk answer, and bad runs can
be flagged by comparison. After an initial processing, the TFs can be reviewed
and the problematic runs addressed.
The user can interact with the run_summary_df, selecting sub-dataframes via querying,
and in the future perhaps via a GUI (or a spreadsheet).
The process looks like this:
0. Start with a list of mth5s
1. Extract channel_summaries from each mth5 and join them vertically
2. Compress to a run_summary
3. Stare at the run_summary_df & select a station "S" to process
4. Select a non-empty set of runs for station "S"
5. Select a remote reference "RR" (this is allowed to be empty)
6. Extract the sub-dataframe corresponding to acquisition_runs from "S" and "RR"
7. If the remote is non-empty:
- Drop the runs (rows) associated with RR that DO NOT intersect with S
- Restrict start/end times of RR runs that intersect with S so overlap is complete.
- Restrict start/end times of S runs so that they DO intersect with RR
8. This is now a TFKernel Dataset Definition (ish). Initialize a default processing
object and pass it this df (an end-to-end sketch follows this docstring):
```
>>> cc = ConfigCreator()
>>> p = cc.create_from_kernel_dataset(kernel_dataset)
>>> # optionally pass emtf_band_file=emtf_band_setup_file
```
9. Edit the Processing Config appropriately.
TODO: Consider supporting a default value for 'channel_scale_factors' that is None.
TODO: As of March 2023 a RunSummary is available at the station level in mth5, but
the aurora version is still being used. This should be merged if possible so that
aurora uses the built-in mth5 method.
TODO: Might need to groupby survey & station; for now, station names are assumed
to be unique.
"""

import copy
@@ -74,19 +77,18 @@ class KernelDataset:
This class is closely related to (may actually be an extension of) RunSummary
The main idea is to specify one or two stations, and a list of acquisition "runs"
that can be merged into a "processing run". Each acquisition run can be further
divided into non-overlapping chunks by specifying time-intervals associated with
that acquisition run. An empty iterable of time-intervals associated with a run
is interpreted as the interval corresponding to the entire run.
The time intervals can be used for several purposes, but primarily:
To specify contiguous chunks of data for:
1. STFT, which will be made into merged FC data structures
2. binding together into xarray time series, for eventual gap fill (and then STFT)
3. managing and analysing the availability of reference time series
The basic data structure can be represented as a table or as a tree:
Station <-- run <-- [Intervals],
This is described in issue #118 https://github.com/simpeg/aurora/issues/118
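For illustration, a hypothetical two-station df in the table form; the column names below are assumed for the sketch, not taken from the verified schema:

```python
import pandas as pd

# Hypothetical KernelDataset-style table: one row per (station, run, interval)
df = pd.DataFrame(
    {
        "station_id": ["test1", "test1", "test2"],
        "run_id": ["001", "002", "001"],
        "start": pd.to_datetime(
            ["1980-01-01 00:00", "1980-01-02 00:00", "1980-01-01 06:00"]
        ),
        "end": pd.to_datetime(
            ["1980-01-01 12:00", "1980-01-02 12:00", "1980-01-02 06:00"]
        ),
        "remote": [False, False, True],
    }
)

# The same information as a tree:
# test1 -- 001 -- [1980-01-01 00:00, 1980-01-01 12:00]
#       \- 002 -- [1980-01-02 00:00, 1980-01-02 12:00]
# test2 -- 001 -- [1980-01-01 06:00, 1980-01-02 06:00]
```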
@@ -113,6 +115,16 @@ class KernelDataset:
"""

def __init__(self, **kwargs):
"""
Parameters
----------
kwargs: dict
Checked-for keys are ["local_station_id", "remote_station_id", "df"]
Optional; not usually passed. These values are normally initialized via the
from_run_summary() method.
"""
self.df = kwargs.get("df")
self.local_station_id = kwargs.get("local_station_id")
self.remote_station_id = kwargs.get("remote_station_id")
@@ -144,9 +156,6 @@ def from_run_summary(self, run_summary, local_station_id, remote_station_id=None
remote_station_id: string
Label of the remote reference station
"""
self.local_station_id = local_station_id
self.remote_station_id = remote_station_id
@@ -177,7 +186,7 @@ def from_run_summary(self, run_summary, local_station_id, remote_station_id=None
f"Local {local_station_id} and remote {remote_station_id} do not overlap, "
f"Remote reference processing not a valid option"
)
logger.error(msg)
raise ValueError(msg)
else:
self._add_duration_column()
@@ -215,6 +224,9 @@ def _add_duration_column(self):
self.df["duration"] = durations
return

def _update_duration_column(self):
self._add_duration_column()
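The diff does not show how `durations` is computed; given the start/end bounds used throughout this module, a plausible sketch is the following (the "start"/"end" column names are assumed):

```python
import pandas as pd

def add_duration_column(df: pd.DataFrame) -> pd.DataFrame:
    """Sketch: add per-run durations in seconds, assuming the df carries
    pandas Timestamps in "start" and "end" columns (names assumed)."""
    timedeltas = df.end - df.start
    df["duration"] = [x.total_seconds() for x in timedeltas]
    return df
```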

def drop_runs_shorter_than(self, duration, units="s"):
"""
This needs to have duration refreshed beforehand
@@ -228,7 +240,8 @@ def drop_runs_shorter_than(duration, units="s"):
"""
if units != "s":
msg = "Expected units are seconds : units='s'"
raise NotImplementedError(msg)
if "duration" not in self.df.columns:
self._add_duration_column()
drop_cond = self.df.duration < duration
@@ -237,7 +250,18 @@ def select_station_runs(self, station_runs_dict, keep_or_drop):
return

def select_station_runs(self, station_runs_dict, keep_or_drop):
"""
Parameters
----------
station_runs_dict
keep_or_drop
Returns
-------
"""
df = _select_station_runs(self.df, station_runs_dict, keep_or_drop)
self.df = df
return
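A hedged usage sketch, assuming a `kernel_dataset` built as in the sketch after the module docstring; the station and run labels are invented:

```python
# Keep only runs "002" and "003" at station "test1"; rows for other
# stations are unaffected (labels invented for illustration).
station_runs_dict = {"test1": ["002", "003"]}
kernel_dataset.select_station_runs(station_runs_dict, "keep")
```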

@@ -259,7 +283,7 @@ def restrict_run_intervals_to_simultaneous(self):
"""
For each run in local_station_id we check if it has overlap with other runs
There is room for optimization here
Note that you can wind up splitting runs here. For example, in that case where
local is running continuously, but remote is intermittent. Then the local
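A minimal sketch of the intersection idea, using this module's `intervals_overlap` and `overlap` helpers (confirmed by the test imports below); the real method's row-splitting bookkeeping is more involved:

```python
from aurora.transfer_function.kernel_dataset import intervals_overlap, overlap

def clip_remote_to_local_sketch(local_runs, remote_runs):
    """Sketch: clip each remote run to its overlap with each local run.

    local_runs/remote_runs are assumed run-summary-style dataframes with
    "start"/"end" Timestamp columns. A continuous local run against
    intermittent remote runs yields several clipped (split) intervals.
    """
    clipped = []
    for lrun in local_runs.itertuples():
        for rrun in remote_runs.itertuples():
            if intervals_overlap(lrun.start, lrun.end, rrun.start, rrun.end):
                t0, t1 = overlap(lrun.start, lrun.end, rrun.start, rrun.end)
                clipped.append((lrun.run_id, rrun.run_id, t0, t1))
    return clipped
```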
@@ -489,9 +513,11 @@ def restrict_to_station_list(df, station_ids, inplace=True):
Parameters
----------
df: pd.DataFrame
a run summary dataframe
station_ids: str or list of strings
These are the station ids to keep, normally local and remote
inplace: bool
If True, the input df is overwritten with the reduced dataframe
Returns
@@ -510,37 +536,31 @@ def restrict_to_station_list(df, station_ids, inplace=True):
return df
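The core filter here presumably reduces to a pandas `isin` mask; a sketch, with the `station_id` column name taken from the rest of this module:

```python
import pandas as pd

def restrict_to_station_list_sketch(df: pd.DataFrame, station_ids) -> pd.DataFrame:
    """Sketch: keep only rows whose station_id is in station_ids
    (normally the local and remote stations)."""
    df = df[df.station_id.isin(station_ids)]
    return df.reset_index(drop=True)
```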


def _select_station_runs(
df,
station_runs_dict,
keep_or_drop,
overwrite=True,
):
"""
Partition the rows of df based on the contents of station_runs_dict and return
one of the two partitions (based on value of keep_or_drop).
Parameters
----------
station_runs_dict: dict
Keys are string ids of the stations; values are lists of string labels
for the run_ids to keep or drop.
keep_or_drop: str
If "keep": returns df with only the station_rus specified in station_runs_dict
If "keep": returns df with only the station-runs specified in station_runs_dict
If "drop": returns df with station_runs_dict excised
overwrite: bool
If True, the input df is overwritten with the reduced dataframe
Returns
-------
df: pd.DataFrame
The partition of the input df selected by keep_or_drop.
"""

if not overwrite:
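The keep/drop partition reduces to boolean masks; a sketch for a single station's run list, with the cond1/cond2 naming following the keep/drop note that this docstring replaced:

```python
def select_station_runs_sketch(df, station_id, keep_run_ids, keep_or_drop):
    """Sketch of the keep/drop partition for one station.

    cond1: row belongs to station_id; cond2: row's run is in keep_run_ids.
    "keep": keep rows where cond1 is False, or cond1 & cond2 both True.
    "drop": drop rows where cond1 & cond2 both True.
    """
    cond1 = df.station_id == station_id
    cond2 = df.run_id.isin(keep_run_ids)
    if keep_or_drop == "keep":
        return df[~cond1 | (cond1 & cond2)]
    return df[~(cond1 & cond2)]
```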
@@ -593,7 +613,7 @@ def overlap(t1start, t1end, t2start, t2end):
Parameters
----------
t1start:
t1end
t2start
t2end
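The tests below refer to four cases in overlap's if/elif logic; a hedged reconstruction of an interval-intersection function with those four branches (the real implementation may order or express them differently, and its behavior for non-overlapping inputs is assumed here):

```python
def overlap_sketch(t1start, t1end, t2start, t2end):
    """Return the intersection of [t1start, t1end] and [t2start, t2end].

    Inputs are assumed to be pandas Timestamps (any ordered type works).
    """
    if t1start <= t2start and t2end <= t1end:
        # case 1: t1 completely encloses t2
        return t2start, t2end
    elif t1start <= t2start <= t1end:
        # case 2: t2 starts inside t1 and runs past its end
        return t2start, t1end
    elif t1start <= t2end <= t1end:
        # case 3: t2 starts before t1 and ends inside it
        return t1start, t2end
    elif t2start <= t1start and t1end <= t2end:
        # case 4: t2 completely encloses t1
        return t1start, t1end
    # no overlap: behavior assumed; the real function may differ here
    return None, None
```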
61 changes: 59 additions & 2 deletions tests/transfer_function/test_kernel_dataset.py
@@ -1,9 +1,10 @@
import logging
import pathlib
import pandas as pd
import unittest

from aurora.pipelines.run_summary import RunSummary
from aurora.test_utils.synthetic.make_mth5_from_asc import create_test12rr_h5
from aurora.transfer_function.kernel_dataset import intervals_overlap
from aurora.transfer_function.kernel_dataset import overlap
from aurora.transfer_function.kernel_dataset import KernelDataset


@@ -54,6 +55,62 @@ def test_clone(self):
# add more checks


class TestOverlapFunctions(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""
Pick some time intervals and test that the overlap logic is correct
"""
# A day-long interval
cls.ti1_start = pd.Timestamp(1980, 1, 1, 12, 30, 0)
cls.ti1_end = pd.Timestamp(1980, 1, 2, 12, 30, 0)
cls.shift_1_hours = 5  # hours
cls.shift_2_hours = 25  # hours

# shift the interval forward, leave it overlapping
cls.ti2_start = cls.ti1_start + pd.Timedelta(hours=cls.shift_1_hours)
cls.ti2_end = cls.ti1_end + pd.Timedelta(hours=cls.shift_1_hours)

# shift the interval forward, non-overlapping
cls.ti3_start = cls.ti1_start + pd.Timedelta(hours=cls.shift_2_hours)
cls.ti3_end = cls.ti1_end + pd.Timedelta(hours=cls.shift_2_hours)

def test_overlaps_boolean(self):
self.assertTrue(
intervals_overlap(
self.ti1_start, self.ti1_end, self.ti2_start, self.ti2_end
)
)

self.assertFalse(
intervals_overlap(
self.ti1_start, self.ti1_end, self.ti3_start, self.ti3_end
)
)

def test_overlap_returns_interval(self):
"""
TODO: there are four cases being handled ---
add a subtest for each of the four
Returns
-------
"""
# This test corresponds to the second line in the if/elif logic.
tmp = overlap(self.ti1_start, self.ti1_end, self.ti2_start, self.ti2_end)
self.assertTrue(
tmp[0] == self.ti1_start + pd.Timedelta(hours=self.shift_1_hours)
)
self.assertTrue(tmp[1] == self.ti1_end)

# TODO: To test the first branch, we need t1 to completely enclose t2 (see the sketch below)
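Toward the TODO above, a sketch of one additional subtest, to live inside TestOverlapFunctions; it assumes overlap returns the enclosed interval when t1 completely encloses t2:

```python
def test_overlap_t1_encloses_t2(self):
    # t2 sits strictly inside t1, exercising the first branch
    t2_start = self.ti1_start + pd.Timedelta(hours=1)
    t2_end = self.ti1_end - pd.Timedelta(hours=1)
    tmp = overlap(self.ti1_start, self.ti1_end, t2_start, t2_end)
    self.assertTrue(tmp[0] == t2_start)
    self.assertTrue(tmp[1] == t2_end)
```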


def main():
unittest.main()
