Skip to content

Commit

Permalink
wip: preparing new interfaces
Browse files Browse the repository at this point in the history
Here I will continue creating the diffusion QC Nipype interface and also
transfer the preprocessing that was in the QC module.

Resolves: nipreps#1215.
  • Loading branch information
oesteban committed Mar 22, 2024
1 parent 40886c2 commit 27d22f7
Showing 1 changed file with 312 additions and 8 deletions.
320 changes: 312 additions & 8 deletions mriqc/interfaces/diffusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
# https://www.nipreps.org/community/licensing/
#
"""Interfaces for manipulating DWI data."""
from __future__ import annotations

import nibabel as nb
import numpy as np
from dipy.reconst.dti import TensorFit
from nipype.interfaces.base import (
BaseInterfaceInputSpec as _BaseInterfaceInputSpec,
)
from nipype.interfaces.base import (
BaseInterfaceInputSpec,
File,
InputMultiObject,
OutputMultiObject,
SimpleInterface,
TraitedSpec,
isdefined,
traits,
)
from nipype.interfaces.base import (
BaseInterfaceInputSpec as _BaseInterfaceInputSpec,
)
from nipype.interfaces.base import (
TraitedSpec as _TraitedSpec,
)
Expand Down Expand Up @@ -428,6 +434,8 @@ class _DipyDTIInputSpec(_BaseInterfaceInputSpec):

class _DipyDTIOutputSpec(_TraitedSpec):
out_fa = File(exists=True, desc='output FA file')
out_fa_clean = File(exists=True, desc='output FA file after fixing degenerate vectors')
out_cfa = File(exists=True, desc='output color FA file')
out_md = File(exists=True, desc='output MD file')


Expand All @@ -440,6 +448,7 @@ class DipyDTI(SimpleInterface):
def _run_interface(self, runtime):
from dipy.core.gradients import gradient_table_from_bvals_bvecs
from dipy.reconst.dti import TensorModel
from dipy.reconst.dti import fractional_anisotropy, color_fa
from dipy.reconst.fwdti import FreeWaterTensorModel
from nipype.utils.filemanip import fname_presuffix

Expand Down Expand Up @@ -468,9 +477,8 @@ def _run_interface(self, runtime):
)

# Extract the FA
fa_data = np.array(fwdtifit.fa, dtype='float32')
fa_data[np.isnan(fa_data)] = 0
fa_data = np.clip(fa_data, 0, 1)
fa_data = fractional_anisotropy(fwdtifit.evals)
fa_data[np.isnan(fa_data)] = -1000 # Arbitrary value to mark up NaNs

