diff --git a/aurora/pipelines/transfer_function_kernel.py b/aurora/pipelines/transfer_function_kernel.py index 4e03454e..f8e7364d 100644 --- a/aurora/pipelines/transfer_function_kernel.py +++ b/aurora/pipelines/transfer_function_kernel.py @@ -411,13 +411,17 @@ def validate_save_fc_settings(self): for dec_level_config in self.config.decimations: # if dec_level_config.save_fcs: dec_level_config.save_fcs = False + if self.config.stations.remote: + save_any_fcs = np.array([x.save_fcs for x in self.config.decimations]).any() + if save_any_fcs: + msg = "\n Saving FCs for remote reference processing is not supported" + msg = f"{msg} \n - To save FCs, process as single station, then you can use the FCs for RR processing" + msg = f"{msg} \n - See issue #319 for details " + msg = f"{msg} \n - forcing processing config save_fcs = False " + logger.warning(msg) + for dec_level_config in self.config.decimations: + dec_level_config.save_fcs = False - # TODO: Add logic here that checks if remote reference station is in processing and save_FCs is True - # -- logger.critical() This is not a supported configuration (but it might just work!) - # -- basically, you would need to have runs at the local and remote station be simultaneous to work - # -- if they overlap in a janky way, then the saved FCs will be janky - # -- recommend that you apply single station processing to local and remote station with save FCs =True - # -- and then run remote reference processing (with save FCs =True) return def get_mth5_file_open_mode(self): diff --git a/aurora/transfer_function/regression/helper_functions.py b/aurora/transfer_function/regression/helper_functions.py index e4390358..286eceba 100644 --- a/aurora/transfer_function/regression/helper_functions.py +++ b/aurora/transfer_function/regression/helper_functions.py @@ -40,18 +40,25 @@ def rme_beta(r0): return beta -def solve_single_time_window(Y, X, R=None): +def simple_solve_tf(Y, X, R=None): """ Cast problem Y = Xb into scipy.linalg.solve form which solves: a @ x = b + - Note that the "b" in the two equations is different. + - The EMTF regression problem (and many regression problems in general) use Y=Xb + - Y, X are known and b is the solution + - scipy.linalg.solve form which solves: a @ x = b + - Here a, b are known and x is the solution. - This function is used for testing vectorized, direct solver. Parameters ---------- Y: numpy.ndarray - The "output" of regression problem. For testing this often has shape (2,). + The "output" of regression problem. For testing this often has shape (N,). + Y is normally a vector of Electric field FCs X: numpy.ndarray - The "input" of regression problem. For testing this often has shape (2,2). + The "input" of regression problem. For testing this often has shape (N,2). + X is normally an array of Magnetic field FCs (two-component) R: numpy.ndarray or None Remote reference channels (optional) @@ -69,3 +76,49 @@ def solve_single_time_window(Y, X, R=None): b = xH @ Y z = np.linalg.solve(a, b) return z + + +def direct_solve_tf(Y, X, R=None): + """ + Cast problem Y = Xb + + Parameters + ---------- + Y: numpy.ndarray + The "output" of regression problem. For testing this often has shape (N,). + Y is normally a vector of Electric field FCs + X: numpy.ndarray + The "input" of regression problem. For testing this often has shape (N,2). + X is normally an array of Magnetic field FCs (two-component) + R: numpy.ndarray or None + Remote reference channels (optional) + + Returns + ------- + z: numpy.ndarray + The TF estimate -- Usually two complex numbers, (Zxx, Zxy), or (Zyx, Zyy) + Parameters + ---------- + Y + X + R + + Returns + ------- + + """ + if R is None: + xH = X.conjugate().transpose() + else: + xH = R.conjugate().transpose() + + # multiply both sides by conjugate transpose + xHx = xH @ X + xHY = xH @ Y + + # Invert the square matrix + inv = np.linalg.inv(xHx) + + # get solution + b = inv @ xHY + return b diff --git a/aurora/transfer_function/weights/spectral_features.py b/aurora/transfer_function/weights/spectral_features.py index d81874d6..c5b5f42b 100644 --- a/aurora/transfer_function/weights/spectral_features.py +++ b/aurora/transfer_function/weights/spectral_features.py @@ -1,8 +1,7 @@ import numpy as np -from aurora.transfer_function.regression.helper_functions import ( - solve_single_time_window, -) +from aurora.transfer_function.regression.helper_functions import direct_solve_tf +from aurora.transfer_function.regression.helper_functions import simple_solve_tf def estimate_time_series_of_impedances(band, output_ch="ex", use_remote=True): @@ -70,33 +69,17 @@ def cross_power_series(ch1, ch2): # method above is consistent with for-looping on np.solve # Set up the problem in terms of linalg.solve() - idx = 0 # a time-window index to check + # Y = X*b + # E = H z + + # Uncomment for sanity check test + idx = np.random.randint(len(Zxx)) # 0 # a time-window index to check E = band["ex"][idx, :] H = band[["hx", "hy"]].to_array()[:, idx].T - HH = H.conj().transpose() - a = HH.data @ H.data - b = HH.data @ E.data - - # Solve using direct inverse - inv_a = np.linalg.inv(a) - zz0 = inv_a @ b - # solve using linalg.solve - zz = solve_single_time_window(b, a, None) - # compare the two solutions - assert np.isclose(np.abs(zz0 - zz), 0, atol=1e-10).all() - - # Estimate multiple coherence with linalg.solve soln - solved_residual = E - H[:, 0] * zz[0] - H[:, 1] * zz[1] - solved_residual_energy = (np.abs(solved_residual) ** 2).sum() - output_energy = (np.abs(E) ** 2).sum() - multiple_coherence_1 = solved_residual_energy / output_energy - print("solve", multiple_coherence_1) - # Estimate multiple coherence with Zxx Zxy - homebrew_residual = E - H[:, 0] * Zxx[0] - H[:, 1] * Zxy[0] - homebrew_residual_energy = (np.abs(homebrew_residual) ** 2).sum() - multiple_coherence_2 = homebrew_residual_energy / output_energy - print(multiple_coherence_2) - assert np.isclose( - multiple_coherence_1.data, multiple_coherence_2.data, 0, atol=1e-14 - ).all() + z_direct = direct_solve_tf(E.data, H.data) + z_linalg = simple_solve_tf(E.data, H.data, None) + z_tricky = np.array([Zxx[idx], Zxy[idx]]) + assert np.isclose(np.abs(z_direct - z_linalg), 0, atol=1e-10).all() + assert np.isclose(np.abs(z_direct - z_tricky), 0, atol=1e-10).all() + return Zxx, Zxy diff --git a/tests/transfer_function/regression/test_helper_functions.py b/tests/transfer_function/regression/test_helper_functions.py new file mode 100644 index 00000000..38a3c295 --- /dev/null +++ b/tests/transfer_function/regression/test_helper_functions.py @@ -0,0 +1,55 @@ +import unittest + +import numpy as np + +from aurora.transfer_function.regression.helper_functions import direct_solve_tf +from aurora.transfer_function.regression.helper_functions import simple_solve_tf + + +class TestHelperFunctions(unittest.TestCase): + """ """ + + @classmethod + def setUpClass(self): + self.electric_data = np.array( + [ + 4.39080123e-07 - 2.41097397e-06j, + -2.33418464e-06 + 2.10752581e-06j, + 1.38642624e-06 - 1.87333571e-06j, + ] + ) + self.magnetic_data = np.array( + [ + [7.00767250e-07 - 9.18819198e-07j, 1.94321684e-07 + 3.71934877e-07j], + [-1.06648904e-07 + 8.19420154e-07j, 1.15361101e-08 - 6.32581646e-07j], + [-1.02700963e-07 - 3.73904463e-07j, 3.86095787e-08 + 4.33155345e-07j], + ] + ) + self.expected_solution = np.array( + [-0.04192569 - 0.36502722j, -3.65284496 - 4.05194938j] + ) + + def setUp(self): + pass + + def test_simple_solve_tf(self): + X = self.magnetic_data + Y = self.electric_data + z = simple_solve_tf(Y, X) + assert np.isclose(z, self.expected_solution, rtol=1e-8).all() + return z + + def test_direct_solve_tf(self): + X = self.magnetic_data + Y = self.electric_data + z = direct_solve_tf(Y, X) + assert np.isclose(z, self.expected_solution, rtol=1e-8).all() + return z + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main()