From eee352c5b825d3abf9343e36afca2f7e91b28001 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Thu, 14 Dec 2023 14:06:44 +0100 Subject: [PATCH 1/8] introduce signal_flatintervals to find flatline intervals in signals --- neurokit2/signal/__init__.py | 2 + neurokit2/signal/signal_flatintervals.py | 103 +++++++++++++++++++++++ tests/tests_signal.py | 22 +++++ 3 files changed, 127 insertions(+) create mode 100644 neurokit2/signal/signal_flatintervals.py diff --git a/neurokit2/signal/__init__.py b/neurokit2/signal/__init__.py index 68d56cde6d..260a89ecd9 100644 --- a/neurokit2/signal/__init__.py +++ b/neurokit2/signal/__init__.py @@ -9,6 +9,7 @@ from .signal_filter import signal_filter from .signal_findpeaks import signal_findpeaks from .signal_fixpeaks import signal_fixpeaks +from .signal_flatintervals import signal_flatintervals from .signal_flatline import signal_flatline from .signal_formatpeaks import signal_formatpeaks from .signal_interpolate import signal_interpolate @@ -41,6 +42,7 @@ "signal_distort", "signal_interpolate", "signal_detrend", + "signal_flatintervals", "signal_findpeaks", "signal_fixpeaks", "signal_formatpeaks", diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py new file mode 100644 index 0000000000..c14d79abf2 --- /dev/null +++ b/neurokit2/signal/signal_flatintervals.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +import numpy as np + + +def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): + """Finds flatline areas in a signal. + + Parameters + ---------- + signal : Union[list, np.array] + The signal as a vector of values. + sampling_rate : int + The sampling rate of the signal, i.e. how many samples (values) per second. + threshold : float, optional + Flatline threshold relative to the biggest change in the signal. + This is the percentage of the maximum value of absolute consecutive differences. + Default: 0.01 (= 1% of the biggest change in the signal) + tolerance : int, optional + Determines how fine-grained the resulting signal is, + i.e. how long (in seconds) can a flatline part be without being recognised as such. + Default: 60 (seconds) + + Returns + ------- + list + Returns a list of tuples: + flatline_starts: Indices where a flatline part starts. + flatline_ends: Indices where a flatline part ends. + + """ + + # Identify flanks: +1 for beginning plateau; -1 for ending plateau. + flanks = np.diff(_find_flatlines(signal, sampling_rate, threshold, tolerance).astype(int)) + + flatline_starts = np.flatnonzero(flanks > 0) + flatline_ends = np.flatnonzero(flanks < 0) + + # Correct offsets from moving average + flatline_starts = flatline_starts + sampling_rate * tolerance + + # Insert start marker at signal start if a start marker is missing + if len(flatline_starts) < len(flatline_ends): + flatline_starts = np.insert(flatline_starts, 0, 0) + + # Insert end marker at signal end if an end marker is missing + if len(flatline_ends) < len(flatline_starts): + flatline_ends = np.append(flatline_ends, [len(signal) - 1]) + + # Return instances where start < end (start >= end might occur due to offset correction). + return [(start, end) for start, end in zip(flatline_starts, flatline_ends) if start < end] + + +def _find_flatlines(signal, sampling_rate, threshold=0.01, tolerance=60): + """Finds flatline areas in a signal. + + Parameters + ---------- + signal : Union[list, np.array] + The signal as a vector of values. + sampling_rate : int + The sampling rate of the signal, i.e. how many samples (values) per second. + threshold : float, optional + Flatline threshold relative to the biggest change in the signal. + This is the percentage of the maximum value of absolute consecutive differences. + Default: 0.01 (= 1% of the biggest change in the signal) + tolerance : int, optional + Determines how fine-grained the resulting signal is, + i.e. how long (in seconds) can a flatline part be without being recognised as such. + Default: 60 (seconds) + + Returns + ------- + np.array + Returns a signal that is True/1 where there is a sufficiently long + flatline part in the signal and False/0 otherwise. + Note: Returned signal is shorter than the original signal by (sampling_rate * tolerance) - 1. + + """ + abs_diff = np.abs(np.diff(signal)) + threshold = threshold * np.max(abs_diff) + + return _moving_average(abs_diff >= threshold, sampling_rate * tolerance) < threshold + + +def _moving_average(signal, window_size): + """Moving window average on a signal. + + Parameters + ---------- + signal : Union[list, np.array] + The signal as a vector of values. + window_size : int + How many consequtive samples are used for averaging. + + Returns + ------- + np.array + Returns a signal of averages from the original signal. + Note: The returned signal is shorter than the original signal by window_size - 1. + + """ + + return np.convolve(signal, np.ones(window_size), 'valid') / window_size diff --git a/tests/tests_signal.py b/tests/tests_signal.py index 399ee7babd..8b646bd01d 100644 --- a/tests/tests_signal.py +++ b/tests/tests_signal.py @@ -217,6 +217,28 @@ def test_signal_interpolate(): assert interpolated[-1] == signal[-1] +def test_signal_flatintervals(): + sampling_rate = 128 + one_minute = 60 + tolerance = 60 + + ecg = nk.ecg_simulate(duration=10*one_minute, sampling_rate=sampling_rate) + flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) + flatline_2 = np.zeros(10 * one_minute * sampling_rate) + signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) + + flatintervals = np.array(nk.signal_flatintervals(signal, sampling_rate, tolerance=tolerance)) / sampling_rate + + assert len(flatintervals) == 3 + ground_truth = np.array([(10, 20), (30, 40), (50, 60)]) * one_minute + + for interval, truth in zip(flatintervals, ground_truth): + interval_begin, interval_end = interval + true_begin, true_end = truth + assert interval_begin > true_begin and interval_begin < true_begin + tolerance + assert interval_end > true_end - tolerance and interval_end < true_end + + def test_signal_findpeaks(): signal1 = np.cos(np.linspace(start=0, stop=30, num=1000)) From 1e08c3c66d67f20608d85f8fbed2e21270afe676 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Thu, 14 Dec 2023 14:25:44 +0100 Subject: [PATCH 2/8] run code checks --- neurokit2/signal/signal_flatintervals.py | 6 +- tests/tests_signal.py | 118 +++++++---------------- 2 files changed, 37 insertions(+), 87 deletions(-) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index c14d79abf2..5e392fff8e 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -31,7 +31,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): # Identify flanks: +1 for beginning plateau; -1 for ending plateau. flanks = np.diff(_find_flatlines(signal, sampling_rate, threshold, tolerance).astype(int)) - + flatline_starts = np.flatnonzero(flanks > 0) flatline_ends = np.flatnonzero(flanks < 0) @@ -97,7 +97,7 @@ def _moving_average(signal, window_size): np.array Returns a signal of averages from the original signal. Note: The returned signal is shorter than the original signal by window_size - 1. - + """ - return np.convolve(signal, np.ones(window_size), 'valid') / window_size + return np.convolve(signal, np.ones(window_size), "valid") / window_size diff --git a/tests/tests_signal.py b/tests/tests_signal.py index 8b646bd01d..73021acacf 100644 --- a/tests/tests_signal.py +++ b/tests/tests_signal.py @@ -16,20 +16,15 @@ def test_signal_simulate(): # Warning for nyquist criterion - with pytest.warns( - nk.misc.NeuroKitWarning, match=r"Skipping requested frequency.*cannot be resolved.*" - ): + with pytest.warns(nk.misc.NeuroKitWarning, match=r"Skipping requested frequency.*cannot be resolved.*"): nk.signal_simulate(sampling_rate=100, frequency=11, silent=False) # Warning for period duration - with pytest.warns( - nk.misc.NeuroKitWarning, match=r"Skipping requested frequency.*since its period of.*" - ): + with pytest.warns(nk.misc.NeuroKitWarning, match=r"Skipping requested frequency.*since its period of.*"): nk.signal_simulate(duration=1, frequency=0.1, silent=False) def test_signal_smooth(): - # TODO: test kernels other than "boxcar" signal = np.cos(np.linspace(start=0, stop=20, num=1000)) smooth1 = nk.signal_smooth(signal, kernel="boxcar", size=100) @@ -48,7 +43,6 @@ def test_signal_smooth_boxcar(): def test_signal_binarize(): - signal = np.cos(np.linspace(start=0, stop=20, num=1000)) binary = nk.signal_binarize(signal) assert len(binary) == 1000 @@ -58,24 +52,15 @@ def test_signal_binarize(): def test_signal_resample(): - signal = np.cos(np.linspace(start=0, stop=20, num=50)) downsampled_interpolation = nk.signal_resample( signal, method="interpolation", sampling_rate=1000, desired_sampling_rate=500 ) - downsampled_numpy = nk.signal_resample( - signal, method="numpy", sampling_rate=1000, desired_sampling_rate=500 - ) - downsampled_pandas = nk.signal_resample( - signal, method="pandas", sampling_rate=1000, desired_sampling_rate=500 - ) - downsampled_fft = nk.signal_resample( - signal, method="FFT", sampling_rate=1000, desired_sampling_rate=500 - ) - downsampled_poly = nk.signal_resample( - signal, method="poly", sampling_rate=1000, desired_sampling_rate=500 - ) + downsampled_numpy = nk.signal_resample(signal, method="numpy", sampling_rate=1000, desired_sampling_rate=500) + downsampled_pandas = nk.signal_resample(signal, method="pandas", sampling_rate=1000, desired_sampling_rate=500) + downsampled_fft = nk.signal_resample(signal, method="FFT", sampling_rate=1000, desired_sampling_rate=500) + downsampled_poly = nk.signal_resample(signal, method="poly", sampling_rate=1000, desired_sampling_rate=500) # Upsample upsampled_interpolation = nk.signal_resample( @@ -90,12 +75,8 @@ def test_signal_resample(): upsampled_pandas = nk.signal_resample( downsampled_pandas, method="pandas", sampling_rate=500, desired_sampling_rate=1000 ) - upsampled_fft = nk.signal_resample( - downsampled_fft, method="FFT", sampling_rate=500, desired_sampling_rate=1000 - ) - upsampled_poly = nk.signal_resample( - downsampled_poly, method="poly", sampling_rate=500, desired_sampling_rate=1000 - ) + upsampled_fft = nk.signal_resample(downsampled_fft, method="FFT", sampling_rate=500, desired_sampling_rate=1000) + upsampled_poly = nk.signal_resample(downsampled_poly, method="poly", sampling_rate=500, desired_sampling_rate=1000) # Check rez = pd.DataFrame( @@ -111,7 +92,6 @@ def test_signal_resample(): def test_signal_detrend(): - signal = np.cos(np.linspace(start=0, stop=10, num=1000)) # Low freq signal += np.cos(np.linspace(start=0, stop=100, num=1000)) # High freq signal += 3 # Add baseline @@ -130,7 +110,6 @@ def test_signal_detrend(): def test_signal_filter(): - signal = np.cos(np.linspace(start=0, stop=10, num=1000)) # Low freq signal += np.cos(np.linspace(start=0, stop=100, num=1000)) # High freq filtered = nk.signal_filter(signal, highcut=10) @@ -148,9 +127,7 @@ def test_signal_filter(): powerline = np.sin(2 * np.pi * 50 * (samples / sampling_rate)) signal_corrupted = signal + powerline - signal_clean = nk.signal_filter( - signal_corrupted, sampling_rate=sampling_rate, method="powerline" - ) + signal_clean = nk.signal_filter(signal_corrupted, sampling_rate=sampling_rate, method="powerline") # import matplotlib.pyplot as plt # figure, (ax0, ax1, ax2) = plt.subplots(nrows=3, ncols=1, sharex=True) @@ -167,8 +144,7 @@ def test_signal_filter(): order = 2 signal_bandstop = nk.signal_filter( - signal_corrupted, sampling_rate=sampling_rate, lowcut=lowcut, highcut=highcut, method="butterworth", - order=order + signal_corrupted, sampling_rate=sampling_rate, lowcut=lowcut, highcut=highcut, method="butterworth", order=order ) freqs = [highcut, lowcut] @@ -186,28 +162,31 @@ def test_signal_filter(): assert np.allclose(signal_bandstop, signal_bandstop_scipy, atol=0.2) + def test_signal_filter_with_missing(): sampling_rate = 100 duration_not_missing = 10 frequency = 2 signal = np.concatenate( - [ - nk.signal_simulate(duration=duration_not_missing, sampling_rate=sampling_rate, frequency=frequency, random_state=42), - [np.nan] * 1000, - nk.signal_simulate(duration=duration_not_missing, sampling_rate=sampling_rate, frequency=frequency, random_state=43), - ] + [ + nk.signal_simulate( + duration=duration_not_missing, sampling_rate=sampling_rate, frequency=frequency, random_state=42 + ), + [np.nan] * 1000, + nk.signal_simulate( + duration=duration_not_missing, sampling_rate=sampling_rate, frequency=frequency, random_state=43 + ), + ] ) samples = np.arange(len(signal)) powerline = np.sin(2 * np.pi * 50 * (samples / sampling_rate)) signal_corrupted = signal + powerline - signal_clean = nk.signal_filter( - signal_corrupted, sampling_rate=sampling_rate, method="powerline" - ) + signal_clean = nk.signal_filter(signal_corrupted, sampling_rate=sampling_rate, method="powerline") assert signal_clean.size == signal.size assert np.allclose(signal_clean, signal, atol=0.2, equal_nan=True) -def test_signal_interpolate(): +def test_signal_interpolate(): x_axis = np.linspace(start=10, stop=30, num=10) signal = np.cos(x_axis) @@ -222,7 +201,7 @@ def test_signal_flatintervals(): one_minute = 60 tolerance = 60 - ecg = nk.ecg_simulate(duration=10*one_minute, sampling_rate=sampling_rate) + ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) flatline_2 = np.zeros(10 * one_minute * sampling_rate) signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) @@ -240,19 +219,15 @@ def test_signal_flatintervals(): def test_signal_findpeaks(): - signal1 = np.cos(np.linspace(start=0, stop=30, num=1000)) info1 = nk.signal_findpeaks(signal1) - signal2 = np.concatenate( - [np.arange(0, 20, 0.1), np.arange(17, 30, 0.1), np.arange(30, 10, -0.1)] - ) + signal2 = np.concatenate([np.arange(0, 20, 0.1), np.arange(17, 30, 0.1), np.arange(30, 10, -0.1)]) info2 = nk.signal_findpeaks(signal2) assert len(info1["Peaks"]) > len(info2["Peaks"]) def test_signal_merge(): - signal1 = np.cos(np.linspace(start=0, stop=10, num=100)) signal2 = np.cos(np.linspace(start=0, stop=20, num=100)) @@ -262,7 +237,6 @@ def test_signal_merge(): def test_signal_rate(): # since singal_rate wraps signal_period, the latter is tested as well - # Test with array. duration = 10 sampling_rate = 1000 @@ -286,15 +260,11 @@ def test_signal_rate(): # since singal_rate wraps signal_period, the latter is ) rsp_cleaned = nk.rsp_clean(rsp, sampling_rate=sampling_rate) signals, info = nk.rsp_peaks(rsp_cleaned) - rate = nk.signal_rate( - signals, sampling_rate=sampling_rate, desired_length=duration * sampling_rate - ) + rate = nk.signal_rate(signals, sampling_rate=sampling_rate, desired_length=duration * sampling_rate) assert rate.shape == (signals.shape[0],) # Test with dictionary.produced from rsp_findpeaks. - rate = nk.signal_rate( - info, sampling_rate=sampling_rate, desired_length=duration * sampling_rate - ) + rate = nk.signal_rate(info, sampling_rate=sampling_rate, desired_length=duration * sampling_rate) assert rate.shape == (duration * sampling_rate,) @@ -305,7 +275,6 @@ def test_signal_period(): def test_signal_plot(): - # Test with array signal = nk.signal_simulate(duration=10, sampling_rate=1000) nk.signal_plot(signal, sampling_rate=1000) @@ -350,7 +319,6 @@ def test_signal_plot(): def test_signal_power(): - signal1 = nk.signal_simulate(duration=20, frequency=1, sampling_rate=500) pwr1 = nk.signal_power(signal1, [[0.9, 1.6], [1.4, 2.0]], sampling_rate=500) @@ -361,10 +329,7 @@ def test_signal_power(): def test_signal_timefrequency(): - - signal = nk.signal_simulate(duration=50, frequency=5) + 2 * nk.signal_simulate( - duration=50, frequency=20 - ) + signal = nk.signal_simulate(duration=50, frequency=5) + 2 * nk.signal_simulate(duration=50, frequency=20) # short-time fourier transform frequency, time, stft = nk.signal_timefrequency( @@ -378,9 +343,7 @@ def test_signal_timefrequency(): assert np.sum(stft[indices_freq5]) < np.sum(stft[indices_freq20]) # wavelet transform - frequency, time, cwtm = nk.signal_timefrequency( - signal, method="cwt", max_frequency=50, show=False - ) + frequency, time, cwtm = nk.signal_timefrequency(signal, method="cwt", max_frequency=50, show=False) assert len(frequency) == cwtm.shape[0] assert len(time) == cwtm.shape[1] @@ -389,9 +352,7 @@ def test_signal_timefrequency(): assert np.sum(cwtm[indices_freq5]) < np.sum(cwtm[indices_freq20]) # wvd - frequency, time, wvd = nk.signal_timefrequency( - signal, method="wvd", max_frequency=50, show=False - ) + frequency, time, wvd = nk.signal_timefrequency(signal, method="wvd", max_frequency=50, show=False) assert len(frequency) == wvd.shape[0] assert len(time) == wvd.shape[1] indices_freq5 = np.logical_and(frequency > 3, frequency < 7) @@ -399,9 +360,7 @@ def test_signal_timefrequency(): assert np.sum(wvd[indices_freq5]) < np.sum(wvd[indices_freq20]) # pwvd - frequency, time, pwvd = nk.signal_timefrequency( - signal, method="pwvd", max_frequency=50, show=False - ) + frequency, time, pwvd = nk.signal_timefrequency(signal, method="pwvd", max_frequency=50, show=False) assert len(frequency) == pwvd.shape[0] assert len(time) == pwvd.shape[1] indices_freq5 = np.logical_and(frequency > 3, frequency < 7) @@ -422,15 +381,11 @@ def test_signal_distort(): signal = nk.signal_simulate(duration=10, frequency=0.5, sampling_rate=10) # Warning for nyquist criterion - with pytest.warns( - nk.misc.NeuroKitWarning, match=r"Skipping requested noise frequency.*cannot be resolved.*" - ): + with pytest.warns(nk.misc.NeuroKitWarning, match=r"Skipping requested noise frequency.*cannot be resolved.*"): nk.signal_distort(signal, sampling_rate=10, noise_amplitude=1, silent=False) # Warning for period duration - with pytest.warns( - nk.misc.NeuroKitWarning, match=r"Skipping requested noise frequency.*since its period of.*" - ): + with pytest.warns(nk.misc.NeuroKitWarning, match=r"Skipping requested noise frequency.*since its period of.*"): signal = nk.signal_simulate(duration=1, frequency=1, sampling_rate=10) nk.signal_distort(signal, noise_amplitude=1, noise_frequency=0.1, silent=False) @@ -451,13 +406,8 @@ def test_signal_surrogate(): assert np.allclose(np.mean(x), np.mean(surrogate)) assert np.allclose(np.var(x), np.var(surrogate)) # Check distribution - assert np.allclose( - np.histogram(x, 10, (0, 1))[0], - np.histogram(surrogate, 10, (0, 1))[0], - atol=1 - ) + assert np.allclose(np.histogram(x, 10, (0, 1))[0], np.histogram(surrogate, 10, (0, 1))[0], atol=1) # Check spectrum assert ( - np.mean(np.abs(np.abs(np.fft.rfft(surrogate - np.mean(surrogate))) - - np.abs(np.fft.rfft(x - np.mean(x))))) < 0.1 + np.mean(np.abs(np.abs(np.fft.rfft(surrogate - np.mean(surrogate))) - np.abs(np.fft.rfft(x - np.mean(x))))) < 0.1 ) From 563f98ca100b005e470e81a7da1ad6a5dc1009e7 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Thu, 14 Dec 2023 14:32:36 +0100 Subject: [PATCH 3/8] Add example to docs --- neurokit2/signal/signal_flatintervals.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index 5e392fff8e..7710caee9b 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -24,9 +24,23 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): ------- list Returns a list of tuples: - flatline_starts: Indices where a flatline part starts. - flatline_ends: Indices where a flatline part ends. + [(flatline_starts1, flatline_ends1), (flatline_starts2, flatline_ends2), ...] + flatline_starts: Index where a flatline part starts. + flatline_ends: Index where a flatline part ends. + Examples + -------- + .. ipython:: python + + import neurokit2 as nk + + ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) + flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) + flatline_2 = np.zeros(10 * one_minute * sampling_rate) + signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) + + nk.signal_flatintervals(signal) + """ # Identify flanks: +1 for beginning plateau; -1 for ending plateau. From ed3ab1442fa8386c546b1d025e67799290d7f489 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Thu, 14 Dec 2023 14:42:31 +0100 Subject: [PATCH 4/8] fix example in docs for signal_flatintervals --- neurokit2/signal/signal_flatintervals.py | 1 + 1 file changed, 1 insertion(+) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index 7710caee9b..0382682866 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -33,6 +33,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): .. ipython:: python import neurokit2 as nk + import numpy as np ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) From 18eadf3b949d27edf10470de118dfec3476bdc17 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Wed, 20 Dec 2023 14:57:44 +0100 Subject: [PATCH 5/8] rename parameter tolerance to duration_min --- neurokit2/signal/signal_flatintervals.py | 16 ++++++++-------- tests/tests_signal.py | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index 0382682866..cbedfe740c 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -2,7 +2,7 @@ import numpy as np -def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): +def signal_flatintervals(signal, sampling_rate, threshold=0.01, duration_min=60): """Finds flatline areas in a signal. Parameters @@ -15,7 +15,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): Flatline threshold relative to the biggest change in the signal. This is the percentage of the maximum value of absolute consecutive differences. Default: 0.01 (= 1% of the biggest change in the signal) - tolerance : int, optional + duration_min : int, optional Determines how fine-grained the resulting signal is, i.e. how long (in seconds) can a flatline part be without being recognised as such. Default: 60 (seconds) @@ -45,13 +45,13 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): """ # Identify flanks: +1 for beginning plateau; -1 for ending plateau. - flanks = np.diff(_find_flatlines(signal, sampling_rate, threshold, tolerance).astype(int)) + flanks = np.diff(_find_flatlines(signal, sampling_rate, threshold, duration_min).astype(int)) flatline_starts = np.flatnonzero(flanks > 0) flatline_ends = np.flatnonzero(flanks < 0) # Correct offsets from moving average - flatline_starts = flatline_starts + sampling_rate * tolerance + flatline_starts = flatline_starts + sampling_rate * duration_min # Insert start marker at signal start if a start marker is missing if len(flatline_starts) < len(flatline_ends): @@ -65,7 +65,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, tolerance=60): return [(start, end) for start, end in zip(flatline_starts, flatline_ends) if start < end] -def _find_flatlines(signal, sampling_rate, threshold=0.01, tolerance=60): +def _find_flatlines(signal, sampling_rate, threshold=0.01, duration_min=60): """Finds flatline areas in a signal. Parameters @@ -78,7 +78,7 @@ def _find_flatlines(signal, sampling_rate, threshold=0.01, tolerance=60): Flatline threshold relative to the biggest change in the signal. This is the percentage of the maximum value of absolute consecutive differences. Default: 0.01 (= 1% of the biggest change in the signal) - tolerance : int, optional + duration_min : int, optional Determines how fine-grained the resulting signal is, i.e. how long (in seconds) can a flatline part be without being recognised as such. Default: 60 (seconds) @@ -88,13 +88,13 @@ def _find_flatlines(signal, sampling_rate, threshold=0.01, tolerance=60): np.array Returns a signal that is True/1 where there is a sufficiently long flatline part in the signal and False/0 otherwise. - Note: Returned signal is shorter than the original signal by (sampling_rate * tolerance) - 1. + Note: Returned signal is shorter than the original signal by (sampling_rate * duration_min) - 1. """ abs_diff = np.abs(np.diff(signal)) threshold = threshold * np.max(abs_diff) - return _moving_average(abs_diff >= threshold, sampling_rate * tolerance) < threshold + return _moving_average(abs_diff >= threshold, sampling_rate * duration_min) < threshold def _moving_average(signal, window_size): diff --git a/tests/tests_signal.py b/tests/tests_signal.py index 73021acacf..ca2a2ad325 100644 --- a/tests/tests_signal.py +++ b/tests/tests_signal.py @@ -199,14 +199,14 @@ def test_signal_interpolate(): def test_signal_flatintervals(): sampling_rate = 128 one_minute = 60 - tolerance = 60 + duration_min = 60 ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) flatline_2 = np.zeros(10 * one_minute * sampling_rate) signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) - flatintervals = np.array(nk.signal_flatintervals(signal, sampling_rate, tolerance=tolerance)) / sampling_rate + flatintervals = np.array(nk.signal_flatintervals(signal, sampling_rate, duration_min=duration_min)) / sampling_rate assert len(flatintervals) == 3 ground_truth = np.array([(10, 20), (30, 40), (50, 60)]) * one_minute @@ -214,8 +214,8 @@ def test_signal_flatintervals(): for interval, truth in zip(flatintervals, ground_truth): interval_begin, interval_end = interval true_begin, true_end = truth - assert interval_begin > true_begin and interval_begin < true_begin + tolerance - assert interval_end > true_end - tolerance and interval_end < true_end + assert interval_begin > true_begin and interval_begin < true_begin + duration_min + assert interval_end > true_end - duration_min and interval_end < true_end def test_signal_findpeaks(): From c9f58f88f93ef38a45f31073dd18296143b3bd49 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Wed, 20 Dec 2023 15:18:25 +0100 Subject: [PATCH 6/8] adjusts tests_signal.py to increase test coverage (and code checks/formatting) --- neurokit2/signal/signal_flatintervals.py | 2 +- tests/tests_signal.py | 33 +++++++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index cbedfe740c..aa73ad11bb 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -41,7 +41,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, duration_min=60) signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) nk.signal_flatintervals(signal) - + """ # Identify flanks: +1 for beginning plateau; -1 for ending plateau. diff --git a/tests/tests_signal.py b/tests/tests_signal.py index ca2a2ad325..9f92448a2c 100644 --- a/tests/tests_signal.py +++ b/tests/tests_signal.py @@ -197,24 +197,45 @@ def test_signal_interpolate(): def test_signal_flatintervals(): + # parameters sampling_rate = 128 one_minute = 60 duration_min = 60 + # signal components ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) flatline_2 = np.zeros(10 * one_minute * sampling_rate) - signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) - flatintervals = np.array(nk.signal_flatintervals(signal, sampling_rate, duration_min=duration_min)) / sampling_rate + # test signals + signal1 = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) + signal2 = np.concatenate([flatline_1, ecg, flatline_2, ecg, flatline_1, ecg]) + ground_truth1 = np.array([(10, 20), (30, 40), (50, 60)]) * one_minute + ground_truth2 = np.array([(0, 10), (20, 30), (40, 50)]) * one_minute - assert len(flatintervals) == 3 - ground_truth = np.array([(10, 20), (30, 40), (50, 60)]) * one_minute + # flatline interval detection + flatintervals1 = ( + np.array(nk.signal_flatintervals(signal1, sampling_rate, duration_min=duration_min)) / sampling_rate + ) + flatintervals2 = ( + np.array(nk.signal_flatintervals(signal2, sampling_rate, duration_min=duration_min)) / sampling_rate + ) + + # check correct number of flatline intervals + assert len(flatintervals1) == 3 + assert len(flatintervals2) == 3 + + # check correct detection of flatline intervals + for interval, truth in zip(flatintervals1, ground_truth1): + interval_begin, interval_end = interval + true_begin, true_end = truth + assert interval_begin >= true_begin and interval_begin < true_begin + duration_min + assert interval_end > true_end - duration_min and interval_end < true_end - for interval, truth in zip(flatintervals, ground_truth): + for interval, truth in zip(flatintervals2, ground_truth2): interval_begin, interval_end = interval true_begin, true_end = truth - assert interval_begin > true_begin and interval_begin < true_begin + duration_min + assert interval_begin >= true_begin and interval_begin < true_begin + duration_min assert interval_end > true_end - duration_min and interval_end < true_end From 59d0c830123da64445c5e1553e298a0b37d566fe Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Wed, 20 Dec 2023 15:39:14 +0100 Subject: [PATCH 7/8] fix error in docs example --- neurokit2/signal/signal_flatintervals.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index aa73ad11bb..4f3177b3da 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -35,6 +35,8 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, duration_min=60) import neurokit2 as nk import numpy as np + one_minute = 60 # seconds + ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0) flatline_2 = np.zeros(10 * one_minute * sampling_rate) From 607d482a1cf9a97d3e9095de1d81b78db3a5ec38 Mon Sep 17 00:00:00 2001 From: Jonas Chromik Date: Thu, 21 Dec 2023 09:33:35 +0100 Subject: [PATCH 8/8] another attempt at fixing doc test --- neurokit2/signal/signal_flatintervals.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/neurokit2/signal/signal_flatintervals.py b/neurokit2/signal/signal_flatintervals.py index 4f3177b3da..b1cf44c461 100644 --- a/neurokit2/signal/signal_flatintervals.py +++ b/neurokit2/signal/signal_flatintervals.py @@ -35,6 +35,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, duration_min=60) import neurokit2 as nk import numpy as np + sampling_rate = 128 one_minute = 60 # seconds ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate) @@ -42,7 +43,7 @@ def signal_flatintervals(signal, sampling_rate, threshold=0.01, duration_min=60) flatline_2 = np.zeros(10 * one_minute * sampling_rate) signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1]) - nk.signal_flatintervals(signal) + nk.signal_flatintervals(signal, sampling_rate=sampling_rate) """