fa_nii = nb.Nifti1Image(
fa_data,
Expand All @@ -480,8 +488,6 @@ def _run_interface(self, runtime):

fa_nii.header.set_xyzt_units('mm')
fa_nii.header.set_intent('estimate', name='Fractional Anisotropy (FA)')
fa_nii.header['cal_max'] = 1.0
fa_nii.header['cal_min'] = 0.0

self._results['out_fa'] = fname_presuffix(
self.inputs.in_file,
Expand All @@ -491,6 +497,49 @@ def _run_interface(self, runtime):

fa_nii.to_filename(self._results['out_fa'])

# Clamp the FA to remove degenerate tensors and round for stability
fa_clean = np.clip(np.round(fa_data, 3), 0, 1)

fa_clean_nii = nb.Nifti1Image(
fa_clean,
img.affine,
None,
)

fa_clean_nii.header.set_xyzt_units('mm')
fa_clean_nii.header.set_intent(
'estimate',
name='Fractional Anisotropy (FA) after removing degenerate vectors')
fa_clean_nii.header['cal_max'] = 1.0
fa_clean_nii.header['cal_min'] = 0.0

self._results['out_fa_clean'] = fname_presuffix(
self.inputs.in_file,
suffix='desc-clean_fa',
newpath=runtime.cwd,
)
fa_clean_nii.to_filename(self._results['out_fa_clean'])

# Extract the color FA
cfa_data = color_fa(fa_clean, fwdtifit.evecs)
cfa_nii = nb.Nifti1Image(
cfa_data,
img.affine,
None,
)

cfa_nii.header.set_xyzt_units('mm')
cfa_nii.header.set_intent('estimate', name='Fractional Anisotropy (FA)')
cfa_nii.header['cal_max'] = 1.0
cfa_nii.header['cal_min'] = 0.0

self._results['out_cfa'] = fname_presuffix(
self.inputs.in_file,
suffix='cfa',
newpath=runtime.cwd,
)
cfa_nii.to_filename(self._results['out_cfa'])

# Extract the AD
self._results['out_md'] = fname_presuffix(
self.inputs.in_file,
Expand All @@ -511,6 +560,110 @@ def _run_interface(self, runtime):
return runtime


class _DiffusionQCInputSpec(BaseInterfaceInputSpec):
in_b0 = File(exists=True, mandatory=True, desc='input b=0 average')
in_shells = InputMultiObject(
File(exists=True),
mandatory=True,
desc='DWI data after HMC and split by shells (indexed by in_bval)'
)
in_bvec = File(exists=True, mandatory=True, desc='input motion corrected file')
in_bval = traits.List(traits.Float, minlen=1, desc='b-values')
in_bvec = traits.List(
traits.Tuple(traits.Float, traits.Float, traits.Float),
minlen=1,
desc='b-vectors',
)
in_mask = File(exists=True, mandatory=True, desc='input probabilistic brain mask')
in_fa = File(exists=True, mandatory=True, desc='input FA map')
in_md = File(exists=True, mandatory=True, desc='input MD map')
direction = traits.Enum(
'all',
'x',
'y',
'-x',
'-y',
usedefault=True,
desc='direction for GSR computation',
)
in_fwhm = traits.List(traits.Float, mandatory=True, desc='smoothness estimated with AFNI')


class _DiffusionQCOutputSpec(TraitedSpec):
fber = traits.Float
efc = traits.Float
snr = traits.Float
gsr = traits.Dict
tsnr = traits.Float
fd = traits.Dict
fwhm = traits.Dict(desc='full width half-maximum measure')
size = traits.Dict
spacing = traits.Dict
summary = traits.Dict

out_qc = traits.Dict(desc='output flattened dictionary with all measures')


class DiffusionQC(SimpleInterface):
"""Computes :abbr:`QC (Quality Control)` measures on the input DWI EPI scan."""

input_spec = _DiffusionQCInputSpec
output_spec = _DiffusionQCOutputSpec

def _run_interface(self, runtime):
from mriqc.qc import anatomical as aqc
from mriqc.qc import diffusion as dqc

# Get the mean EPI data and get it ready
b0nii = nb.load(self.inputs.in_b0)
b0data = np.round(
np.nan_to_num(np.asanyarray(b0nii.dataobj)),
2,
)
b0data[b0data < 0] = 0

# Get the FA data and get it ready
fanii = nb.load(self.inputs.in_fa)
fadata = np.round(
np.nan_to_num(np.asanyarray(fanii.dataobj)),
2,
)

# Get EPI data (with hmc done) and get it ready
hmcnii = nb.load(self.inputs.in_hmc)
hmcdata = np.round(
np.nan_to_num(np.asanyarray(hmcnii.dataobj)),
2,
)
hmcdata[hmcdata < 0] = 0

# Get brain mask data
msknii = nb.load(self.inputs.in_mask)
mskdata = np.round( # Protect the thresholding with a rounding for stability
np.asanyarray(msknii.dataobj),
1,
) > 0
if np.sum(mskdata) < 100:
raise RuntimeError(
'Detected less than 100 voxels belonging to the brain mask. '
'MRIQC failed to process this dataset.'
)

# Summary stats
rois = {
'fg': mskdata,
'bg': 1.0 - mskdata,
}
stats = aqc.summary_stats(b0data, rois)
self._results['summary'] = stats

self._results['cc_snr'] = dqc.cc_snr(
fadata
)

return runtime


def _rms(estimator, X):
"""
Callable to pass to GridSearchCV that will calculate a distance score.
Expand Down Expand Up @@ -545,3 +698,154 @@ def _extract_b0(in_file, b0_ixs, out_path=None):

def _exp_func(t, A, K, C):
return A * np.exp(K * t) + C


def get_spike_mask(
data: np.ndarray,
z_threshold: float = 3.0,
grouping_vals: np.ndarray | None = None,
bmag: int | None = None,
) -> np.ndarray:
"""
Creates a binary mask classifying voxels in the data array as spike or non-spike.
This function identifies voxels with signal intensities exceeding a threshold based
on standard deviations above the mean. The threshold can be applied globally to
the entire data array, or it can be calculated for groups of voxels defined by
the `grouping_vals` parameter.
Parameters
----------
data : ndarray (float, 3D or 4D)
The data array to be thresholded.
z_threshold : float, optional (default=3.0)
The number of standard deviations to use above the mean as the threshold
multiplier.
grouping_vals : ndarray (int, same shape as data or 3D), optional
If provided, this array is used to group voxels for thresholding. Voxels
with the same value in `grouping_vals` are considered to belong to the same
group. The threshold will be calculated independently for each group.
- If `grouping_vals` has the same shape as `data` (4D), it is assumed to be
a mask where each voxel value indicates the group it belongs to.
- If `grouping_vals` has a 3D shape, it is assumed to represent b-values
corresponding to each voxel in the 4D `data` array. In this case, voxels
with the same b-value are grouped together.
bmag : int, optional
The order of magnitude for b-value rounding (used only if
`grouping_vals` is provided as b-values). Default: None (derived from max b-value).
Returns:
-------
ndarray (bool, same shape as data)
A binary mask where True values indicate voxels classified as spikes and
False values indicate non-spikes. The mask has the same shape as the input
data array.
"""
from dipy.core.gradients import round_bvals, unique_bvals_magnitude

if grouping_vals is None:
threshold = np.round((z_threshold * np.std(data)) + np.mean(data), 3)
spike_mask = np.round(data, 3) > threshold
return spike_mask

threshold_mask = np.zeros(data.shape)

rounded_grouping_vals = round_bvals(grouping_vals, bmag)
gvals = unique_bvals_magnitude(grouping_vals, bmag)

if grouping_vals.shape == data.shape:
for gval in gvals:
gval_data = data[rounded_grouping_vals == gval]
gval_threshold = ((z_threshold * np.std(gval_data))
+ np.mean(gval_data))
threshold_mask[rounded_grouping_vals == gval] = (
gval_threshold * np.ones(gval_data.shape))
else:
for gval in gvals:
gval_data = data[..., rounded_grouping_vals == gval]
gval_threshold = ((z_threshold * np.std(gval_data))
+ np.mean(gval_data))
threshold_mask[..., rounded_grouping_vals == gval] = (
gval_threshold * np.ones(gval_data.shape))

spike_mask = data > threshold_mask

return spike_mask


def noise_b0(data, gtab, mask=None):
"""
Estimate noise in raw dMRI based on b0 variance.
Parameters
----------
"""
if mask is None:
mask = np.ones(data.shape[:3], dtype=bool)
b0 = data[..., ~gtab.b0s_mask]
return np.percentile(np.var(b0[mask], -1), (25, 50, 75))


def segment_corpus_callosum(
in_cfa: np.ndarray,
mask: np.ndarray,
threshold: tuple[float, float, float, float, float, float] = (
0.6,
1,
0,
0.1,
0,
0.1,
),
) -> tuple[np.ndarray, np.ndarray]:
"""
Segments the corpus callosum (CC) from a color FA map.
Parameters
----------
in_cfa : :obj:`~numpy.ndarray`
The color FA (cFA) map.
mask : :obj:`~numpy.ndarray` (bool, 3D)
A brain mask used to define the initial bounding box.
threshold : :obj:`tuple`, optional
An iterable that defines the minimum and maximum values to use for
the thresholding of the cFA. Values are specified as
(R_min, R_max, G_min, G_max, B_min, B_max).
Returns
-------
cc_mask: :obj:`~numpy.ndarray`
The final binary mask of the segmented CC.
Notes
-----
This implementation was derived from
:obj:`dipy.segment.mask.segment_from_cfa`.
The CC mask is then cleaned-up for spurious off voxels with
:obj:`dipy.segment.mask.clean_cc_mask`
"""
from dipy.segment.mask import bounding_box, clean_cc_mask

# Prepare a bounding box of the CC
cc_box = np.zeros_like(mask, dtype=bool)
mins, maxs = bounding_box(mask) # mask needs to be volume
mins = np.array(mins)
maxs = np.array(maxs)
diff = (maxs - mins) // 4
bounds_min = mins + diff
bounds_max = maxs - diff
cc_box[
bounds_min[0]:bounds_max[0],
bounds_min[1]:bounds_max[1],
bounds_min[2]:bounds_max[2]
] = True

include = (
(in_cfa >= threshold[0::2])
& (in_cfa <= threshold[1::2])
& cc_box[..., None]
)
cc_mask = clean_cc_mask(np.all(include, axis=-1))
return cc_mask

0 comments on commit 27d22f7

Please sign in to comment.