From 3c67b35787e72039edf032d1b1eb007a625412fa Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Thu, 21 Mar 2024 18:50:34 +0100 Subject: [PATCH] enh: deep revision of the new QC functions * Some "preprocessing" functions will be moved to previous steps in the pipeline (as compute nodes in the workflow). Therefore, they are moved into the interfaces module. * Clarified and re-worked docstrings * Added type annotations. * Added to documentation rendering --- docs/source/iqms/dwi.rst | 8 + mriqc/qc/__init__.py | 1 + mriqc/qc/diffusion.py | 388 ++++++++++++++++--------------- mriqc/qc/tests/test_diffusion.py | 33 +-- 4 files changed, 231 insertions(+), 199 deletions(-) create mode 100644 docs/source/iqms/dwi.rst diff --git a/docs/source/iqms/dwi.rst b/docs/source/iqms/dwi.rst new file mode 100644 index 000000000..640335b31 --- /dev/null +++ b/docs/source/iqms/dwi.rst @@ -0,0 +1,8 @@ +.. _iqms_dwi: + +IQMs for diffusion images +========================= +.. automodule:: mriqc.qc.diffusion + :members: + :undoc-members: + :show-inheritance: diff --git a/mriqc/qc/__init__.py b/mriqc/qc/__init__.py index e103e0888..7906704bd 100644 --- a/mriqc/qc/__init__.py +++ b/mriqc/qc/__init__.py @@ -57,6 +57,7 @@ iqms/t1w iqms/bold + iqms/dwi diff --git a/mriqc/qc/diffusion.py b/mriqc/qc/diffusion.py index d97954826..c787e6cd2 100644 --- a/mriqc/qc/diffusion.py +++ b/mriqc/qc/diffusion.py @@ -25,249 +25,269 @@ """ Image quality metrics for diffusion MRI data ============================================ -""" -import numpy as np -from dipy.core.gradients import GradientTable -from dipy.reconst.dti import TensorModel -from dipy.denoise.noise_estimate import piesno -from dipy.core.gradients import unique_bvals_magnitude -from dipy.core.gradients import round_bvals -from dipy.segment.mask import segment_from_cfa -from dipy.segment.mask import bounding_box +IQMs relating to spatial information +------------------------------------ +Definitions are given in the :ref:`summary of structural IQMs `. +.. _iqms_efc: -def noise_func(img, gtab): - pass +- **Entropy-focus criterion** (:py:func:`~mriqc.qc.anatomical.efc`). +.. _iqms_fber: -def noise_b0(data, gtab, mask=None): - """ - Estimate noise in raw dMRI based on b0 variance. +- **Foreground-Background energy ratio** (:py:func:`~mriqc.qc.anatomical.fber`, [Shehzad2015]_). - Parameters - ---------- - """ - if mask is None: - mask = np.ones(data.shape[:3], dtype=bool) - b0 = data[..., ~gtab.b0s_mask] - return np.percentile(np.var(b0[mask], -1), (25, 50, 75)) +.. _iqms_fwhm: +- **Full-width half maximum smoothness** (``fwhm_*``, see [Friedman2008]_). -def noise_piesno(data, n_channels=4): - """ - Estimate noise in raw dMRI data using the PIESNO [1]_ algorithm. +.. _iqms_snr: +- **Signal-to-noise ratio** (:py:func:`~mriqc.qc.anatomical.snr`). - Parameters - ---------- +.. _iqms_summary: - Returns - ------- +- **Summary statistics** (:py:func:`~mriqc.qc.anatomical.summary_stats`). +IQMs relating to diffusion weighting +------------------------------------ +IQMs specific to diffusion weighted imaging. - Notes - ----- +.. _iqms_piesno: - .. [1] Koay C.G., E. Ozarslan, C. Pierpaoli. Probabilistic Identification - and Estimation of Noise (PIESNO): A self-consistent approach and - its applications in MRI. JMR, 199(1):94-103, 2009. - """ - sigma, mask = piesno(data, N=n_channels, return_mask=True) - return sigma, mask +Noise in raw dMRI estimated with PIESNO (``piesno_sigma``) + Employs PIESNO (Probabilistic Identification and Estimation + of Noise) algorithm [1]_ to estimate the standard deviation (sigma) of the + noise in each voxel of a 4D dMRI data array. +.. _iqms_cc_snr: -def cc_snr(data, gtab, bmag=None, mask=None): - """ - Calculate worse-/best-case signal-to-noise ratio in the corpus callosum +SNR estimated in the Corpus Callosum (``cc_snr``) + Worst-case and best-case signal-to-noise ratio (SNR) within the corpus callosum. - Parameters - ---------- - data : ndarray +IQMs relating artifacts and other +--------------------------------- +IQMs targeting artifacts that are specific of DWI images. - gtab : GradientTable class instance or tuple +.. _iqms_spike_percentage: - bmag : int - From dipy.core.gradients: - The order of magnitude that the bvalues have to differ to be - considered an unique b-value. B-values are also rounded up to - this order of magnitude. Default: derive this value from the - maximal b-value provided: $bmag=log_{10}(max(bvals)) - 1$. +Global and slice-wise spike percentages (``spike_perc``) + Voxels classified as spikes. The spikes mask is calculated by identifing + voxels with signal intensities exceeding a threshold based on standard + deviations above the mean. - mask : numpy array - Boolean brain mask - """ - if isinstance(gtab, GradientTable): - pass +""" +from __future__ import annotations - if mask is None: - mask = np.ones(data.shape[:3]) +import numpy as np - tenmodel = TensorModel(gtab) - tensorfit = tenmodel.fit(data, mask=mask) - threshold = (0.6, 1, 0, 0.1, 0, 0.1) - CC_box = np.zeros_like(data[..., 0]) +def noise_b0( + in_b0: np.ndarray, + percentiles: tuple[float, float, float] = (25., 50., 75.), + mask: np.ndarray | None = None +) -> dict[str, float]: + """ + Estimates noise levels in raw dMRI data using the variance of the $b$=0 volumes. - mins, maxs = bounding_box(mask) # mask needs to be volume - mins = np.array(mins) - maxs = np.array(maxs) - diff = (maxs - mins) // 4 - bounds_min = mins + diff - bounds_max = maxs - diff + This function calculates the variance of the $b$=0 volumes in a 4D dMRI array + within a provided mask. It then computes the noise estimates at specified + percentiles of the variance distribution. This approach assumes that noise primarily + contributes to the lower end of the variance distribution. - CC_box[bounds_min[0]:bounds_max[0], - bounds_min[1]:bounds_max[1], - bounds_min[2]:bounds_max[2]] = 1 + Parameters: + ---------- + in_b0 : :obj:`~numpy.ndarray` + The 3D or 4D dMRI data array. If 4D, the first volume (assumed to be + the $b$=0 image) is used for noise estimation. + percentiles : :obj:`tuple`(float, float, float), optional (default=(25, 50, 75)) + A tuple of three integers specifying the percentiles of the variance + distribution to use for noise estimation. These percentiles represent + different noise levels within the data. + mask : :obj:`~numpy.ndarray`, optional (default=``None``) + A boolean mask used to restrict the noise estimation to specific brain regions. + If ``None``, a mask of ones with the same shape as the first 3 dimensions of + ``in_b0`` is used. + + Returns: + ------- + noise_estimates : :obj:`dict` + A dictionary containing the noise estimates at the specified percentiles: + * keys: :obj:`str` - Percentile values (e.g., '25', '50', '75'). + * values: :obj:`float` - Noise level estimates at the corresponding percentiles. + """ - mask_cc_part, cfa = segment_from_cfa(tensorfit, CC_box, threshold, - return_cfa=True) + if in_b0.ndim != 4: + return None - b0_data = data[..., gtab.b0s_mask] - std_signal = np.std(b0_data[mask_cc_part], axis=-1) + data = in_b0[ + np.ones(in_b0.shape[:3], dtype=bool) if mask is None else mask + ] + variance = np.var(data, -1) + noise_estimates = dict(zip( + (f'{p}' for p in percentiles), + np.percentile(variance, percentiles), + )) - # Per-shell calculation - rounded_bvals = round_bvals(gtab.bvals, bmag) - bvals = unique_bvals_magnitude(gtab.bvals, bmag) + return noise_estimates - cc_snr_best = np.zeros(gtab.bvals.shape) - cc_snr_worst = np.zeros(gtab.bvals.shape) - for ind, bval in enumerate(bvals): - if bval == 0: - mean_signal = np.mean(data[..., rounded_bvals == 0], axis=-1) - cc_snr_worst[ind] = np.mean(mean_signal / std_signal) - cc_snr_best[ind] = np.mean(mean_signal / std_signal) - continue +def noise_piesno(data: np.ndarray, n_channels: int = 4) -> (np.ndarray, np.ndarray): + """ + Estimates noise in raw diffusion MRI (dMRI) data using the PIESNO algorithm. - bval_data = data[..., rounded_bvals == bval] - bval_bvecs = gtab.bvecs[rounded_bvals == bval] + This function implements the PIESNO (Probabilistic Identification and Estimation + of Noise) algorithm [1]_ to estimate the standard deviation (sigma) of the + noise in each voxel of a 4D dMRI data array. The PIESNO algorithm assumes Rician + distributed signal and exploits the statistical properties of the noise to + separate it from the underlying signal. - axis_X = np.argmin(np.sum( - (bval_bvecs - np.array([1, 0, 0])) ** 2, axis=-1)) - axis_Y = np.argmin(np.sum( - (bval_bvecs - np.array([0, 1, 0])) ** 2, axis=-1)) - axis_Z = np.argmin(np.sum( - (bval_bvecs - np.array([0, 0, 1])) ** 2, axis=-1)) + Parameters + ---------- + data : :obj:`~numpy.ndarray` + The 4D raw dMRI data array. + n_channels : :obj:`int`, optional (default=4) + The number of diffusion-encoding channels in the data. This value is used + internally by the PIESNO algorithm. - data_X = bval_data[..., axis_X] - data_Y = bval_data[..., axis_Y] - data_Z = bval_data[..., axis_Z] + Returns + ------- + sigma : :obj:`~numpy.ndarray` + The estimated noise standard deviation for each voxel in the data array. + mask : :obj:`~numpy.ndarray` + A brain mask estimated by PIESNO. This mask identifies voxels containing + mostly noise and can be used for further processing. - mean_signal_X = np.mean(data_X[mask_cc_part]) - mean_signal_Y = np.mean(data_Y[mask_cc_part]) - mean_signal_Z = np.mean(data_Z[mask_cc_part]) + Notes + ----- - cc_snr_worst[ind] = np.mean(mean_signal_X / std_signal) - cc_snr_best[ind] = np.mean(np.mean(mean_signal_Y, - mean_signal_Z) / std_signal) + .. [1] Koay C.G., E. Ozarslan, C. Pierpaoli. Probabilistic Identification + and Estimation of Noise (PIESNO): A self-consistent approach and + its applications in MRI. JMR, 199(1):94-103, 2009. + """ + from dipy.denoise.noise_estimate import piesno - return cc_snr_worst, cc_snr_best + sigma, mask = piesno(data, N=n_channels, return_mask=True) + return sigma, mask -def get_spike_mask(data, z_threshold=3, grouping_vals=None, bmag=None): +def cc_snr( + in_b0: np.ndarray, + dwi_shells: list[np.ndarray], + cc_mask: np.ndarray, + b_values: np.ndarray, + b_vectors: np.ndarray, +) -> dict[int, (float, float)]: """ - Return binary mask of spike/no spike + Calculates the worst-case and best-case signal-to-noise ratio (SNR) within the corpus callosum. + + This function estimates the SNR in the corpus callosum (CC) by comparing the + mean signal intensity within the CC mask to the standard deviation of the background + signal (extracted from the b0 image). It performs separate calculations for + each diffusion-weighted imaging (DWI) shell. + + **Worst-case SNR:** The mean signal intensity along the diffusion direction with the + lowest signal is considered the worst-case scenario. + + **Best-case SNR:** The mean signal intensity averaged across the two diffusion + directions with the highest signal is considered the best-case scenario. Parameters ---------- - data : numpy array - Data to be thresholded - z_threshold : :obj:`float` - Number of standard deviations above the mean to use as spike threshold - grouping_vals : numpy array - Values by which to group data for thresholding (bvals or full mask) - bmag : int - From dipy.core.gradients: - The order of magnitude that the bvalues have to differ to be - considered an unique b-value. B-values are also rounded up to - this order of magnitude. Default: derive this value from the - maximal b-value provided: $bmag=log_{10}(max(bvals)) - 1$. + in_b0 : :obj:`~numpy.ndarray` (float, 3D) + T1-weighted or b0 image used for background signal estimation. + dwi_shells : list[:obj:`~numpy.ndarray` (float, 4D)] + List of DWI data for each diffusion shell. + cc_mask : :obj:`~numpy.ndarray` (bool, 3D) + Boolean mask of the corpus callosum. + b_values : :obj:`~numpy.ndarray` (int) + Array of b-values for each DWI volume in ``dwi_shells``. + b_vectors : :obj:`~numpy.ndarray` (float) + Array of diffusion-encoding vectors for each DWI volume in ``dwi_shells``. Returns - --------- - numpy array + ------- + cc_snr_estimates : :obj:`dict` + Dictionary containing SNR estimates for each b-value. Keys are the b-values + (integers), and values are tuples containing two elements: + * The first element is the worst-case SNR (float). + * The second element is the best-case SNR (float). """ - if grouping_vals is None: - threshold = (z_threshold * np.std(data)) + np.mean(data) - spike_mask = data > threshold - return spike_mask - - threshold_mask = np.zeros(data.shape) - - rounded_grouping_vals = round_bvals(grouping_vals, bmag) - gvals = unique_bvals_magnitude(grouping_vals, bmag) - if grouping_vals.shape == data.shape: - for gval in gvals: - gval_data = data[rounded_grouping_vals == gval] - gval_threshold = ((z_threshold * np.std(gval_data)) - + np.mean(gval_data)) - threshold_mask[rounded_grouping_vals == gval] = ( - gval_threshold * np.ones(gval_data.shape)) - else: - for gval in gvals: - gval_data = data[..., rounded_grouping_vals == gval] - gval_threshold = ((z_threshold * np.std(gval_data)) - + np.mean(gval_data)) - threshold_mask[..., rounded_grouping_vals == gval] = ( - gval_threshold * np.ones(gval_data.shape)) + std_signal = in_b0[cc_mask].std() - spike_mask = data > threshold_mask + cc_snr_estimates = {} - return spike_mask + # Shell-wise calculation + for bval, bvecs, shell_data in zip(b_values, b_vectors, dwi_shells): + if bval == 0: + cc_snr_estimates[int(bval)] = in_b0[cc_mask].mean() / std_signal + continue + # Find main directions of diffusion + axis_X = np.argmin(np.sum( + (bvecs - np.array([1, 0, 0])) ** 2, axis=-1)) + axis_Y = np.argmin(np.sum( + (bvecs - np.array([0, 1, 0])) ** 2, axis=-1)) + axis_Z = np.argmin(np.sum( + (bvecs - np.array([0, 0, 1])) ** 2, axis=-1)) -def get_slice_spike_percentage(data, z_threshold=3, slice_threshold=.05): - """ - Return percentage of slices spiking along each dimension + data_X = shell_data[..., axis_X] + data_Y = shell_data[..., axis_Y] + data_Z = shell_data[..., axis_Z] - Parameters - ---------- - data : numpy array - Data to be thresholded - z_threshold : :obj:`float` - Number of standard deviations above the mean to use as spike threshold. - slice_threshold : :obj:`float` - Percentage of slice elements that need to be above spike threshold - for slice to be considered spiking. + mean_signal_X = np.mean(data_X[cc_mask]) + mean_signal_Y = np.mean(data_Y[cc_mask]) + mean_signal_Z = np.mean(data_Z[cc_mask]) - Returns - --------- - array - """ - spike_mask = get_spike_mask(data, z_threshold) + cc_snr_estimates[int(bval)] = ( + np.mean(mean_signal_X / std_signal), # worst + np.mean(np.mean(mean_signal_Y, mean_signal_Z) / std_signal), # best + ) - ndim = data.ndim - slice_spike_percentage = np.zeros(ndim) + return cc_snr_estimates - for ii in range(ndim): - slice_spike_percentage[ii] = np.mean(np.mean(spike_mask, ii) - > slice_threshold) - return slice_spike_percentage +def spike_percentage( + data: np.ndarray, + spike_mask: np.ndarray, + slice_threshold: float = 0.05, +) -> dict[str, float | np.ndarray]: + """ + Calculates the percentage of voxels classified as spikes (global and slice-wise). + This function computes two metrics: -def get_global_spike_percentage(data, z_threshold=3): - """ - Return percentage of array elements spiking + * Global spike percentage: The average fraction of voxels exceeding the spike + threshold across the entire data array. + * Slice-wise spiking percentage: The fraction of slices along each dimension of + the data array where the average fraction of spiking voxels within the slice + exceeds a user-defined threshold (``slice_threshold``). - Parameters + Parameters: ---------- - data : numpy array - Data to be thresholded - z_threshold : :obj:`float` - Number of standard deviations above the mean to use as spike threshold - - Returns - --------- - float + data : :obj:`~numpy.ndarray` (float, 4D) + The data array used to generate the spike mask. + spike_mask : :obj:`~numpy.ndarray` (bool, same shape as data) + The binary mask indicating spike voxels (True) and non-spike voxels (False). + slice_threshold : :obj:`float`, optional (default=0.05) + The minimum fraction of voxels in a slice that must be classified as spikes + for the slice to be considered spiking. + + Returns: + ------- + :obj:`dict` + A dictionary containing the calculated spike percentages: + * 'spike_perc_global': :obj:`float` - Global percentage of spiking voxels. + * 'spike_perc_slice': :obj:`list` of :obj:`float` - List of slice-wise + spiking percentages for each dimension of the data array. """ - spike_mask = get_spike_mask(data, z_threshold) - global_spike_percentage = np.mean(np.ravel(spike_mask)) - - return global_spike_percentage + spike_perc_global = float(np.mean(np.ravel(spike_mask))) + spike_perc_slice = [ + float(np.mean(np.mean(spike_mask, axis=axis) > slice_threshold)) + for axis in range(data.ndim) + ] -def noise_func_for_shelled_data(shelled_data, gtab): - pass + return {'spike_perc_global': spike_perc_global, 'spike_perc_slice': spike_perc_slice} diff --git a/mriqc/qc/tests/test_diffusion.py b/mriqc/qc/tests/test_diffusion.py index 1cdd497ee..3058c9303 100644 --- a/mriqc/qc/tests/test_diffusion.py +++ b/mriqc/qc/tests/test_diffusion.py @@ -21,30 +21,33 @@ # https://www.nipreps.org/community/licensing/ # -import pytest -import numpy as np +import os.path as op + import nibabel as nib -from dipy.core.gradients import gradient_table +import numpy as np +import pytest +from dipy.core.gradients import gradient_table, round_bvals from dipy.data.fetcher import fetch_sherbrooke_3shell -from dipy.core.gradients import round_bvals -import os.path as op -from ..diffusion import (noise_func, - noise_func_for_shelled_data, - get_spike_mask, - get_slice_spike_percentage, - get_global_spike_percentage, - cc_snr) + +from ..diffusion import ( + cc_snr, + get_global_spike_percentage, + get_slice_spike_percentage, + get_spike_mask, + noise_func, + noise_func_for_shelled_data, +) -class DiffusionData(object): +class DiffusionData: def get_data(self): """ Generate test data """ _, path = fetch_sherbrooke_3shell() fnifti = op.join(path, 'HARDI193.nii.gz') - fnifti, bval, bvec = [op.join(path, f'HARDI193.{ext}') for - ext in ["nii.gz", "bval", "bvec"]] + fnifti, bval, bvec = (op.join(path, f'HARDI193.{ext}') for + ext in ['nii.gz', 'bval', 'bvec']) img = nib.load(fnifti) data = img.get_fdata() gtab = gradient_table(bval, bvec) @@ -62,7 +65,7 @@ def shelled_data(self): return out_data, gtab -@pytest.fixture +@pytest.fixture() def ddata(): return DiffusionData()