Skip to content

Commit

Permalink
Merge doc from #337 for test
Browse files Browse the repository at this point in the history
  • Loading branch information
kkappler committed Jul 23, 2024
1 parent cf88f7c commit 8e994aa
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 89 deletions.
139 changes: 87 additions & 52 deletions aurora/time_series/apodization_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@
import numpy as np
import scipy.signal as ssig
from loguru import logger
from typing import Optional, Union


class ApodizationWindow(object):
"""
Instantiate an apodization window object. Example usages:
Instantiate an apodization window object.
Example usages:
apod_window = ApodizationWindow()
taper=ApodizationWindow(taper_family='hanning', num_samples_window=55 )
Expand All @@ -75,32 +78,42 @@ class ApodizationWindow(object):
ENBW: Effective Noise BandWidth, see Equation (22)
NENBW Normalized Equivalent Noise BandWidth, see Equation (21)
Parameters
----------
taper_family : string
Specify the taper type - boxcar, kaiser, hanning, etc
num_samples_window : int
The number of samples in the taper
taper : numpy array
The actual window coefficients themselves. This can be passed if a
particular custom window is desired.
additional_args: dictionary
These are any additional requirements scipy needs in order to
generate the window.
"""

def __init__(self, **kwargs):
def __init__(
self,
taper_family: Optional[str] = "boxcar",
num_samples_window: Optional[
int
] = 0, # TODO: this should probably init to None
taper: Optional[np.ndarray] = np.empty(0),
taper_additional_args: Optional[Union[dict, None]] = None,
**kwargs, # needed as a passthrough argument to WindowingScheme
):

"""
Constructor
Parameters
----------
kwargs: dict
See parameters list in class level documentation above
taper_family : string
Specify the taper type - boxcar, kaiser, hanning, etc
num_samples_window : int
The number of samples in the taper
taper : numpy array
The actual window coefficients themselves. This can be passed if a
particular custom window is desired.
additional_args: dictionary
These are any additional requirements scipy needs in order to
generate the window.
"""
self.taper_family = kwargs.get("taper_family", "boxcar")
self._num_samples_window = kwargs.get("num_samples_window", 0)
self._taper = kwargs.get("taper", np.empty(0))
self.additional_args = kwargs.get("taper_additional_args", {})
self.taper_family = taper_family
self._num_samples_window = num_samples_window
self._taper = taper
if taper_additional_args is None:
taper_additional_args = {}
self.additional_args = taper_additional_args

self._coherent_gain = None
self._nenbw = None
Expand All @@ -112,8 +125,13 @@ def __init__(self, **kwargs):
self.make()

@property
def summary(self):
def summary(self) -> str:
"""
Returns a string summarizing the window properties
TODO: This should be the __str__(), i.e. the readable version
https://stackoverflow.com/questions/1436703/what-is-the-difference-between-str-and-repr
Returns
-------
out_str: str
Expand All @@ -129,19 +147,25 @@ def summary(self):

def __str__(self):
"""
? __repr__?
returns the taper values as a string
TODO: this should be __repr__ (unambiguous)
https://stackoverflow.com/questions/1436703/what-is-the-difference-between-str-and-repr
"""
return f"{self.taper}"

@property
def num_samples_window(self):
def num_samples_window(self) -> int:
"""returns the window length in samples"""
if self._num_samples_window == 0:
self._num_samples_window = len(self.taper)
return self._num_samples_window

def make(self):
def make(self) -> None:
"""
this is just a wrapper call to scipy.signal
A wrapper call to scipy.signal
Note: see scipy.signal.get_window for a description of what is
expected in args[1:]. http://docs.scipy.org/doc/scipy/reference/
generated/scipy.signal.get_window.html
Expand All @@ -152,55 +176,67 @@ def make(self):
window_args = [v for k, v in self.additional_args.items()]
window_args.insert(0, self.taper_family)
window_args = tuple(window_args)
# print(f"\n\nWINDOW args {window_args}")

self.taper = ssig.get_window(window_args, self.num_samples_window)
self.apodization_factor # calculate
return
self.apodization_factor # calculate -- 20240723: this should not be needed

@property
def S1(self):
"""sum of the window coefficients"""
def S1(self) -> float:
"""Return the sum of the window coefficients"""
if getattr(self, "_S1", None) is None:
self._S1 = sum(self.taper)
return self._S1

@property
def S2(self):
"""sum of squares of the window coefficients"""
def S2(self) -> float:
"""Retursn the sum of squares of the window coefficients."""
if getattr(self, "_S2", None) is None:
self._S2 = sum(self.taper**2)
return self._S2

@property
def coherent_gain(self):
"""DC gain of the window normalized by window length"""
def coherent_gain(self) -> float:
"""Returns the DC gain of the window normalized by window length."""
return self.S1 / self.num_samples_window

@property
def nenbw(self):
"""NENBW Normalized Equivalent Noise BandWidth, see Equation (21) in
Heinzel et al 2002"""
def nenbw(self) -> float:
"""
Returns the Normalized Equivalent Noise BandWidth
Notes:
a.k.a NENBW, see Equation (21) in Heinzel et al. 2002.
"""
return self.num_samples_window * self.S2 / (self.S1**2)

