-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
246 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
""" | ||
Module containing helper functions for working with xarray objects. | ||
""" | ||
import numpy as np | ||
import xarray as xr | ||
from typing import Optional, Union | ||
|
||
|
||
def covariance_xr( | ||
X: xr.DataArray, aweights: Optional[Union[np.ndarray, None]] = None | ||
) -> xr.DataArray: | ||
""" | ||
Compute the covariance matrix with numpy.cov. | ||
Parameters | ||
---------- | ||
X: xarray.core.dataarray.DataArray | ||
Multivariate time series as an xarray | ||
aweights: array_like, optional | ||
Doc taken from numpy cov follows: | ||
1-D array of observation vector weights. These relative weights are | ||
typically large for observations considered "important" and smaller for | ||
observations considered less "important". If ``ddof=0`` the array of | ||
weights can be used to assign probabilities to observation vectors. | ||
Returns | ||
------- | ||
S: xarray.DataArray | ||
The covariance matrix of the data in xarray form. | ||
""" | ||
|
||
channels = list(X.coords["variable"].values) | ||
|
||
S = xr.DataArray( | ||
np.cov(X.values, rowvar=False, aweights=aweights), | ||
dims=["channel_1", "channel_2"], | ||
coords={"channel_1": channels, "channel_2": channels}, | ||
) | ||
return S | ||
|
||
|
||
def initialize_xrda_1d( | ||
channels: list, | ||
dtype: Optional[type] = None, | ||
value: Optional[Union[complex, float, bool]] = 0, | ||
) -> xr.DataArray: | ||
""" | ||
Returns a 1D xr.DataArray with variable "channel", having values channels named by the input list. | ||
Parameters | ||
---------- | ||
channels: list | ||
The channels in the multivariate array | ||
dtype: type, optional | ||
The datatype to initialize the array. | ||
Common cases are complex, float, and bool | ||
value: Union[complex, float, bool], optional | ||
The default value to assign the array | ||
Returns | ||
------- | ||
xrda: xarray.core.dataarray.DataArray | ||
An xarray container for the channels, initialized to zeros. | ||
""" | ||
k = len(channels) | ||
xrda = xr.DataArray( | ||
np.zeros(k, dtype=dtype), | ||
dims=[ | ||
"variable", | ||
], | ||
coords={ | ||
"variable": channels, | ||
}, | ||
) | ||
if value != 0: | ||
data = value * np.ones(k, dtype=dtype) | ||
xrda.data = data | ||
return xrda | ||
|
||
|
||
def initialize_xrda_2d( | ||
channels: list, | ||
dtype: Optional[type] = complex, | ||
value: Optional[Union[complex, float, bool]] = 0, | ||
dims: Optional[list] = None, | ||
) -> xr.DataArray: | ||
""" | ||
Returns a 2D xr.DataArray with dimensions channel_1 and channel_2. | ||
Parameters | ||
---------- | ||
channels: list | ||
The channels in the multivariate array | ||
dtype: type, optional | ||
The datatype to initialize the array. | ||
Common cases are complex, float, and bool | ||
value: Union[complex, float, bool], optional | ||
The default value to assign the array | ||
dims: list, optional | ||
List of two channel lists for the two dimensions. If None, uses the same channels for both dimensions. | ||
Returns | ||
------- | ||
xrda: xarray.core.dataarray.DataArray | ||
An xarray container for the channel variances etc., initialized to zeros. | ||
""" | ||
if dims is None: | ||
dims = [channels, channels] | ||
|
||
K = len(channels) | ||
xrda = xr.DataArray( | ||
np.zeros((K, K), dtype=dtype), | ||
dims=["channel_1", "channel_2"], | ||
coords={"channel_1": dims[0], "channel_2": dims[1]}, | ||
) | ||
if value != 0: | ||
data = value * np.ones(xrda.shape, dtype=dtype) | ||
xrda.data = data | ||
|
||
return xrda |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
""" | ||
Tests for xarray helper functions. | ||
""" | ||
import numpy as np | ||
import xarray as xr | ||
import pytest | ||
|
||
from mth5.timeseries.xarray_helpers import covariance_xr, initialize_xrda_1d, initialize_xrda_2d | ||
|
||
|
||
def test_covariance_xr(): | ||
"""Test covariance computation with xarray.""" | ||
# Create sample data | ||
channels = ["ex", "ey", "hx", "hy"] | ||
times = np.arange(100) | ||
data = np.random.randn(len(times), len(channels)) | ||
|
||
# Create xarray DataArray | ||
X = xr.DataArray( | ||
data, | ||
dims=["time", "variable"], | ||
coords={ | ||
"time": times, | ||
"variable": channels | ||
} | ||
) | ||
|
||
# Compute covariance | ||
S = covariance_xr(X) | ||
|
||
# Check dimensions and coordinates | ||
assert S.dims == ("channel_1", "channel_2") | ||
assert list(S.coords["channel_1"].values) == channels | ||
assert list(S.coords["channel_2"].values) == channels | ||
|
||
# Check symmetry | ||
assert np.allclose(S.values, S.values.T) | ||
|
||
# Check against numpy covariance | ||
np_cov = np.cov(X.values.T) | ||
assert np.allclose(S.values, np_cov) | ||
|
||
|
||
def test_covariance_xr_with_weights(): | ||
"""Test weighted covariance computation with xarray.""" | ||
# Create sample data | ||
channels = ["ex", "ey", "hx", "hy"] | ||
times = np.arange(100) | ||
data = np.random.randn(len(times), len(channels)) | ||
weights = np.random.rand(len(times)) | ||
|
||
# Create xarray DataArray | ||
X = xr.DataArray( | ||
data, | ||
dims=["time", "variable"], | ||
coords={ | ||
"time": times, | ||
"variable": channels | ||
} | ||
) | ||
|
||
# Compute weighted covariance | ||
S = covariance_xr(X, aweights=weights) | ||
|
||
# Check dimensions and coordinates | ||
assert S.dims == ("channel_1", "channel_2") | ||
assert list(S.coords["channel_1"].values) == channels | ||
assert list(S.coords["channel_2"].values) == channels | ||
|
||
# Check symmetry | ||
assert np.allclose(S.values, S.values.T) | ||
|
||
# Check against numpy weighted covariance | ||
np_cov = np.cov(X.values.T, aweights=weights) | ||
assert np.allclose(S.values, np_cov) | ||
|
||
|
||
def test_initialize_xrda_1d(): | ||
"""Test initialization of 1D xarray DataArray.""" | ||
# Test with default parameters | ||
channels = ["ex", "ey", "hx", "hy"] | ||
xrda = initialize_xrda_1d(channels) | ||
|
||
assert xrda.dims == ("variable",) | ||
assert list(xrda.coords["variable"].values) == channels | ||
assert np.allclose(xrda.values, np.zeros(len(channels))) | ||
|
||
# Test with custom dtype and value | ||
xrda = initialize_xrda_1d(channels, dtype=complex, value=1+1j) | ||
|
||
assert xrda.dtype == complex | ||
assert np.allclose(xrda.values, np.ones(len(channels)) * (1+1j)) | ||
|
||
# Test with boolean type | ||
xrda = initialize_xrda_1d(channels, dtype=bool, value=True) | ||
|
||
assert xrda.dtype == bool | ||
assert np.all(xrda.values == True) | ||
|
||
|
||
def test_initialize_xrda_2d(): | ||
"""Test initialization of 2D xarray DataArray.""" | ||
# Test with default parameters | ||
channels = ["ex", "ey", "hx", "hy"] | ||
xrda = initialize_xrda_2d(channels) | ||
|
||
assert xrda.dims == ("channel_1", "channel_2") | ||
assert list(xrda.coords["channel_1"].values) == channels | ||
assert list(xrda.coords["channel_2"].values) == channels | ||
assert xrda.dtype == complex | ||
assert np.allclose(xrda.values, np.zeros((len(channels), len(channels)))) | ||
|
||
# Test with custom dtype and value | ||
xrda = initialize_xrda_2d(channels, dtype=float, value=1.5) | ||
|
||
assert xrda.dtype == float | ||
assert np.allclose(xrda.values, np.ones((len(channels), len(channels))) * 1.5) | ||
|
||
# Test with different dimensions | ||
channels2 = ["hx", "hy"] | ||
xrda = initialize_xrda_2d(channels, dims=[channels, channels2]) | ||
|
||
assert xrda.dims == ("channel_1", "channel_2") | ||
assert list(xrda.coords["channel_1"].values) == channels | ||
assert list(xrda.coords["channel_2"].values) == channels2 | ||
assert xrda.shape == (len(channels), len(channels2)) |