Merge pull request #1317 from nipreps/fix/me-metadadata-multiplicity
ENH: Crawl dataset's metadata only once and before Nipype's workflow
oesteban authored Aug 16, 2024
2 parents be53a7f + cf1ea8f commit 0cf1ae6
Showing 10 changed files with 500 additions and 218 deletions.
27 changes: 4 additions & 23 deletions mriqc/cli/parser.py
@@ -479,7 +479,6 @@ def _bids_filter(value):
 
 def parse_args(args=None, namespace=None):
     """Parse args and run further checks on the command line."""
-    from contextlib import suppress
     from json import loads
     from logging import DEBUG, FileHandler
     from pathlib import Path
@@ -490,6 +489,7 @@ def parse_args(args=None, namespace=None):
     from mriqc import __version__
     from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
     from mriqc.messages import PARTICIPANT_START
+    from mriqc.utils.misc import initialize_meta_and_data
 
     parser = _build_parser()
     opts = parser.parse_args(args, namespace)
@@ -554,10 +554,9 @@ def parse_args(args=None, namespace=None):
     if output_dir == bids_dir:
         parser.error(
             'The selected output folder is the same as the input BIDS folder. '
-            'Please modify the output path (suggestion: %s).'
-            % bids_dir
+            f'Please modify the output path (suggestion: {bids_dir}).'
             / 'derivatives'
-            / ('mriqc-%s' % version.split('+')[0])
+            / ('mriqc-{}'.format(version.split('+')[0]))
         )
 
     if bids_dir in work_dir.parents:
@@ -642,11 +641,7 @@ def parse_args(args=None, namespace=None):
             f'MRIQC is unable to process the following modalities: {", ".join(unknown_mods)}.'
         )
 
-    # Estimate the biggest file size / leave 1GB if some file does not exist (datalad)
-    with suppress(FileNotFoundError):
-        config.workflow.biggest_file_gb = _get_biggest_file_size_gb(
-            config.workflow.inputs.values()
-        )
+    initialize_meta_and_data()
 
     # set specifics for alternative populations
     if opts.species.lower() != 'human':
@@ -660,17 +655,3 @@ def parse_args(args=None, namespace=None):
             config.workflow.fd_radius = 7.5
         # block uploads for the moment; can be reversed before wider release
         config.execution.no_sub = True
-
-
-def _get_biggest_file_size_gb(files):
-    """Identify the largest file size (allows multi-echo groups)."""
-
-    import os
-
-    sizes = []
-    for file in files:
-        if isinstance(file, (list, tuple)):
-            sizes.append(_get_biggest_file_size_gb(file))
-        else:
-            sizes.append(os.path.getsize(file))
-    return max(sizes) / (1024**3)
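
Context for this change: instead of sizing files while building the workflow, the new initialize_meta_and_data() call crawls the dataset once, before Nipype starts, and fills config.workflow fields such as biggest_file_gb (now a per-modality dictionary; see mriqc/config.py below). A minimal sketch of what such a one-pass crawl could compute follows; this is hypothetical code, not MRIQC's actual implementation, and the helper name and argument layout are assumptions:

import os


def biggest_file_gb_per_modality(inputs_by_modality):
    """Return the largest input file size, in GB, for each modality."""
    biggest = {}
    for modality, files in inputs_by_modality.items():
        sizes = []
        for item in files:
            # Multi-echo inputs arrive as lists/tuples of per-echo files.
            group = item if isinstance(item, (list, tuple)) else [item]
            for path in group:
                try:
                    sizes.append(os.path.getsize(path))
                except FileNotFoundError:
                    sizes.append(0)  # e.g., a DataLad-annexed placeholder
        biggest[modality] = max(sizes, default=0) / (1024**3)
    return biggest
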
5 changes: 4 additions & 1 deletion mriqc/cli/run.py
@@ -266,7 +266,10 @@ def main(argv=None):
             )
         ),
     )
-    config.to_filename(config.execution.log_dir / f'config-{config.execution.run_uuid}.toml')
+    config.to_filename(
+        config.execution.log_dir / f'config-{config.execution.run_uuid}.toml',
+        store_inputs=False,  # Inputs are not necessary anymore
+    )
     sys.exit(exitcode)
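
The store_inputs flag is introduced by this PR in config.to_filename (see mriqc/config.py below). A hedged usage sketch of the two call modes; only the exit-time call appears in this diff, and the earlier call site is an assumption:

from mriqc import config

# Earlier in the run (assumed call site): the default also pickles the
# crawled inputs so a re-spawned process can reload them.
config.to_filename(store_inputs=True)

# At exit (this diff): the workflow has already consumed the inputs,
# so the dump skips the pickle.
config.to_filename(
    config.execution.log_dir / f'config-{config.execution.run_uuid}.toml',
    store_inputs=False,
)
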
46 changes: 43 additions & 3 deletions mriqc/config.py
@@ -91,6 +91,7 @@
 from __future__ import annotations
 
 import os
+import pickle
 import sys
 from contextlib import suppress
 from pathlib import Path
@@ -576,8 +577,8 @@ class workflow(_Config):
 
     analysis_level: list[str] = ['participant']
     """Level of analysis."""
-    biggest_file_gb: int = 1
-    """Size of largest file in GB."""
+    biggest_file_gb: dict[int] = 1
+    """Dictionary holding the size of largest file in GB (per modality)."""
     deoblique: bool = False
     """Deoblique the functional scans during head motion correction preprocessing."""
     despike: bool = False
@@ -590,6 +591,12 @@ class workflow(_Config):
     """Turn on FFT based spike detector (slow)."""
     inputs: list[str | os.PathLike] | None = None
     """List of files to be processed with MRIQC."""
+    inputs_entities: dict[list[dict]]
+    """List of entities corresponding to inputs."""
+    inputs_metadata: dict[list[dict | list[dict]]] | None = None
+    """List of metadata corresponding to inputs."""
+    inputs_path: Path | None = None
+    """Path to a pickle file with the input paths and metadata."""
     min_len_dwi: int = 7
     """
     Minimum DWI length to be considered a "processable" dataset
@@ -602,6 +609,21 @@ class workflow(_Config):
     template_id: str = 'MNI152NLin2009cAsym'
     """TemplateFlow ID of template used for the anatomical processing."""
 
+    _hidden: tuple[str, ...] = ('inputs', 'inputs_entities', 'inputs_metadata')
+
+    @classmethod
+    def init(cls) -> None:
+        if cls.inputs_path is None:
+            cls.inputs_path = execution.work_dir / f'inputs-{execution.run_uuid}.pkl'
+
+        if cls.inputs_path.exists():
+            with open(cls.inputs_path, 'rb') as handle:
+                _inputs = pickle.load(handle)
+
+            cls.inputs = _inputs['paths']
+            cls.inputs_metadata = _inputs['metadata']
+            cls.inputs_entities = _inputs['entities']
+
 
 class loggers:
     """Keep loggers easily accessible (see :py:func:`init`)."""
@@ -727,7 +749,10 @@ def dumps() -> str:
     return dumps(get())
 
 
-def to_filename(filename: str | os.PathLike | None = None) -> Path:
+def to_filename(
+    filename: str | os.PathLike | None = None,
+    store_inputs: bool = True,
+) -> Path:
     """Write settings to file."""
 
     if filename:
@@ -738,6 +763,21 @@ def to_filename(filename: str | os.PathLike | None = None) -> Path:
     settings.file_path.parent.mkdir(exist_ok=True, parents=True)
     settings.file_path.write_text(dumps())
     loggers.cli.debug(f'Saved MRIQC config file: {settings.file_path}.')
+
+    if store_inputs:
+        if workflow.inputs_path is None:
+            workflow.inputs_path = execution.work_dir / f'inputs-{execution.run_uuid}.pkl'
+
+        # Pickle inputs
+        with open(workflow.inputs_path, 'wb') as handle:
+            inputs_dict = {
+                'paths': workflow.inputs,
+                'metadata': workflow.inputs_metadata,
+                'entities': workflow.inputs_entities,
+            }
+            pickle.dump(inputs_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)
+
+        loggers.cli.debug(f'Saved MRIQC inputs file: {workflow.inputs_path}.')
     return settings.file_path
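
Together, to_filename(..., store_inputs=True) and workflow.init() form a pickle round-trip: the parent process dumps paths, metadata, and entities once, and any re-spawned process reloads them instead of crawling the dataset again. A self-contained sketch of the pattern, with illustrative values rather than MRIQC code:

import pickle
from pathlib import Path

inputs_path = Path('work') / 'inputs-example-uuid.pkl'
inputs_path.parent.mkdir(parents=True, exist_ok=True)

# Producer side (cf. to_filename): dump everything the workflow needs.
inputs_dict = {
    'paths': ['sub-01/func/sub-01_task-rest_bold.nii.gz'],
    'metadata': [{'RepetitionTime': 2.0}],
    'entities': [{'subject': '01', 'task': 'rest', 'suffix': 'bold'}],
}
with open(inputs_path, 'wb') as handle:
    pickle.dump(inputs_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Consumer side (cf. workflow.init): reload instead of re-crawling.
with open(inputs_path, 'rb') as handle:
    _inputs = pickle.load(handle)

assert _inputs['metadata'][0]['RepetitionTime'] == 2.0
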
10 changes: 7 additions & 3 deletions mriqc/interfaces/bids.py
@@ -42,15 +42,19 @@
 
 class IQMFileSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
     in_file = Str(mandatory=True, desc='path of input file')
-    subject_id = Str(mandatory=True, desc='the subject id')
     modality = Str(mandatory=True, desc='the qc type')
+    entities = traits.Dict(desc='entities corresponding to the input')
+    subject_id = Str(desc='the subject id')
     session_id = traits.Either(None, Str, usedefault=True)
     task_id = traits.Either(None, Str, usedefault=True)
     acq_id = traits.Either(None, Str, usedefault=True)
     rec_id = traits.Either(None, Str, usedefault=True)
     run_id = traits.Either(None, traits.Int, usedefault=True)
     dataset = Str(desc='dataset identifier')
-    dismiss_entities = traits.List(['part'], usedefault=True)
+    dismiss_entities = traits.List(
+        ['datatype', 'part', 'echo', 'extension', 'suffix'],
+        usedefault=True,
+    )
     metadata = traits.Dict()
     provenance = traits.Dict()
@@ -156,7 +160,7 @@ def _run_interface(self, runtime):
         )
 
         # Fill in the "bids_meta" key
-        id_dict = {}
+        id_dict = self.inputs.entities if isdefined(self.inputs.entities) else {}
        for comp in BIDS_COMP:
            comp_val = getattr(self.inputs, comp, None)
            if isdefined(comp_val) and comp_val is not None:
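
With this change, the crawled entities seed the identification dictionary, while any *_id inputs set explicitly on the interface still override them in the subsequent loop over BIDS_COMP. A minimal sketch of that precedence, in plain Python with hypothetical values:

# Entities crawled from the BIDS layout (hypothetical example).
entities = {'subject_id': '01', 'session_id': 'pre', 'task_id': 'rest'}

# Inputs set explicitly on the interface win over crawled entities.
explicit = {'session_id': 'post'}

id_dict = dict(entities)  # seeded as in IQMFileSink._run_interface
for comp, value in explicit.items():
    if value is not None:
        id_dict[comp] = value

print(id_dict)
# {'subject_id': '01', 'session_id': 'post', 'task_id': 'rest'}
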
58 changes: 43 additions & 15 deletions mriqc/interfaces/webapi.py
@@ -20,6 +20,8 @@
 #
 # https://www.nipreps.org/community/licensing/
 #
+import json
+
 from nipype.interfaces.base import (
     BaseInterfaceInputSpec,
     Bunch,
@@ -116,6 +118,11 @@ class UploadIQMsInputSpec(BaseInterfaceInputSpec):
     auth_token = Str(mandatory=True, desc='authentication token')
     email = Str(desc='set sender email')
     strict = traits.Bool(False, usedefault=True, desc='crash if upload was not successful')
+    modality = Str(
+        'undefined',
+        usedefault=True,
+        desc='override modality field if provided through metadata',
+    )
 
 
 class UploadIQMsOutputSpec(TraitedSpec):
@@ -138,11 +145,12 @@ def _run_interface(self, runtime):
 
         self._results['api_id'] = None
 
-        response = upload_qc_metrics(
+        response, payload = upload_qc_metrics(
             self.inputs.in_iqms,
             endpoint=self.inputs.endpoint,
             auth_token=self.inputs.auth_token,
             email=email,
+            modality=self.inputs.modality,
         )
 
         try:
@@ -151,21 +159,30 @@ def _run_interface(self, runtime):
             # response did not give us an ID
             errmsg = (
                 'QC metrics upload failed to create an ID for the record '
-                f'uplOADED. rEsponse from server follows: {response.text}'
+                f'uploaded. Response from server follows: {response.text}'
+                '\n\nPayload:\n'
+                f'{json.dumps(payload, indent=2)}'
             )
             config.loggers.interface.warning(errmsg)
 
         if response.status_code == 201:
             config.loggers.interface.info(messages.QC_UPLOAD_COMPLETE)
             return runtime
 
-        errmsg = 'QC metrics failed to upload. Status %d: %s' % (
-            response.status_code,
-            response.text,
+        errmsg = '\n'.join(
+            [
+                'Unsuccessful upload.',
+                f'Server response status {response.status_code}:',
+                response.text,
+                '',
+                '',
+                'Payload:',
+                json.dumps(payload, indent=2),
+            ]
         )
         config.loggers.interface.warning(errmsg)
         if self.inputs.strict:
-            raise RuntimeError(response.text)
+            raise RuntimeError(errmsg)
 
         return runtime
@@ -175,6 +192,7 @@ def upload_qc_metrics(
     endpoint=None,
     email=None,
     auth_token=None,
+    modality=None,
 ):
     """
     Upload qc metrics to remote repository.

# Extract metadata and provenance
meta = in_data.pop('bids_meta')

# For compatibility with WebAPI. Should be rolled back to int
if meta.get('run_id', None) is not None:
meta['run_id'] = '%d' % meta.get('run_id')

prov = in_data.pop('provenance')

# At this point, data should contain only IQMs
data = deepcopy(in_data)

# Check modality
modality = meta.get('modality', 'None')
modality = meta.get('modality', None) or meta.get('suffix', None) or modality
if modality not in ('T1w', 'bold', 'T2w'):
errmsg = (
'Submitting to MRIQCWebAPI: image modality should be "bold", "T1w", or "T2w", '
'(found "%s")' % modality
f'(found "{modality}")'
)
return Bunch(status_code=1, text=errmsg)

# Filter metadata values that aren't in whitelist
data['bids_meta'] = {k: meta[k] for k in META_WHITELIST if k in meta}

# Check for fields with appended _id
bids_meta_names = {k: k.replace('_id', '') for k in META_WHITELIST if k.endswith('_id')}
data['bids_meta'].update({k: meta[v] for k, v in bids_meta_names.items() if v in meta})

# For compatibility with WebAPI. Should be rolled back to int
if (run_id := data['bids_meta'].get('run_id', None)) is not None:
data['bids_meta']['run_id'] = f'{run_id}'

# One more chance for spelled-out BIDS entity acquisition
if (acq_id := meta.get('acquisition', None)) is not None:
data['bids_meta']['acq_id'] = acq_id

# Filter provenance values that aren't in whitelist
data['provenance'] = {k: prov[k] for k in PROV_WHITELIST if k in prov}

# Hash fields that may contain personal information
data['bids_meta'] = _hashfields(data['bids_meta'])

data['bids_meta']['modality'] = modality

if email:
data['provenance']['email'] = email

@@ -248,10 +276,10 @@ def upload_qc_metrics(
             timeout=15,
         )
     except requests.ConnectionError as err:
-        errmsg = 'QC metrics failed to upload due to connection error shown below:\n%s' % err
+        errmsg = f'QC metrics failed to upload due to connection error shown below:\n{err}'
         return Bunch(status_code=1, text=errmsg)
 
-    return response
+    return response, data
 
 
 def _hashfields(data):
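
upload_qc_metrics now resolves the modality through a fallback chain: explicit metadata first, then the BIDS suffix, then the new interface-level override. A quick, self-contained illustration of those or-chain semantics, with hypothetical values rather than MRIQC code:

def resolve_modality(meta, override=None):
    # Mirrors the chain in upload_qc_metrics: metadata wins, then the
    # BIDS suffix, then the caller-supplied override.
    return meta.get('modality', None) or meta.get('suffix', None) or override


assert resolve_modality({'modality': 'bold'}, 'T1w') == 'bold'
assert resolve_modality({'suffix': 'T2w'}, 'T1w') == 'T2w'
assert resolve_modality({}, 'undefined') == 'undefined'

Returning the payload alongside the response also lets the interface include the exact submitted JSON in its warnings, so failed uploads can be reproduced from the logs alone.
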
[Diffs for the remaining 5 changed files are not shown.]