def enbw(self, fs):
def enbw(self, fs) -> float:
"""
Notes that unlike NENBW, CG, S1, S2, this is not a pure property of the
Return the "Equivalent Noise Bandwidth of the window.
Notes:
Effective Noise BandWidth = fs*NENBW/N = fs S2/(S1**2)
Unlike NENBW, CG, S1, S2, this is not a pure property of the
window -- but instead this is a property of the window combined with
the sample rate.
Parameters
----------
fs : sampling frequency (1/dt)
fs : float
The sampling frequency (1/dt)
Returns
-------
enbw: float
"""
"""Effective Noise BandWidth = fs*NENBW/N = fs S2/(S1**2)"""

return fs * self.S2 / (self.S1**2)

def test_linear_spectral_density_factor(self):
"""This is just a test to verify some algebra
def test_linear_spectral_density_factor(self) -> None:
"""T
This is just a test to verify some algebra
TODO Move this into tests
Claim:
The lsd_calibration factors
A (1./coherent\_gain)\*np.sqrt((2\*dt)/(nenbw\*N))
Expand All @@ -216,10 +252,6 @@ def test_linear_spectral_density_factor(self):
which I show in githib aurora issue #3 via .
(CG\*\*2) \* NENBW \*N = S2
Returns
-------
"""
lsd_factor1 = (1.0 / self.coherent_gain) * np.sqrt(
1.0 / (self.nenbw * self.num_samples_window)
Expand All @@ -231,17 +263,20 @@ def test_linear_spectral_density_factor(self):
raise Exception

@property
def taper(self):
def taper(self) -> np.ndarray:
"""Returns the values of the taper as a numpy array"""
return self._taper

@taper.setter
def taper(self, x):
def taper(self, x: np.ndarray) -> None:
"""Sets the value of the taper"""
self._taper = x
self._S1 = None
self._S2 = None

@property
def apodization_factor(self):
def apodization_factor(self) -> float:
"""Returns the apodization factor (computes if it is None)"""
if self._apodization_factor is None:
self._apodization_factor = np.sqrt(self.nenbw) * self.coherent_gain
return self._apodization_factor
12 changes: 11 additions & 1 deletion aurora/time_series/decorators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
"""
This module is a place to put decorators used by methods in aurora.
It is a work in progress.
Development notes:
| Here is the decorator pattern
| def decorator(func):
| @functools.wraps(func)
Expand All @@ -8,6 +13,7 @@
| # Do something after
| return value
| return wrapper_decorator
"""

import functools
Expand All @@ -16,7 +22,11 @@

def can_use_xr_dataarray(func):
"""
Intended as a decorator. Most of the windowed time series methods are
Decorator to allow functions that operate on xr.Dataset to also operate
on xr.DataArray, by converting xr.DataArray to xr.Dataset, calling the
function, and then converting back to xr.DataArray
Most of the windowed time series methods are
intended to work with xarray.Dataset class. But I would like to be able
to pass them xarray.DataArray objects. This class casts a DataArray to a
Dataset, runs it through func and casts back to a DataArray.
Expand Down
Empty file.
39 changes: 23 additions & 16 deletions aurora/time_series/frequency_band_helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import numpy as np
"""
This module contains functions that are associated with time series of Fourier coefficients
"""
# import numpy as np
from loguru import logger


Expand Down Expand Up @@ -47,11 +51,14 @@ def get_band_for_tf_estimate(band, dec_level_config, local_stft_obj, remote_stft

def extract_band(frequency_band, fft_obj, channels=[], epsilon=1e-7):
"""
Stand alone method that operates on an xr.DataArray, and is wrapped with Spectrogram
Extracts a frequency band from xr.DataArray representing a spectrogram.
Stand alone version of the method that is used by WIP Spectrogram class.
Note #1: 20230902
drop=True does not play nice with h5py and Dataset, results in a type error.
Development Notes:
#1: 20230902
TODO: Decide if base dataset object should be a xr.DataArray (not xr.Dataset)
- drop=True does not play nice with h5py and Dataset, results in a type error.
File "stringsource", line 2, in h5py.h5r.Reference.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
However, it works OK with DataArray, so maybe use data array in general
Expand Down Expand Up @@ -87,35 +94,34 @@ def extract_band(frequency_band, fft_obj, channels=[], epsilon=1e-7):

def check_time_axes_synched(X, Y):
"""
Utility function for checking that time axes agree
Utility function for checking that time axes agree.
Raises ValueError if axes do not agree.
It is critical that X, Y, RR have the same time axes for aurora processing.
Parameters
----------
X : xarray
Y : xarray
Returns
-------
"""
"""
It is critical that X, Y, RR have the same time axes here
Returns
-------
"""
if (X.time == Y.time).all():
pass
else:
logger.warning("WARNING - NAN Handling could fail if X,Y dont share time axes")
raise Exception
msg = "Time axes of arrays not identical"
# "NAN Handling could fail if X,Y dont share time axes"
logger.warning(msg)
raise ValueError(msg)
return


def adjust_band_for_coherence_sorting(frequency_band, spectrogram, rule="min3"):
"""
WIP: Intended to broaden band to allow more FCs for spectral features
- used in coherence sorting and general feature extraction
Parameters
----------
frequency_band
Expand Down Expand Up @@ -250,4 +256,5 @@ def adjust_band_for_coherence_sorting(frequency_band, spectrogram, rule="min3"):


def cross_spectra(X, Y):
"""WIP: Returns the cross power spectra between two arrays"""
return X.conj() * Y
Loading

0 comments on commit 8e994aa

Please sign in to comment.