Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add signal_flatintervals function for flatline detection #940

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
2 changes: 2 additions & 0 deletions neurokit2/signal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .signal_filter import signal_filter
from .signal_findpeaks import signal_findpeaks
from .signal_fixpeaks import signal_fixpeaks
from .signal_flatintervals import signal_flatintervals
from .signal_flatline import signal_flatline
from .signal_formatpeaks import signal_formatpeaks
from .signal_interpolate import signal_interpolate
Expand Down Expand Up @@ -41,6 +42,7 @@
"signal_distort",
"signal_interpolate",
"signal_detrend",
"signal_flatintervals",
"signal_findpeaks",
"signal_fixpeaks",
"signal_formatpeaks",
Expand Down
121 changes: 121 additions & 0 deletions neurokit2/signal/signal_flatintervals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
import numpy as np


def signal_flatintervals(signal, sampling_rate, threshold=0.01, duration_min=60):
"""Finds flatline areas in a signal.

Parameters
----------
signal : Union[list, np.array]
The signal as a vector of values.
sampling_rate : int
The sampling rate of the signal, i.e. how many samples (values) per second.
threshold : float, optional
Flatline threshold relative to the biggest change in the signal.
This is the percentage of the maximum value of absolute consecutive differences.
Default: 0.01 (= 1% of the biggest change in the signal)
duration_min : int, optional
Determines how fine-grained the resulting signal is,
i.e. how long (in seconds) can a flatline part be without being recognised as such.
Default: 60 (seconds)

Returns
-------
list
Returns a list of tuples:
[(flatline_starts1, flatline_ends1), (flatline_starts2, flatline_ends2), ...]
flatline_starts: Index where a flatline part starts.
flatline_ends: Index where a flatline part ends.

Examples
--------
.. ipython:: python

import neurokit2 as nk
import numpy as np

sampling_rate = 128
one_minute = 60 # seconds

ecg = nk.ecg_simulate(duration=10 * one_minute, sampling_rate=sampling_rate)
flatline_1 = np.full(10 * one_minute * sampling_rate, -4.0)
flatline_2 = np.zeros(10 * one_minute * sampling_rate)
signal = np.concatenate([ecg, flatline_1, ecg, flatline_2, ecg, flatline_1])

nk.signal_flatintervals(signal, sampling_rate=sampling_rate)

"""

# Identify flanks: +1 for beginning plateau; -1 for ending plateau.
flanks = np.diff(_find_flatlines(signal, sampling_rate, threshold, duration_min).astype(int))

flatline_starts = np.flatnonzero(flanks > 0)
flatline_ends = np.flatnonzero(flanks < 0)

# Correct offsets from moving average
flatline_starts = flatline_starts + sampling_rate * duration_min

# Insert start marker at signal start if a start marker is missing
if len(flatline_starts) < len(flatline_ends):
flatline_starts = np.insert(flatline_starts, 0, 0)

# Insert end marker at signal end if an end marker is missing
if len(flatline_ends) < len(flatline_starts):
flatline_ends = np.append(flatline_ends, [len(signal) - 1])

# Return instances where start < end (start >= end might occur due to offset correction).
return [(start, end) for start, end in zip(flatline_starts, flatline_ends) if start < end]


def _find_flatlines(signal, sampling_rate, threshold=0.01, duration_min=60):
"""Finds flatline areas in a signal.

Parameters
----------
signal : Union[list, np.array]
The signal as a vector of values.
sampling_rate : int
The sampling rate of the signal, i.e. how many samples (values) per second.
threshold : float, optional
Flatline threshold relative to the biggest change in the signal.
This is the percentage of the maximum value of absolute consecutive differences.
Default: 0.01 (= 1% of the biggest change in the signal)
duration_min : int, optional
Determines how fine-grained the resulting signal is,
i.e. how long (in seconds) can a flatline part be without being recognised as such.
Default: 60 (seconds)

Returns
-------
np.array
Returns a signal that is True/1 where there is a sufficiently long
flatline part in the signal and False/0 otherwise.
Note: Returned signal is shorter than the original signal by (sampling_rate * duration_min) - 1.

"""
abs_diff = np.abs(np.diff(signal))
threshold = threshold * np.max(abs_diff)

return _moving_average(abs_diff >= threshold, sampling_rate * duration_min) < threshold


def _moving_average(signal, window_size):
"""Moving window average on a signal.

Parameters
----------
signal : Union[list, np.array]
The signal as a vector of values.
window_size : int
How many consequtive samples are used for averaging.

Returns
-------
np.array
Returns a signal of averages from the original signal.
Note: The returned signal is shorter than the original signal by window_size - 1.

"""

return np.convolve(signal, np.ones(window_size), "valid") / window_size
Loading
Loading