From 886cac6381c1e87bee630cc12023ba3724d13bbc Mon Sep 17 00:00:00 2001 From: "Karl N. Kappler" Date: Sat, 16 Mar 2024 07:22:11 -0700 Subject: [PATCH] Cleanup and codecov - rename df to less ambiguous frequency_increment - add solve_single_time_window to helper functions - add a test for solve_single_time_window - factor z-time-series calculation out of coherence into spectral features --- aurora/pipelines/transfer_function_kernel.py | 7 + aurora/time_series/frequency_band_helpers.py | 14 +- .../regression/helper_functions.py | 31 ++++ .../weights/coherence_weights.py | 134 +----------------- .../weights/spectral_features.py | 102 +++++++++++++ 5 files changed, 150 insertions(+), 138 deletions(-) create mode 100644 aurora/transfer_function/weights/spectral_features.py diff --git a/aurora/pipelines/transfer_function_kernel.py b/aurora/pipelines/transfer_function_kernel.py index 2d544388..4e03454e 100644 --- a/aurora/pipelines/transfer_function_kernel.py +++ b/aurora/pipelines/transfer_function_kernel.py @@ -411,6 +411,13 @@ def validate_save_fc_settings(self): for dec_level_config in self.config.decimations: # if dec_level_config.save_fcs: dec_level_config.save_fcs = False + + # TODO: Add logic here that checks if remote reference station is in processing and save_FCs is True + # -- logger.critical() This is not a supported configuration (but it might just work!) + # -- basically, you would need to have runs at the local and remote station be simultaneous to work + # -- if they overlap in a janky way, then the saved FCs will be janky + # -- recommend that you apply single station processing to local and remote station with save FCs =True + # -- and then run remote reference processing (with save FCs =True) return def get_mth5_file_open_mode(self): diff --git a/aurora/time_series/frequency_band_helpers.py b/aurora/time_series/frequency_band_helpers.py index 9c71cb3e..fe8a5e52 100644 --- a/aurora/time_series/frequency_band_helpers.py +++ b/aurora/time_series/frequency_band_helpers.py @@ -185,8 +185,8 @@ def adjust_band_for_coherence_sorting(frequency_band, spectrogram, rule="min3"): logger.warning("Cant evaluate coherence with only 1 harmonic") logger.info(f"Widening band according to {rule} rule") if rule == "min3": - band.frequency_min -= spectrogram.df - band.frequency_max += spectrogram.df + band.frequency_min -= spectrogram.frequency_increment + band.frequency_max += spectrogram.frequency_increment else: msg = f"Band adjustment rule {rule} not recognized" logger.error(msg) @@ -264,18 +264,18 @@ class Spectrogram(object): def __init__(self, dataset=None): self._dataset = dataset - self._df = None + self._frequency_increment = None @property def dataset(self): return self._dataset @property - def delta_freq(self): - if self._df is None: + def frequency_increment(self): + if self._frequency_increment is None: frequency_axis = self.dataset.frequency - self._df = frequency_axis.data[1] - frequency_axis.data[0] - return self._df + self._frequency_increment = frequency_axis.data[1] - frequency_axis.data[0] + return self._frequency_increment def num_harmonics_in_band(self, frequency_band, epsilon=1e-7): """ diff --git a/aurora/transfer_function/regression/helper_functions.py b/aurora/transfer_function/regression/helper_functions.py index 08d45b08..e4390358 100644 --- a/aurora/transfer_function/regression/helper_functions.py +++ b/aurora/transfer_function/regression/helper_functions.py @@ -38,3 +38,34 @@ def rme_beta(r0): """ beta = 1.0 - np.exp(-r0) return beta + + +def solve_single_time_window(Y, X, R=None): + """ + Cast problem Y = Xb into scipy.linalg.solve form which solves: a @ x = b + - This function is used for testing vectorized, direct solver. + + + Parameters + ---------- + Y: numpy.ndarray + The "output" of regression problem. For testing this often has shape (2,). + X: numpy.ndarray + The "input" of regression problem. For testing this often has shape (2,2). + R: numpy.ndarray or None + Remote reference channels (optional) + + Returns + ------- + z: numpy.ndarray + The TF estimate -- Usually two complex numbers, (Zxx, Zxy), or (Zyx, Zyy) + + """ + if R is None: + xH = X.conjugate().transpose() + else: + xH = R.conjugate().transpose() + a = xH @ X + b = xH @ Y + z = np.linalg.solve(a, b) + return z diff --git a/aurora/transfer_function/weights/coherence_weights.py b/aurora/transfer_function/weights/coherence_weights.py index a0d8f11f..fe33f378 100644 --- a/aurora/transfer_function/weights/coherence_weights.py +++ b/aurora/transfer_function/weights/coherence_weights.py @@ -21,6 +21,9 @@ from aurora.time_series.frequency_band_helpers import adjust_band_for_coherence_sorting from aurora.time_series.frequency_band_helpers import Spectrogram +from aurora.transfer_function.weights.spectral_features import ( + estimate_time_series_of_impedances, +) from collections import namedtuple from loguru import logger @@ -313,137 +316,6 @@ def multiple_coherence_channel_sets(local_or_remote): elif local_or_remote == "remote": return (CoherenceChannel("remote", "hx"), CoherenceChannel("remote", "hy")) - def solve_single_time_window(Y, X, R=None): - """remember scipy.linalg.solve solves: a @ x == b""" - if R is None: - xH = X.conjugate().transpose() - else: - xH = R.conjugate().transpose() - a = xH @ X - b = xH @ Y - z = np.linalg.solve(a, b) - return z - - def estimate_time_series_of_impedances(band, output_ch="ex", use_remote=True): - """ - solve: - - [] = [, ] [Zxx] - [] [, ] [Zyx] - - [a, b] - [c, d] - - determinant where det(A) = 1/(ad-bc) - - :param band: band dataset: xarray, will be spectrogram in future - :return: - - Requires some nomenclature setup... for now just hard code:/ - - TODO: Note that cross powers can be computed once only by using Spectrogram class - and spectrogram.cross_power("CH1", "CH2") - which returns self._ch1_ch2 - which initialized to None and is written when requested - - :param band_dataset: - :return: - """ - - def cross_power_series(ch1, ch2): - """ summed along frequnecy""" - return (ch1.conjugate().transpose() * ch2).sum(dim="frequency") - - # def multiply_2x2xN_by_2x1xN(m2x2, m2x1): - # """ - # This is sort of a sad function. There are a few places in TF estimation, where we would - # like to do matrix multiplication, of a 2x1 array on the left by a 2x2. If we want to do - # this tens of thousands of times, the options are for-looping (too slow!), reshape the problem - # to sparse matrix and do all at once (matrix too big when built naively!) or this function. - # - # An alternative, becuase the multiplication is very straigghtforward - # [a11 a12] [b1] = [a11*b1 + a12*b2] - # [a21 a22] [b2] = [a11*b1 + a12*b2] - # We can just assign the values vectorially. - # - # :param a2x2: - # :param b2x1: - # :return: ab2x1 - # """ - # pass - - # Start by computing relevant cross powers - if use_remote: - rx = band["rx"] - ry = band["ry"] - else: - rx = band["hx"] - ry = band["hy"] - rxex = cross_power_series(rx, band["ex"]) - ryex = cross_power_series(ry, band["ex"]) - rxhx = cross_power_series(rx, band["hx"]) - ryhx = cross_power_series(ry, band["hx"]) - rxhy = cross_power_series(rx, band["hy"]) - ryhy = cross_power_series(ry, band["hy"]) - - N = len(rxex) - # Compute determinants (one per time window) - # Note that when no remote reference (rx=hx, ry=hy) then det is - # the product of the auto powers in h minus the product of the cross powers - det = rxhx * ryhy - rxhy * ryhx - det = np.real(det) - - # Construct the Inverse matrices (2 x 2 x N) - inverse_matrices = np.zeros((2, 2, N), dtype=np.complex128) - inverse_matrices[0, 0, :] = ryhy / det - inverse_matrices[1, 1, :] = rxhx / det - inverse_matrices[0, 1, :] = -rxhy / det - inverse_matrices[1, 0, :] = -ryhx / det - - Zxx = ( - inverse_matrices[0, 0, :] * rxex.data - + inverse_matrices[0, 1, :] * ryex.data - ) - Zxy = ( - inverse_matrices[1, 0, :] * rxex.data - + inverse_matrices[1, 1, :] * ryex.data - ) - - # Below code is a spot check that the calculation of Zxx and Zxy from the "fast vectorized" - # method above is consistent with for-looping on np.solve - - # Set up the problem in terms of linalg.solve() - idx = 0 # a time-window index to check - E = band["ex"][idx, :] - H = band[["hx", "hy"]].to_array()[:, idx].T - HH = H.conj().transpose() - a = HH.data @ H.data - b = HH.data @ E.data - - # Solve using direct inverse - inv_a = np.linalg.inv(a) - zz0 = inv_a @ b - # solve using linalg.solve - zz = solve_single_time_window(b, a, None) - # compare the two solutions - assert np.isclose(np.abs(zz0 - zz), 0, atol=1e-10) - - # Estimate multiple coherence with linalg.solve soln - solved_residual = E - H[:, 0] * zz[0] - H[:, 1] * zz[1] - solved_residual_energy = (np.abs(solved_residual) ** 2).sum() - output_energy = (np.abs(E) ** 2).sum() - multiple_coherence_1 = solved_residual_energy / output_energy - print("solve", multiple_coherence_1) - # Estimate multiple coherence with Zxx Zxy - homebrew_residual = E - H[:, 0] * Zxx[0] - H[:, 1] * Zxy[0] - homebrew_residual_energy = (np.abs(homebrew_residual) ** 2).sum() - multiple_coherence_2 = homebrew_residual_energy / output_energy - print(multiple_coherence_2) - assert np.isclose( - multiple_coherence_1.data, multiple_coherence_2.data, 0, atol=1e-14 - ) - return Zxx, Zxy - # cutoff_type = "threshold" cutoffs = {} cutoffs["local"] = {} diff --git a/aurora/transfer_function/weights/spectral_features.py b/aurora/transfer_function/weights/spectral_features.py new file mode 100644 index 00000000..d81874d6 --- /dev/null +++ b/aurora/transfer_function/weights/spectral_features.py @@ -0,0 +1,102 @@ +import numpy as np + +from aurora.transfer_function.regression.helper_functions import ( + solve_single_time_window, +) + + +def estimate_time_series_of_impedances(band, output_ch="ex", use_remote=True): + """ + solve: + + [] = [, ] [Zxx] + [] [, ] [Zyx] + + [a, b] + [c, d] + + determinant where det(A) = 1/(ad-bc) + + :param band: band dataset: xarray, will be spectrogram in future + :return: + + Requires some nomenclature setup... for now just hard code:/ + + TODO: Note that cross powers can be computed once only by using Spectrogram class + and spectrogram.cross_power("CH1", "CH2") + which returns self._ch1_ch2 + which initialized to None and is written when requested + + :param band_dataset: + :return: + """ + + def cross_power_series(ch1, ch2): + """ summed along frequnecy""" + return (ch1.conjugate().transpose() * ch2).sum(dim="frequency") + + # Start by computing relevant cross powers + if use_remote: + rx = band["rx"] + ry = band["ry"] + else: + rx = band["hx"] + ry = band["hy"] + rxex = cross_power_series(rx, band["ex"]) + ryex = cross_power_series(ry, band["ex"]) + rxhx = cross_power_series(rx, band["hx"]) + ryhx = cross_power_series(ry, band["hx"]) + rxhy = cross_power_series(rx, band["hy"]) + ryhy = cross_power_series(ry, band["hy"]) + + N = len(rxex) + # Compute determinants (one per time window) + # Note that when no remote reference (rx=hx, ry=hy) then det is + # the product of the auto powers in h minus the product of the cross powers + det = rxhx * ryhy - rxhy * ryhx + det = np.real(det) + + # Construct the Inverse matrices (2 x 2 x N) + inverse_matrices = np.zeros((2, 2, N), dtype=np.complex128) + inverse_matrices[0, 0, :] = ryhy / det + inverse_matrices[1, 1, :] = rxhx / det + inverse_matrices[0, 1, :] = -rxhy / det + inverse_matrices[1, 0, :] = -ryhx / det + + Zxx = inverse_matrices[0, 0, :] * rxex.data + inverse_matrices[0, 1, :] * ryex.data + Zxy = inverse_matrices[1, 0, :] * rxex.data + inverse_matrices[1, 1, :] * ryex.data + + # Below code is a spot check that the calculation of Zxx and Zxy from the "fast vectorized" + # method above is consistent with for-looping on np.solve + + # Set up the problem in terms of linalg.solve() + idx = 0 # a time-window index to check + E = band["ex"][idx, :] + H = band[["hx", "hy"]].to_array()[:, idx].T + HH = H.conj().transpose() + a = HH.data @ H.data + b = HH.data @ E.data + + # Solve using direct inverse + inv_a = np.linalg.inv(a) + zz0 = inv_a @ b + # solve using linalg.solve + zz = solve_single_time_window(b, a, None) + # compare the two solutions + assert np.isclose(np.abs(zz0 - zz), 0, atol=1e-10).all() + + # Estimate multiple coherence with linalg.solve soln + solved_residual = E - H[:, 0] * zz[0] - H[:, 1] * zz[1] + solved_residual_energy = (np.abs(solved_residual) ** 2).sum() + output_energy = (np.abs(E) ** 2).sum() + multiple_coherence_1 = solved_residual_energy / output_energy + print("solve", multiple_coherence_1) + # Estimate multiple coherence with Zxx Zxy + homebrew_residual = E - H[:, 0] * Zxx[0] - H[:, 1] * Zxy[0] + homebrew_residual_energy = (np.abs(homebrew_residual) ** 2).sum() + multiple_coherence_2 = homebrew_residual_energy / output_energy + print(multiple_coherence_2) + assert np.isclose( + multiple_coherence_1.data, multiple_coherence_2.data, 0, atol=1e-14 + ).all() + return Zxx, Zxy