Skip to content

Commit

Permalink
Fix process-node handling by process monitor thread
Browse files Browse the repository at this point in the history
To avoid SQLAlchemy session conflicts, the process node MUST be fetched on each request (not locally stored) to ensure any given thread will handle its OWN fetched process node.
  • Loading branch information
edan-bainglass committed Nov 2, 2024
1 parent 2457e8d commit 017229f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 24 deletions.
21 changes: 11 additions & 10 deletions src/aiidalab_qe/app/result/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _on_kill_button_click(self, _):

def _on_update_results_button_click(self, _):
self.node_view.node = None
self.node_view.node = self._model.process_node
self.node_view.node = self._model.get_process_node()

def _on_clean_scratch_button_click(self, _):
self._model.clean_remote_data()
Expand All @@ -164,9 +164,9 @@ def _update_kill_button_layout(self):
if not self.rendered:
return
if (
not self._model.process
or self._model.process_node.is_finished
or self._model.process_node.is_excepted
not (process_node := self._model.get_process_node())
or process_node.is_finished
or process_node.is_excepted
or self.state
in (
self.State.SUCCESS,
Expand All @@ -180,31 +180,32 @@ def _update_kill_button_layout(self):
def _update_clean_scratch_button_layout(self):
if not self.rendered:
return
if self._model.process and self._model.process_node.is_terminated:
process_node = self._model.get_process_node()
if process_node and process_node.is_terminated:
self.clean_scratch_button.layout.display = "block"
else:
self.clean_scratch_button.layout.display = "none"

def _update_state(self):
if not self._model.process:
if not (process_node := self._model.get_process_node()):
self.state = self.State.INIT
elif self._model.process_node.process_state in (
elif process_node.process_state in (
ProcessState.CREATED,
ProcessState.RUNNING,
ProcessState.WAITING,
):
self.state = self.State.ACTIVE
self._model.process_info = PROCESS_RUNNING
elif (
self._model.process_node.process_state
process_node.process_state
in (
ProcessState.EXCEPTED,
ProcessState.KILLED,
)
or self._model.process_node.is_failed
or process_node.is_failed
):
self.state = self.State.FAIL
self._model.process_info = PROCESS_EXCEPTED
elif self._model.process_node.is_finished_ok:
elif process_node.is_finished_ok:
self.state = self.State.SUCCESS
self._model.process_info = PROCESS_COMPLETED
28 changes: 14 additions & 14 deletions src/aiidalab_qe/app/result/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import traitlets as tl

from aiida import orm
from aiida.common.exceptions import NotExistent
from aiida.engine.processes import control


Expand All @@ -14,25 +15,27 @@ class ResultsModel(tl.HasTraits):
process_info = tl.Unicode("")
process_remote_folder_is_clean = tl.Bool(False)

_process_node: orm.ProcessNode | None = None

@property
def process_node(self):
if self._process_node is None:
self._process_node = self._get_process_node()
return self._process_node
return self.get_process_node()

def update(self):
self._update_process_remote_folder_state()

def get_process_node(self):
try:
return orm.load_node(self.process) if self.process else None
except NotExistent:
return None

def kill_process(self):
if process := self._get_process_node():
control.kill_processes([process])
if process_node := self.get_process_node():
control.kill_processes([process_node])

def clean_remote_data(self):
if self.process_node is None:
if not (process_node := self.get_process_node()):
return
for called_descendant in self.process_node.called_descendants:
for called_descendant in process_node.called_descendants:
if isinstance(called_descendant, orm.CalcJobNode):
with contextlib.suppress(Exception):
called_descendant.outputs.remote_folder._clean()
Expand All @@ -42,14 +45,11 @@ def reset(self):
self.process = None
self.process_info = ""

def _get_process_node(self):
return orm.load_node(self.process) if self.process else None

def _update_process_remote_folder_state(self):
if self.process_node is None:
if not (process_node := self.get_process_node()):
return
cleaned = []
for called_descendant in self.process_node.called_descendants:
for called_descendant in process_node.called_descendants:
if isinstance(called_descendant, orm.CalcJobNode):
with contextlib.suppress(Exception):
cleaned.append(called_descendant.outputs.remote_folder.is_empty)
Expand Down

0 comments on commit 017229f

Please sign in to comment.