Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: adapt code so it works with different versions of networkx #1137

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 15 additions & 22 deletions mriqc/engine/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ def run(self, graph, config, updatehash=False):
else:
if result:
if result["traceback"]:
notrun.append(
self._clean_queue(jobid, graph, result=result)
)
notrun.append(self._clean_queue(jobid, graph, result=result))
errors.append("".join(result["traceback"]))
else:
self._task_finished_cb(jobid)
Expand Down Expand Up @@ -289,12 +287,8 @@ def _submit_mapnode(self, jobid):
"lil",
)
self.depidx[-numnodes:, jobid] = 1
self.proc_done = np.concatenate(
(self.proc_done, np.zeros(numnodes, dtype=bool))
)
self.proc_pending = np.concatenate(
(self.proc_pending, np.zeros(numnodes, dtype=bool))
)
self.proc_done = np.concatenate((self.proc_done, np.zeros(numnodes, dtype=bool)))
self.proc_pending = np.concatenate((self.proc_pending, np.zeros(numnodes, dtype=bool)))
return False

def _local_hash_check(self, jobid, graph):
Expand All @@ -311,11 +305,7 @@ def _local_hash_check(self, jobid, graph):
overwrite = self.procs[jobid].overwrite
always_run = self.procs[jobid].interface.always_run

if (
cached
and updated
and (overwrite is False or overwrite is None and not always_run)
):
if cached and updated and (overwrite is False or overwrite is None and not always_run):
try:
self._task_finished_cb(jobid, cached=True)
self._remove_node_dirs()
Expand All @@ -339,18 +329,23 @@ def _task_finished_cb(self, jobid, cached=False):
rowview = self.depidx.getrowview(jobid)
rowview[rowview.nonzero()] = 0
if jobid not in self.mapnodesubids:
self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
try:
self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
except NotImplementedError:
self.refidx[self.refidx[:, [jobid]].nonzero()[0], jobid] = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this addressing? Is there an issue open for this failure condition?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I changed l. 351, MRIQC was running into a NotImplementedError because they did not implement the feature for 1D arrays. Sorry, I cannot find again the exact error message so I don't remember which software was issuing the error. But, the error message suggested this workaround.


def _generate_dependency_list(self, graph):
"""Generate a dependency list for a list of graphs."""
import numpy as np
import networkx as nx
from nipype.pipeline.engine.utils import topological_sort

try:
from networkx import to_scipy_sparse_array
except ImportError: # NetworkX < 2.7
from networkx import to_scipy_sparse_matrix as to_scipy_sparse_array

self.procs, _ = topological_sort(graph)
self.depidx = nx.to_scipy_sparse_matrix(
graph, nodelist=self.procs, format="lil"
)
self.depidx = to_scipy_sparse_array(graph, nodelist=self.procs, format="lil")
self.refidx = self.depidx.astype(int)
self.proc_done = np.zeros(len(self.procs), dtype=bool)
self.proc_pending = np.zeros(len(self.procs), dtype=bool)
Expand Down Expand Up @@ -505,9 +500,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
# Check to see if a job is available (jobs with all dependencies run)
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
# See also https://github.com/nipy/nipype/issues/2372
jobids = np.flatnonzero(
~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__()
)
jobids = np.flatnonzero(~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__())

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
Expand Down
Loading