From a54a125f6268971b56827ad2b73b12a3ebf67d77 Mon Sep 17 00:00:00 2001 From: kkappler Date: Fri, 10 Jan 2025 16:58:40 -0800 Subject: [PATCH] Move some xarray tools from aurora --- mth5/timeseries/xarray_helpers.py | 120 ++++++++++++++++++++++ tests/timeseries/test_xarray_helpers.py | 126 ++++++++++++++++++++++++ 2 files changed, 246 insertions(+) create mode 100644 mth5/timeseries/xarray_helpers.py create mode 100644 tests/timeseries/test_xarray_helpers.py diff --git a/mth5/timeseries/xarray_helpers.py b/mth5/timeseries/xarray_helpers.py new file mode 100644 index 00000000..f82ade87 --- /dev/null +++ b/mth5/timeseries/xarray_helpers.py @@ -0,0 +1,120 @@ +""" +Module containing helper functions for working with xarray objects. +""" +import numpy as np +import xarray as xr +from typing import Optional, Union + + +def covariance_xr( + X: xr.DataArray, aweights: Optional[Union[np.ndarray, None]] = None +) -> xr.DataArray: + """ + Compute the covariance matrix with numpy.cov. + + Parameters + ---------- + X: xarray.core.dataarray.DataArray + Multivariate time series as an xarray + aweights: array_like, optional + Doc taken from numpy cov follows: + 1-D array of observation vector weights. These relative weights are + typically large for observations considered "important" and smaller for + observations considered less "important". If ``ddof=0`` the array of + weights can be used to assign probabilities to observation vectors. + + Returns + ------- + S: xarray.DataArray + The covariance matrix of the data in xarray form. + """ + + channels = list(X.coords["variable"].values) + + S = xr.DataArray( + np.cov(X.values, rowvar=False, aweights=aweights), + dims=["channel_1", "channel_2"], + coords={"channel_1": channels, "channel_2": channels}, + ) + return S + + +def initialize_xrda_1d( + channels: list, + dtype: Optional[type] = None, + value: Optional[Union[complex, float, bool]] = 0, +) -> xr.DataArray: + """ + Returns a 1D xr.DataArray with variable "channel", having values channels named by the input list. + + Parameters + ---------- + channels: list + The channels in the multivariate array + dtype: type, optional + The datatype to initialize the array. + Common cases are complex, float, and bool + value: Union[complex, float, bool], optional + The default value to assign the array + + Returns + ------- + xrda: xarray.core.dataarray.DataArray + An xarray container for the channels, initialized to zeros. + """ + k = len(channels) + xrda = xr.DataArray( + np.zeros(k, dtype=dtype), + dims=[ + "variable", + ], + coords={ + "variable": channels, + }, + ) + if value != 0: + data = value * np.ones(k, dtype=dtype) + xrda.data = data + return xrda + + +def initialize_xrda_2d( + channels: list, + dtype: Optional[type] = complex, + value: Optional[Union[complex, float, bool]] = 0, + dims: Optional[list] = None, +) -> xr.DataArray: + """ + Returns a 2D xr.DataArray with dimensions channel_1 and channel_2. + + Parameters + ---------- + channels: list + The channels in the multivariate array + dtype: type, optional + The datatype to initialize the array. + Common cases are complex, float, and bool + value: Union[complex, float, bool], optional + The default value to assign the array + dims: list, optional + List of two channel lists for the two dimensions. If None, uses the same channels for both dimensions. + + Returns + ------- + xrda: xarray.core.dataarray.DataArray + An xarray container for the channel variances etc., initialized to zeros. + """ + if dims is None: + dims = [channels, channels] + + K = len(channels) + xrda = xr.DataArray( + np.zeros((K, K), dtype=dtype), + dims=["channel_1", "channel_2"], + coords={"channel_1": dims[0], "channel_2": dims[1]}, + ) + if value != 0: + data = value * np.ones(xrda.shape, dtype=dtype) + xrda.data = data + + return xrda diff --git a/tests/timeseries/test_xarray_helpers.py b/tests/timeseries/test_xarray_helpers.py new file mode 100644 index 00000000..f52d4f7e --- /dev/null +++ b/tests/timeseries/test_xarray_helpers.py @@ -0,0 +1,126 @@ +""" +Tests for xarray helper functions. +""" +import numpy as np +import xarray as xr +import pytest + +from mth5.timeseries.xarray_helpers import covariance_xr, initialize_xrda_1d, initialize_xrda_2d + + +def test_covariance_xr(): + """Test covariance computation with xarray.""" + # Create sample data + channels = ["ex", "ey", "hx", "hy"] + times = np.arange(100) + data = np.random.randn(len(times), len(channels)) + + # Create xarray DataArray + X = xr.DataArray( + data, + dims=["time", "variable"], + coords={ + "time": times, + "variable": channels + } + ) + + # Compute covariance + S = covariance_xr(X) + + # Check dimensions and coordinates + assert S.dims == ("channel_1", "channel_2") + assert list(S.coords["channel_1"].values) == channels + assert list(S.coords["channel_2"].values) == channels + + # Check symmetry + assert np.allclose(S.values, S.values.T) + + # Check against numpy covariance + np_cov = np.cov(X.values.T) + assert np.allclose(S.values, np_cov) + + +def test_covariance_xr_with_weights(): + """Test weighted covariance computation with xarray.""" + # Create sample data + channels = ["ex", "ey", "hx", "hy"] + times = np.arange(100) + data = np.random.randn(len(times), len(channels)) + weights = np.random.rand(len(times)) + + # Create xarray DataArray + X = xr.DataArray( + data, + dims=["time", "variable"], + coords={ + "time": times, + "variable": channels + } + ) + + # Compute weighted covariance + S = covariance_xr(X, aweights=weights) + + # Check dimensions and coordinates + assert S.dims == ("channel_1", "channel_2") + assert list(S.coords["channel_1"].values) == channels + assert list(S.coords["channel_2"].values) == channels + + # Check symmetry + assert np.allclose(S.values, S.values.T) + + # Check against numpy weighted covariance + np_cov = np.cov(X.values.T, aweights=weights) + assert np.allclose(S.values, np_cov) + + +def test_initialize_xrda_1d(): + """Test initialization of 1D xarray DataArray.""" + # Test with default parameters + channels = ["ex", "ey", "hx", "hy"] + xrda = initialize_xrda_1d(channels) + + assert xrda.dims == ("variable",) + assert list(xrda.coords["variable"].values) == channels + assert np.allclose(xrda.values, np.zeros(len(channels))) + + # Test with custom dtype and value + xrda = initialize_xrda_1d(channels, dtype=complex, value=1+1j) + + assert xrda.dtype == complex + assert np.allclose(xrda.values, np.ones(len(channels)) * (1+1j)) + + # Test with boolean type + xrda = initialize_xrda_1d(channels, dtype=bool, value=True) + + assert xrda.dtype == bool + assert np.all(xrda.values == True) + + +def test_initialize_xrda_2d(): + """Test initialization of 2D xarray DataArray.""" + # Test with default parameters + channels = ["ex", "ey", "hx", "hy"] + xrda = initialize_xrda_2d(channels) + + assert xrda.dims == ("channel_1", "channel_2") + assert list(xrda.coords["channel_1"].values) == channels + assert list(xrda.coords["channel_2"].values) == channels + assert xrda.dtype == complex + assert np.allclose(xrda.values, np.zeros((len(channels), len(channels)))) + + # Test with custom dtype and value + xrda = initialize_xrda_2d(channels, dtype=float, value=1.5) + + assert xrda.dtype == float + assert np.allclose(xrda.values, np.ones((len(channels), len(channels))) * 1.5) + + # Test with different dimensions + channels2 = ["hx", "hy"] + xrda = initialize_xrda_2d(channels, dims=[channels, channels2]) + + assert xrda.dims == ("channel_1", "channel_2") + assert list(xrda.coords["channel_1"].values) == channels + assert list(xrda.coords["channel_2"].values) == channels2 + assert xrda.shape == (len(channels), len(channels2)) \ No newline at end of file