Merge pull request #67 from nico-franco-gomez/dev
v0.4.7
nico-franco-gomez authored Oct 24, 2024
2 parents d54757a + a2af157 commit db0b04a
Showing 26 changed files with 572 additions and 158 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.rst
@@ -14,6 +14,32 @@ adheres to `Semantic Versioning <http://semver.org/spec/v2.0.0.html>`_.
- Validation for results from tests in every module (so far many tests only
  check functionality)

`0.4.7 <https://pypi.org/project/dsptoolbox/0.4.7>`_ -
---------------------
Added
~~~~~
- new `dft` in ``transforms`` for computing DFTs with any resolution
- `lpc` in ``transforms``
- `ExponentialAverageFilter` in ``filterbanks``
- support for python 3.13

Misc
~~~~
- improved precision of the parallel filter by adding a third feed-forward
  coefficient to the least-squares approximation
- replaced convolve with oaconvolve in multiple places for optimal handling
  of different signal lengths
- made framed signal methods available in ``dsptoolbox.tools``
- general doc corrections and additions
- added numba as a new dependency for parallelizing some functions. It will be
  installed and used automatically if the current Python version is 3.12 or
  below, since numba does not yet support Python 3.13.

Bugfix
~~~~~~
- fixed a problem with the group delay designer
- fixed a problem with array dimensions in the autoregressive coefficient
  estimation

`0.4.6 <https://pypi.org/project/dsptoolbox/0.4.6>`_ -
---------------------

2 changes: 1 addition & 1 deletion dsptoolbox/__init__.py
@@ -88,4 +88,4 @@
"tools",
]

__version__ = "0.4.6"
__version__ = "0.4.7"
109 changes: 91 additions & 18 deletions dsptoolbox/_general_helpers.py
@@ -1341,8 +1341,10 @@ def _get_fractional_impulse_peak_index(
roots = np.roots(pol)
# Get only root between 0 and 1
roots = roots[
(roots == roots.real) # Real roots
& (roots <= 1) # Range
# Real roots
(roots == roots.real)
# Range
& (roots <= 1)
& (roots >= 0)
].real
try:
@@ -1796,32 +1798,34 @@ def _get_correlation_of_latencies(

def __levison_durbin_recursion(
autocorrelation: NDArray[np.float64],
) -> tuple[NDArray[np.float64], float]:
"""Levinson-Durbin recursion to be applied to the autocorrelation
estimate.
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Levinson-Durbin recursion to be applied to the autocorrelation estimate.
It is always computed along the first (outermost) axis.
Parameters
----------
autocorrelation : NDArray[np.float64]
Autocorrelation function with only positive lags and length of
`order + 1`, where `order` corresponds to the order of the AR
estimation. It should be a flat array (only one channel).
estimation. It can have any shape, but the AR parameters are always
computed along the first (outermost) axis.
Returns
-------
reflection_coefficients : NDArray[np.float64]
Denominator coefficients.
prediction_error : float
Denominator coefficients with shape (coefficient, ...).
prediction_error : NDArray[np.float64]
Variance of the remaining error.
"""
prediction_error = autocorrelation[0] # Signal variance
autocorr_coefficients = autocorrelation[1:]
num_coefficients = len(autocorr_coefficients)
ar_parameters = np.zeros(num_coefficients)
prediction_error = autocorrelation[0, ...].copy() # Signal variance
autocorr_coefficients = autocorrelation[1:, ...].copy()

num_coefficients = autocorr_coefficients.shape[0]
ar_parameters = np.zeros_like(autocorr_coefficients)

for order in range(num_coefficients):
reflection_value = autocorr_coefficients[order]
reflection_value = autocorr_coefficients[order].copy()
if order == 0:
reflection_coefficient = -reflection_value / prediction_error
else:
@@ -1831,7 +1835,7 @@ def __levison_durbin_recursion(
)
reflection_coefficient = -reflection_value / prediction_error
prediction_error *= 1.0 - reflection_coefficient**2.0
if prediction_error <= 0:
if np.any(prediction_error <= 0):
raise ValueError("Invalid prediction error: Singular Matrix")
ar_parameters[order] = reflection_coefficient

@@ -1841,7 +1845,7 @@ def __levison_durbin_recursion(
half_order = (order + 1) // 2
for lag in range(half_order):
reverse_lag = order - lag - 1
save_value = ar_parameters[lag]
save_value = ar_parameters[lag].copy()
ar_parameters[lag] = (
save_value
+ reflection_coefficient * ar_parameters[reverse_lag]
@@ -1850,8 +1854,76 @@ def __levison_durbin_recursion(
ar_parameters[reverse_lag] += (
reflection_coefficient * save_value
)

# Add first coefficient a0
return np.hstack([1.0, ar_parameters]), prediction_error
ndim = ar_parameters.ndim
pad_width = tuple([(1, 0)] + [(0, 0)] * (ndim - 1))
return (
np.pad(ar_parameters, pad_width, mode="constant", constant_values=1.0),
prediction_error,
)
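For reference, a minimal single-channel sketch of the same recursion in its textbook form (illustrative names, not the library's API; the implementation above is the vectorized, multichannel variant):

import numpy as np

def levinson_durbin(r):
    """Solve the Yule-Walker equations from the autocorrelation r
    (lags 0..order). Returns ([1, a1, ..., ap], prediction error)."""
    order = len(r) - 1
    a = np.zeros(order + 1)
    a[0] = 1.0
    error = r[0]  # signal variance at order 0
    for m in range(1, order + 1):
        # Reflection coefficient from the current prediction residual
        k = -(r[m] + np.dot(a[1:m], r[m - 1:0:-1])) / error
        # Symmetric coefficient update (RHS is evaluated before assignment)
        a[1:m] = a[1:m] + k * a[m - 1:0:-1]
        a[m] = k
        error *= 1.0 - k * k
        if error <= 0.0:
            raise ValueError("Invalid prediction error: Singular Matrix")
    return a, error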


def __yw_ar_estimation(
time_data: NDArray[np.float64], order: int
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Compute the autoregressive coefficients for an AR process using the
Levinson-Durbin recursion to solve the Yule-Walker equations. This is done
from the biased autocorrelation.
Parameters
----------
time_data : NDArray[np.float64]
Time data with up to three dimensions. The AR parameters are always
computed along the first (outermost) axis.
order : int
Recursion order.
Returns
-------
NDArray[np.float64]
Reflection coefficients with shape (coefficient, ...).
NDArray[np.float64]
Variance of the remaining error.
"""
assert (
time_data.ndim <= 3
), "This function only accepts a signal with one, two or three dimensions"

length_td = time_data.shape[0]
if time_data.ndim == 1:
autocorrelation = (
correlate(time_data, time_data, "full")[
length_td - 1 : length_td + order
]
/ length_td
)
elif time_data.ndim == 2:
autocorrelation = np.zeros((order + 1, time_data.shape[1]))
for i in range(time_data.shape[1]):
# Biased autocorrelation (only positive lags)
autocorrelation[:, i] = (
correlate(time_data[:, i], time_data[:, i], "full")[
length_td - 1 : length_td + order
]
/ length_td
)
else:
autocorrelation = np.zeros(
(order + 1, time_data.shape[1], time_data.shape[2])
)
for ii in range(time_data.shape[2]):
for i in range(time_data.shape[1]):
# Biased autocorrelation (only positive lags)
autocorrelation[:, i, ii] = (
correlate(
time_data[:, i, ii], time_data[:, i, ii], "full"
)[length_td - 1 : length_td + order]
/ length_td
)

return __levison_durbin_recursion(autocorrelation)
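A quick sanity check of this path, reusing the levinson_durbin sketch above (scipy.signal.lfilter synthesizes a known AR process whose coefficients should be recovered):

import numpy as np
from scipy.signal import correlate, lfilter

rng = np.random.default_rng(0)
true_a = np.array([1.0, -0.75, 0.5])  # x[n] = 0.75*x[n-1] - 0.5*x[n-2] + e[n]
x = lfilter([1.0], true_a, rng.standard_normal(100_000))

order = 2
# Biased autocorrelation, positive lags only, as computed above
r = correlate(x, x, "full")[len(x) - 1 : len(x) + order] / len(x)
a_hat, err = levinson_durbin(r)
print(a_hat)  # approximately [1.0, -0.75, 0.5]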


def __burg_ar_estimation(
@@ -1871,7 +1943,8 @@ def __burg_ar_estimation(
Returns
-------
NDArray[np.float64]
Denominator (reflection) coefficients.
Denominator (reflection) coefficients with shape (coefficient,
channel).
NDArray[np.float64]
Variances of the prediction error.
@@ -1930,4 +2003,4 @@ def __burg_ar_estimation(
fwd_pred_error = fwd_pred_error[1:]
bwd_pred_error = bwd_pred_error[:-1]

return ar_coeffs.squeeze(), den[0]
return ar_coeffs.squeeze() if onedim else ar_coeffs, den[0]
26 changes: 17 additions & 9 deletions dsptoolbox/_standard.py
@@ -831,16 +831,17 @@ def _detrend(


def _get_framed_signal(
td: NDArray[np.float64],
time_data: NDArray[np.float64],
window_length_samples: int,
step_size: int,
keep_last_frames: bool = True,
) -> NDArray[np.float64]:
"""This method computes a framed version of a signal and returns it.
"""This function turns a signal into (possibly) overlaping time frames.
The original data gets copied.
Parameters
----------
td : NDArray[np.float64]
time_data : NDArray[np.float64]
Signal with shape (time samples, channels).
window_length_samples : int
Window length in samples.
@@ -852,10 +853,17 @@
Returns
-------
td_framed : NDArray[np.float64]
time_data_framed : NDArray[np.float64]
Framed signal with shape (time samples, frames, channels).
Notes
-----
- Perfect reconstruction from this representation can be achieved when the
signal is zero-padded at the edges where the window does not yet meet
the COLA condition. Otherwise, these sections might be distorted.
"""
assert time_data.ndim == 2, "Time data should have exactly two dimensions."
# Force casting to integers
if type(window_length_samples) is not int:
window_length_samples = int(window_length_samples)
@@ -866,12 +874,12 @@
n_frames, padding_samp = _compute_number_frames(
window_length_samples,
step_size,
td.shape[0],
time_data.shape[0],
zero_padding=keep_last_frames,
)
td = _pad_trim(td, td.shape[0] + padding_samp)
td = _pad_trim(time_data, time_data.shape[0] + padding_samp)
td_framed = np.zeros(
(window_length_samples, n_frames, td.shape[1]), dtype="float"
(window_length_samples, n_frames, td.shape[1]), dtype=np.float64
)

# Create time frames
@@ -897,7 +905,7 @@ def _reconstruct_framed_signal(
Parameters
----------
td_framed : NDArray[np.float64]
Framed signal with shape (time samples, frame, channel).
Framed signal with shape (time samples, frames, channels).
step_size : int
Step size in samples between frames (also known as hop length).
window : str, NDArray[np.float64], optional
Expand All @@ -916,7 +924,7 @@ def _reconstruct_framed_signal(
Returns
-------
td : NDArray[np.float64]
Reconstructed signal.
Reconstructed signal with shape (time samples, channels).
"""
assert (
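The framing/overlap-add round trip described in these docstrings can be sketched as follows (illustrative helper names, not the library's API; normalizing by the summed squared window makes the reconstruction exact wherever that sum is nonzero, so this variant does not rely on a strict COLA window):

import numpy as np
from scipy.signal import get_window

def frame_signal(x, win_len, hop):
    # (samples, channels) -> (win_len, frames, channels); incomplete tail dropped
    n_frames = 1 + (x.shape[0] - win_len) // hop
    idx = np.arange(win_len)[:, None] + hop * np.arange(n_frames)[None, :]
    return x[idx, :]  # fancy indexing copies the data

def overlap_add(frames, hop, window):
    # WOLA: the window is applied at analysis and synthesis, then divided out
    win_len, n_frames, n_ch = frames.shape
    out = np.zeros((hop * (n_frames - 1) + win_len, n_ch))
    norm = np.zeros(out.shape[0])
    for f in range(n_frames):
        sl = slice(f * hop, f * hop + win_len)
        out[sl] += frames[:, f, :] * window[:, None]
        norm[sl] += window**2
    return out / np.maximum(norm, 1e-12)[:, None]

x = np.random.default_rng(1).standard_normal((4800, 2))
win = get_window("hann", 512, fftbins=True)
framed = frame_signal(x, 512, 256) * win[:, None, None]  # analysis windowing
y = overlap_add(framed, 256, win)
# y matches x wherever the squared-window sum is nonzero
# (here: everything except sample 0, up to the reconstructed length)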
61 changes: 61 additions & 0 deletions dsptoolbox/classes/exponential_average_filter.py
@@ -0,0 +1,61 @@
import numpy as np

from .._general_helpers import _get_smoothing_factor_ema
from .realtime_filter import RealtimeFilter


class ExponentialAverageFilter(RealtimeFilter):
def __init__(
self,
increase_time_s: float,
decrease_time_s: float,
sampling_rate_hz: int,
accuracy_step_response: float = 0.95,
):
"""The exponential average filter is a one-pole IIR filter which
smooths the input (lowpass filter). It can have different
coefficients for increasing and decreasing values.
Parameters
----------
increase_time_s : float
Time it would take the filter to obtain the given `accuracy` in
the step response. This is applied for increasing values.
decrease_time_s : float
Time it would take the filter to obtain the given `accuracy` in
the step response. This is applied for decreasing values.
sampling_rate_hz : int
Sampling rate of the filter.
accuracy_step_response : float, optional
This represents the value that the step response should reach after
the increase or decrease time. It has to be in ]0; 1[. Default: 0.95.
"""
self.sampling_rate_hz = sampling_rate_hz
self.increase_coefficient = _get_smoothing_factor_ema(
increase_time_s, self.sampling_rate_hz, accuracy_step_response
)
self.decrease_coefficient = _get_smoothing_factor_ema(
decrease_time_s, self.sampling_rate_hz, accuracy_step_response
)
self.set_n_channels(1)

def set_n_channels(self, n_channels: int):
self.state = np.zeros((1, n_channels))

def reset_state(self):
self.state.fill(0.0)

def process_sample(self, x: float, channel: int):
if x > self.state[0, channel]:  # Ascending
y = (
x * self.increase_coefficient
+ (1 - self.increase_coefficient) * self.state[0, channel]
)
else: # Descending
y = (
x * self.decrease_coefficient
+ (1 - self.decrease_coefficient) * self.state[0, channel]
)
self.state[0, channel] = y
return y
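A usage sketch for the new class as an asymmetric envelope follower (fast attack, slow release). The import path below is taken from this diff and may differ from the package's public API:

import numpy as np
from dsptoolbox.classes.exponential_average_filter import (
    ExponentialAverageFilter,
)

fs_hz = 48_000
smoother = ExponentialAverageFilter(
    increase_time_s=0.005,  # fast reaction to rising values (attack)
    decrease_time_s=0.100,  # slow reaction to falling values (release)
    sampling_rate_hz=fs_hz,
)
x = np.abs(np.random.default_rng(2).standard_normal(fs_hz))  # rectified input
envelope = np.array([smoother.process_sample(v, 0) for v in x])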
10 changes: 5 additions & 5 deletions dsptoolbox/classes/filter_helpers.py
@@ -546,7 +546,7 @@ def _lfilter_fir(
assert x.ndim == 2, "Filtering only works on 2D-arrays"

# Convolving
y = sig.convolve(x, b[..., None], mode="full")
y = sig.oaconvolve(x, b[..., None], mode="full", axes=0)

# Use zi's and take zf's
if zi is not None:
@@ -612,8 +612,8 @@ def _filter_and_downsample(
for ch in range(poly.shape[2]):
temp = np.zeros(new_time_data.shape[0])
for n in range(poly.shape[1]):
temp += sig.convolve(
poly[:, n, ch], b_poly[:, n, 0], mode="full"
temp += sig.oaconvolve(
poly[:, n, ch], b_poly[:, n, 0], mode="full", axes=0
)
new_time_data[:, ch] = temp
# Take correct values from vector
@@ -689,8 +689,8 @@ def _filter_and_upsample(
# a better way to do it without the loops...
for ch in range(time_data.shape[1]):
for ind in range(up_factor):
new_time_data[ind::up_factor, ch] = sig.convolve(
time_data[:, ch], b_poly[:, ind, 0], mode="full"
new_time_data[ind::up_factor, ch] = sig.oaconvolve(
time_data[:, ch], b_poly[:, ind, 0], mode="full", axes=0
)

# Take right samples from filtered signal
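oaconvolve returns the same result as convolve while using overlap-add FFT convolution, which is significantly faster when one input is much longer than the other, as in these polyphase filtering loops. A minimal equivalence check mirroring the call pattern above:

import numpy as np
from scipy.signal import convolve, oaconvolve

x = np.random.default_rng(3).standard_normal((48_000, 2))  # long signal
b = np.random.default_rng(4).standard_normal(257)  # short FIR kernel
y_ref = convolve(x, b[:, None], mode="full")
y_oa = oaconvolve(x, b[:, None], mode="full", axes=0)
assert np.allclose(y_ref, y_oa)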
