Skip to content

Commit

Permalink
CalcJob: Fix bug causing exception after restarting daemon (#5886)
Browse files Browse the repository at this point in the history
In `v2.2.0`, a feature was added to attach monitors to a `CalcJob` for
which the `Waiting` (see `aiida.engine.processes.calcjob.tasks`) state
of the process was modified. The `_command` attribute was moved inside
the `data` argument passed to the constructor and was initialized only
in the constructor. However, when the process is loaded from a
serialized checkpoint, such as after a daemon restart, the constructor
is not called but `load_instance_state` is. In this case, the `_command`
attribute would not be set and when referenced it would cause an
exception.

The solution is to add the `_command` attribute to the `auto_persist`
decorator as this will be called on `load_instance_state` and will
reinitialize these attributes from the persisted checkpoint.

A similar problem was lurking with the `_monitors` attribute which was
also only initialized in the constructor. Since this has a custom data
type `CalcJobMonitors` which may not be serializable by `auto_persist`
a property `monitors` is added that will recreate the attribute and
populate it with the `CalcJobMonitors` instance with the monitors
specified in the inputs.
  • Loading branch information
sphuber authored Feb 10, 2023
1 parent 209143b commit 46ff10a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
21 changes: 16 additions & 5 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ async def do_kill():
return result


@plumpy.persistence.auto_persist('msg', 'data', '_monitor_result')
@plumpy.persistence.auto_persist('msg', 'data', '_command', '_monitor_result')
class Waiting(plumpy.process_states.Waiting):
"""The waiting state for the `CalcJob` process."""

Expand All @@ -451,15 +451,26 @@ def __init__(
self._monitor_result: CalcJobMonitorResult | None = None
self._monitors: CalcJobMonitors | None = None

if 'monitors' in self.process.node.inputs:
self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)

if isinstance(self.data, dict):
self._command = self.data['command']
self._monitor_result = self.data.get('monitor_result', None)
else:
self._command = self.data

@property
def monitors(self) -> CalcJobMonitors | None:
"""Return the collection of monitors if specified in the inputs.
:return: Instance of ``CalcJobMonitors`` containing monitors if specified in the process' input.
"""
if not hasattr(self, '_monitors'):
self._monitors = None

if self._monitors is None and 'monitors' in self.process.node.inputs:
self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)

return self._monitors

@property
def process(self) -> 'CalcJob':
"""
Expand Down Expand Up @@ -504,7 +515,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
process_status = f'Monitoring scheduler: job state {scheduler_state_string}'
node.set_process_status(process_status)
job_done = await self._launch_task(task_update_job, node, self.process.runner.job_manager)
monitor_result = await self._monitor_job(node, transport_queue, self._monitors)
monitor_result = await self._monitor_job(node, transport_queue, self.monitors)

if monitor_result and monitor_result.action is CalcJobMonitorAction.KILL:
await self._kill_job(node, transport_queue)
Expand Down
35 changes: 35 additions & 0 deletions tests/engine/processes/calcjobs/test_calc_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,41 @@ def test_monitor_result_action_disable_self(get_calcjob_builder, entry_points, c
assert len([record for record in caplog.records if 'Disable self.' in record.message]) == 1


def test_restart_after_daemon_reset(get_calcjob_builder, daemon_client, submit_and_await):
"""Test that a job can be restarted when it is launched and the daemon is restarted.
This is a regression test for https://github.com/aiidateam/aiida-core/issues/5882.
"""
import time

import plumpy

daemon_client.start_daemon()

# Launch a job with a one second sleep to ensure it doesn't finish before we get the chance to restart the daemon.
# A monitor is added to ensure that those are properly reinitialized in the ``Waiting`` state of the process.
builder = get_calcjob_builder()
builder.metadata.options.sleep = 1
builder.monitors = {'monitor': orm.Dict({'entry_point': 'core.always_kill', 'disabled': True})}
node = submit_and_await(builder, plumpy.ProcessState.WAITING)

daemon_client.restart_daemon(wait=True)

start_time = time.time()
timeout = 10

while node.process_state not in [plumpy.ProcessState.FINISHED, plumpy.ProcessState.EXCEPTED]:

if node.is_excepted:
raise AssertionError(f'The process excepted: {node.exception}')

if time.time() - start_time >= timeout:
raise AssertionError(f'process failed to terminate within timeout, current state: {node.process_state}')

assert node.is_finished, node.process_state
assert node.is_finished_ok, node.exit_status


class TestImport:
"""Test the functionality to import existing calculations completed outside of AiiDA."""

Expand Down

0 comments on commit 46ff10a

Please sign in to comment.