diff --git a/changes.d/6570.feat.md b/changes.d/6570.feat.md new file mode 100644 index 0000000000..1d3f13269c --- /dev/null +++ b/changes.d/6570.feat.md @@ -0,0 +1 @@ +Using `cylc set` without specifying `--out` on a task where success is optional now sets success pathway outputs instead of doing nothing. diff --git a/cylc/flow/id_match.py b/cylc/flow/id_match.py index de3fa21cb1..e128e9c5e3 100644 --- a/cylc/flow/id_match.py +++ b/cylc/flow/id_match.py @@ -98,7 +98,8 @@ def filter_ids( * If IDTokens.Cycle all CyclePoints with any matching tasks will be returned. warn: - Whether to log a warning if no matching tasks are found. + Whether to log a warning if no matching tasks are found in the + pool. TODO: Consider using wcmatch which would add support for diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 67d3c58ec8..468a1d3a7e 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2184,7 +2184,8 @@ class Meta: description = sstrip(""" Set task prerequisites or outputs. - By default, set all required outputs for target task(s). + By default, set all required outputs for target task(s) (including + `submitted`, `started` and `succeeded` even if they are optional). Setting prerequisites contributes to the task's readiness to run. diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 4e0789ed98..325f6da6a3 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -17,23 +17,31 @@ """ from logging import INFO from typing import ( - TYPE_CHECKING, Dict, List, Tuple) + TYPE_CHECKING, + Dict, + List, + Optional, + Set, + Tuple, +) from cylc.flow import LOG from cylc.flow.exceptions import WorkflowConfigError +from cylc.flow.run_modes import RunMode from cylc.flow.task_outputs import ( + TASK_OUTPUT_FAILED, + TASK_OUTPUT_STARTED, TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED, - TASK_OUTPUT_STARTED ) -from cylc.flow.run_modes import RunMode + if TYPE_CHECKING: - from cylc.flow.taskdef import TaskDef + from typing_extensions import Literal + from cylc.flow.task_job_mgr import TaskJobManager from cylc.flow.task_proxy import TaskProxy - from typing_extensions import Literal + from cylc.flow.taskdef import TaskDef def submit_task_job( @@ -79,13 +87,18 @@ def submit_task_job( } ) task_job_mgr.workflow_db_mgr.put_update_task_state(itask) - for output in process_outputs(itask, rtconfig): + for output in sorted( + process_outputs(itask, rtconfig), + key=itask.state.outputs.output_sort_key, + ): task_job_mgr.task_events_mgr.process_message(itask, INFO, output) return True -def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]: +def process_outputs( + itask: 'TaskProxy', rtconfig: Optional[dict] = None +) -> Set[str]: """Process Skip Mode Outputs: * By default, all required outputs will be generated plus succeeded @@ -96,13 +109,13 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]: succeeded or failed then succeeded will be produced. Return: - A list of outputs to emit. + A set of outputs to emit. """ # Always produce `submitted` & `started` outputs first: - result: List[str] = [TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED] + result: Set[str] = {TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED} - conf_outputs = list(rtconfig['skip']['outputs']) + conf_outputs = list(rtconfig['skip']['outputs']) if rtconfig else [] # Send the rest of our outputs, unless they are succeeded or failed, # which we hold back, to prevent warnings about pre-requisites being @@ -117,26 +130,22 @@ def process_outputs(itask: 'TaskProxy', rtconfig: Dict) -> List[str]: trigger = itask.state.outputs._message_to_trigger[message] # Send message unless it be succeeded/failed. if ( - trigger not in { - TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED, - TASK_OUTPUT_SUBMITTED, - TASK_OUTPUT_STARTED, - } + trigger not in {TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED} and (not conf_outputs or trigger in conf_outputs) ): - result.append(message) + result.add(message) # Add optional outputs specified in skip settings: - for message, trigger in itask.state.outputs._message_to_trigger.items(): - if trigger in conf_outputs and trigger not in result: - result.append(message) + result.update( + message + for message, trigger in itask.state.outputs._message_to_trigger.items() + if trigger in conf_outputs + ) - # Send succeeded/failed last. if TASK_OUTPUT_FAILED in conf_outputs: - result.append(TASK_OUTPUT_FAILED) - elif TASK_OUTPUT_SUCCEEDED not in result: - result.append(TASK_OUTPUT_SUCCEEDED) + result.add(TASK_OUTPUT_FAILED) + else: + result.add(TASK_OUTPUT_SUCCEEDED) return result diff --git a/cylc/flow/scripts/set.py b/cylc/flow/scripts/set.py index adc3cf8449..ec5b6e5828 100755 --- a/cylc/flow/scripts/set.py +++ b/cylc/flow/scripts/set.py @@ -20,7 +20,8 @@ Command to manually set task prerequisites and outputs in running workflows. -By default, it sets all required outputs (note "succeeded" may be optional). +By default, it sets all required outputs (including "submitted", "started" and +"succeeded" even if they are optional). Setting task prerequisites: - contributes to the task's readiness to run, and @@ -35,14 +36,15 @@ - contributes to a task's completion, and - spawns downstream tasks that depend on those outputs -Note setting final outputs (succeeded, failed, expired) also sets task state. -Setting the started and submitted outputs spawns downstream tasks that depend -on them but does not affect task state, because there is no running job. +Note setting final outputs ("succeeded", "failed", "expired") also sets task +state. Setting the "started" and "submitted" outputs spawns downstream tasks +that depend on them but does not affect task state, because there is no +running job. Implied outputs are set automatically: - - started implies submitted - - succeeded and failed imply started - - custom outputs and expired do not imply other outputs + - "started" implies "submitted" + - "succeeded" and "failed" imply "started" + - custom outputs and "expired" do not imply other outputs For custom outputs, use the output names not the associated task messages: [runtime] diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 4732f43d58..f827a57d64 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2010,14 +2010,25 @@ def _set_outputs_itask( itask: 'TaskProxy', outputs: Iterable[str], ) -> None: - """Set requested outputs on a task proxy and spawn children.""" + """Set requested outputs on a task proxy and spawn children. + + If no outputs were specified and the task has no required outputs to + set, set the "success pathway" outputs in the same way that skip mode + does. + """ + outputs = set(outputs) if not outputs: - outputs = itask.state.outputs.iter_required_messages() + outputs = set( + # Set required outputs by default + itask.state.outputs.iter_required_messages() + ) or ( + # Set success pathway outputs + get_skip_mode_outputs(itask) + ) else: # --out=skip is a shortcut to setting all the outputs that # skip mode would. - outputs = set(outputs) - skips = [] + skips: Set[str] = set() if RunMode.SKIP.value in outputs: # Check for broadcasts to task: outputs.remove(RunMode.SKIP.value) @@ -2386,7 +2397,8 @@ def filter_task_proxies( ids: ID strings. warn_no_active: - Whether to log a warning if no matching active tasks are found. + Whether to log a warning if no matching tasks are found in the + pool. inactive: If True, unmatched IDs will be checked against taskdefs and cycle, and any matches will be returned in the second diff --git a/pytest.ini b/pytest.ini index ae3ad912fe..6a359ad15c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -27,6 +27,7 @@ addopts = --verbose # disable pytest-tornasync because it conflicts with pytest-asyncio's auto mode -p no:tornado -m "not linkcheck" +verbosity_assertions = 2 testpaths = cylc/flow/ tests/unit/ diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index 29d34de7a7..9ad56c3a58 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -112,13 +112,13 @@ async def test_skip_mode_outputs( Skip mode proposal point 2 https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md """ - graph = """ + graph = r""" # By default, all required outputs will be generated # plus succeeded if success is optional: foo? & foo:required_out => success_if_optional & required_outs # The outputs submitted and started are always produced - # and do not need to be defined in outputs: + # and do not need to be defined in [runtime][X][skip]outputs: foo:submitted => submitted_always foo:started => started_always diff --git a/tests/integration/scripts/test_set.py b/tests/integration/scripts/test_set.py index 41fcb7e3bf..713774a67c 100644 --- a/tests/integration/scripts/test_set.py +++ b/tests/integration/scripts/test_set.py @@ -19,11 +19,36 @@ Note: see also functional tests """ +from secrets import token_hex + +from cylc.flow.commands import ( + run_cmd, + set_prereqs_and_outputs, +) from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.data_messages_pb2 import PbTaskProxy from cylc.flow.data_store_mgr import TASK_PROXIES +from cylc.flow.flow_mgr import FLOW_ALL from cylc.flow.scheduler import Scheduler -from cylc.flow.task_state import TASK_STATUS_SUCCEEDED, TASK_STATUS_WAITING +from cylc.flow.task_outputs import ( + TASK_OUTPUT_STARTED, + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_SUCCEEDED, +) +from cylc.flow.task_state import ( + TASK_STATUS_SUCCEEDED, + TASK_STATUS_WAITING, +) + + +def outputs_section(*names: str) -> dict: + """Create outputs section with random messages for the given output names. + """ + return { + 'outputs': { + name: token_hex() for name in names + } + } async def test_set_parentless_spawning( @@ -164,3 +189,77 @@ async def test_pre_all(flow, scheduler, run): schd.pool.set_prereqs_and_outputs(['1/z'], [], ['all'], ['all']) warn_or_higher = [i for i in log.records if i.levelno > 30] assert warn_or_higher == [] + + +async def test_no_outputs_given(flow, scheduler, start): + """Test `cylc set` without providing any outputs. + + It should set the "success pathway" outputs. + """ + schd: Scheduler = scheduler( + flow({ + 'scheduling': { + 'graph': { + 'R1': r""" + foo? => alpha + foo:submitted? => bravo + foo:started? => charlie + foo:x => xray + # Optional custom outputs not emitted: + foo:y? => yankee + # Non-success-pathway outputs not emitted: + foo:submit-failed? => delta + """, + }, + }, + 'runtime': { + 'foo': outputs_section('x', 'y'), + }, + }) + ) + async with start(schd): + foo = schd.pool.get_tasks()[0] + await run_cmd( + set_prereqs_and_outputs(schd, [foo.identity], [FLOW_ALL]) + ) + assert set(foo.state.outputs.get_completed_outputs()) == { + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_STARTED, + TASK_OUTPUT_SUCCEEDED, + 'x' + } + assert schd.pool.get_task_ids() == { + '1/alpha', + '1/bravo', + '1/charlie', + '1/xray', + } + + +async def test_completion_expr(flow, scheduler, start): + """Test `cylc set` without providing any outputs on a task that has a + custom completion expression.""" + conf = { + 'scheduling': { + 'graph': { + 'R1': 'foo? | foo:x? => bar' + }, + }, + 'runtime': { + 'foo': { + **outputs_section('x'), + 'completion': '(succeeded or x) or failed' + }, + }, + } + schd: Scheduler = scheduler(flow(conf)) + async with start(schd): + foo = schd.pool.get_tasks()[0] + await run_cmd( + set_prereqs_and_outputs(schd, [foo.identity], [FLOW_ALL]) + ) + assert set(foo.state.outputs.get_completed_outputs()) == { + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_STARTED, + TASK_OUTPUT_SUCCEEDED, + } diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index ffb255c0cf..04395cf20a 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -96,15 +96,6 @@ } -def pool_get_task_ids( - pool: List['TaskProxy'] -) -> List[str]: - """Return sorted list of IDs of tasks in a task pool.""" - return sorted( - [itask.identity for itask in pool.get_tasks()] - ) - - def get_task_ids( name_point_list: Iterable[Tuple[str, Union['PointBase', str, int]]] ) -> List[str]: @@ -1252,8 +1243,8 @@ async def test_set_failed_complete( ): """Test manual completion of an incomplete failed task.""" id_ = flow(one_conf) - schd = scheduler(id_) - async with start(schd, level=logging.DEBUG) as log: + schd: Scheduler = scheduler(id_) + async with start(schd, level=logging.DEBUG): one = schd.pool.get_tasks()[0] one.state_reset(is_queued=False) @@ -1266,7 +1257,7 @@ async def test_set_failed_complete( regex="failed.* did not complete the required outputs") # Set failed task complete via default "set" args. - schd.pool.set_prereqs_and_outputs([one.identity], None, None, ['all']) + schd.pool.set_prereqs_and_outputs([one.identity], [], [], ['all']) assert log_filter( contains=f'[{one}] removed from active task pool: completed') @@ -1308,49 +1299,45 @@ async def test_set_prereqs( } } ) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) - async with start(schd) as log: + async with start(schd): # it should start up with foo, bar, baz - assert ( - pool_get_task_ids(schd.pool) == [ - "20400101T0000Z/bar", - "20400101T0000Z/baz", - "20400101T0000Z/foo"] - ) + assert schd.pool.get_task_ids() == { + "20400101T0000Z/bar", + "20400101T0000Z/baz", + "20400101T0000Z/foo", + } # try to set an invalid prereq of qux schd.pool.set_prereqs_and_outputs( - ["20400101T0000Z/qux"], None, ["20400101T0000Z/foo:a"], ['all']) + ["20400101T0000Z/qux"], [], ["20400101T0000Z/foo:a"], ['all']) assert log_filter( contains='20400101T0000Z/qux does not depend on "20400101T0000Z/foo:a"' ) # it should not add 20400101T0000Z/qux to the pool - assert ( - pool_get_task_ids(schd.pool) == [ - "20400101T0000Z/bar", - "20400101T0000Z/baz", - "20400101T0000Z/foo"] - ) + assert schd.pool.get_task_ids() == { + "20400101T0000Z/bar", + "20400101T0000Z/baz", + "20400101T0000Z/foo", + } # set one prereq of inactive task 20400101T0000Z/qux schd.pool.set_prereqs_and_outputs( ["20400101T0000Z/qux"], - None, + [], ["20400101T0000Z/foo:succeeded"], ['all']) # it should add 20400101T0000Z/qux to the pool - assert ( - pool_get_task_ids(schd.pool) == [ - "20400101T0000Z/bar", - "20400101T0000Z/baz", - "20400101T0000Z/foo", - "20400101T0000Z/qux" - ] - ) + assert schd.pool.get_task_ids() == { + "20400101T0000Z/bar", + "20400101T0000Z/baz", + "20400101T0000Z/foo", + "20400101T0000Z/qux", + } # get the 20400101T0000Z/qux task proxy qux = schd.pool.get_task(ISO8601Point("20400101T0000Z"), "qux") @@ -1359,7 +1346,7 @@ async def test_set_prereqs( # set its other prereqs (test implicit "succeeded" and "succeed") # and truncated cycle point schd.pool.set_prereqs_and_outputs( - ["2040/qux"], None, ["2040/bar", "2040/baz:succeed"], ['all']) + ["2040/qux"], [], ["2040/bar", "2040/baz:succeed"], ['all']) # it should now be fully satisfied assert qux.state.prerequisites_all_satisfied() @@ -1381,14 +1368,14 @@ async def test_set_bad_prereqs( 'initial cycle point': '2040', 'graph': {'R1': "foo => bar"}}, }) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) def set_prereqs(prereqs): """Shorthand so only varible under test given as arg""" schd.pool.set_prereqs_and_outputs( - ["2040/bar"], None, prereqs, ['all']) + ["2040/bar"], [], prereqs, ['all']) - async with start(schd) as log: + async with start(schd): # Invalid: task name wildcard: set_prereqs(["2040/*"]) assert log_filter(contains='Invalid prerequisite task name') @@ -1428,12 +1415,12 @@ async def test_set_outputs_live( } } ) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) - async with start(schd) as log: + async with start(schd): # it should start up with just 1/foo - assert pool_get_task_ids(schd.pool) == ["1/foo"] + assert schd.pool.get_task_ids() == {"1/foo"} # fake failed foo = schd.pool.get_task(IntegerPoint("1"), "foo") @@ -1441,10 +1428,8 @@ async def test_set_outputs_live( schd.pool.task_events_mgr.process_message(foo, 1, 'failed') # set foo:x: it should spawn bar but not baz - schd.pool.set_prereqs_and_outputs(["1/foo"], ["x"], None, ['all']) - assert ( - pool_get_task_ids(schd.pool) == ["1/bar", "1/foo"] - ) + schd.pool.set_prereqs_and_outputs(["1/foo"], ["x"], [], ['all']) + assert schd.pool.get_task_ids() == {"1/bar", "1/foo"} # Foo should have been removed from the queue: assert '1/foo' not in [ i.identity for i @@ -1452,21 +1437,17 @@ async def test_set_outputs_live( ] # set foo:succeed: it should spawn baz but foo remains incomplete. schd.pool.set_prereqs_and_outputs( - ["1/foo"], ["succeeded"], None, ['all']) - assert ( - pool_get_task_ids(schd.pool) == ["1/bar", "1/baz", "1/foo"] - ) + ["1/foo"], ["succeeded"], [], ['all']) + assert schd.pool.get_task_ids() == {"1/bar", "1/baz", "1/foo"} # it should complete implied outputs (submitted, started) too assert log_filter(contains="setting implied output: submitted") assert log_filter(contains="setting implied output: started") # set foo (default: all required outputs) to complete y. - schd.pool.set_prereqs_and_outputs(["1/foo"], None, None, ['all']) + schd.pool.set_prereqs_and_outputs(["1/foo"], [], [], ['all']) assert log_filter(contains="output 1/foo:succeeded completed") - assert ( - pool_get_task_ids(schd.pool) == ["1/bar", "1/baz"] - ) + assert schd.pool.get_task_ids() == {"1/bar", "1/baz"} async def test_set_outputs_live2( @@ -1490,10 +1471,10 @@ async def test_set_outputs_live2( 'b': 'yacht'}}} } ) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) async with start(schd) as log: - schd.pool.set_prereqs_and_outputs(["1/foo"], None, None, ['all']) + schd.pool.set_prereqs_and_outputs(["1/foo"], [], [], ['all']) assert not log_filter( contains="did not complete required outputs: ['a', 'b']" ) @@ -1525,24 +1506,22 @@ async def test_set_outputs_future( } } ) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) async with start(schd) as log: # it should start up with just 1/a - assert pool_get_task_ids(schd.pool) == ["1/a"] + assert schd.pool.get_task_ids() == {"1/a"} # setting inactive task b succeeded should spawn c but not b schd.pool.set_prereqs_and_outputs( - ["1/b"], ["succeeded"], None, ['all']) - assert ( - pool_get_task_ids(schd.pool) == ["1/a", "1/c"] - ) + ["1/b"], ["succeeded"], [], ['all']) + assert schd.pool.get_task_ids() == {"1/a", "1/c"} schd.pool.set_prereqs_and_outputs( items=["1/a"], outputs=["x", "y", "cheese"], - prereqs=None, + prereqs=[], flow=['all'] ) assert log_filter(contains="output 1/a:cheese not found") @@ -1589,20 +1568,21 @@ async def test_set_outputs_from_skip_settings( } ) validate(id_) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) async with start(schd): # it should start up with just tasks a: - assert pool_get_task_ids(schd.pool) == ['1/a', '2/a'] + assert schd.pool.get_task_ids() == {'1/a', '2/a'} # setting 1/a output to skip should set output x, but not # y (because y is optional). schd.pool.set_prereqs_and_outputs( - ['1/a'], ['skip'], None, ['all']) - assert (pool_get_task_ids(schd.pool) == [ + ['1/a'], ['skip'], [], ['all']) + assert schd.pool.get_task_ids() == { '1/after_asucceeded', '1/after_ax', - '2/a']) + '2/a', + } # Check that the presence of "skip" in outputs doesn't # trigger a warning: @@ -1610,13 +1590,14 @@ async def test_set_outputs_from_skip_settings( # You should be able to set skip as part of a list of outputs: schd.pool.set_prereqs_and_outputs( - ['2/a'], ['skip', 'y'], None, ['all']) - assert (pool_get_task_ids(schd.pool) == [ + ['2/a'], ['skip', 'y'], [], ['all']) + assert schd.pool.get_task_ids() == { '1/after_asucceeded', '1/after_ax', '2/after_asucceeded', '2/after_ax', - '2/after_ay']) + '2/after_ay', + } async def test_prereq_satisfaction( @@ -1648,12 +1629,10 @@ async def test_prereq_satisfaction( schd: Scheduler = scheduler(id_) async with start(schd): # it should start up with just 1/a - assert pool_get_task_ids(schd.pool) == ["1/a"] + assert schd.pool.get_task_ids() == {"1/a"} # spawn b - schd.pool.set_prereqs_and_outputs(["1/a"], ["x"], None, ['all']) - assert ( - pool_get_task_ids(schd.pool) == ["1/a", "1/b"] - ) + schd.pool.set_prereqs_and_outputs(["1/a"], ["x"], [], ['all']) + assert schd.pool.get_task_ids() == {"1/a", "1/b"} b = schd.pool.get_task(IntegerPoint("1"), "b") @@ -1662,7 +1641,7 @@ async def test_prereq_satisfaction( # set valid and invalid prerequisites, by label and message. schd.pool.set_prereqs_and_outputs( prereqs=["1/a:xylophone", "1/a:y", "1/a:w", "1/a:z"], - items=["1/b"], outputs=None, flow=['all'] + items=["1/b"], outputs=[], flow=['all'] ) assert log_filter(contains="1/a:z not found") assert log_filter(contains="1/a:w not found") @@ -1995,7 +1974,7 @@ async def test_remove_by_suicide( schd: 'Scheduler' = scheduler(id_) async with start(schd, level=logging.DEBUG) as log: # it should start up with 1/a and 1/b - assert pool_get_task_ids(schd.pool) == ["1/a", "1/b"] + assert schd.pool.get_task_ids() == {"1/a", "1/b"} a = schd.pool.get_task(IntegerPoint("1"), "a") # mark 1/a as failed and ensure 1/b is removed by suicide trigger @@ -2003,7 +1982,7 @@ async def test_remove_by_suicide( assert log_filter( regex="1/b.*removed from active task pool: suicide trigger" ) - assert pool_get_task_ids(schd.pool) == ["1/a"] + assert schd.pool.get_task_ids() == {"1/a"} # ensure that we are able to bring 1/b back by triggering it log.clear() diff --git a/tests/unit/run_modes/test_skip_units.py b/tests/unit/run_modes/test_skip_units.py index bf5ffa7be6..18b3f73c42 100644 --- a/tests/unit/run_modes/test_skip_units.py +++ b/tests/unit/run_modes/test_skip_units.py @@ -95,15 +95,18 @@ def test_process_outputs(outputs, required, expect): # Create a mocked up task-proxy: rtconf = {'skip': {'outputs': outputs}} itask = SimpleNamespace( - tdef=SimpleNamespace( - rtconfig=rtconf), + tdef=SimpleNamespace(rtconfig=rtconf), state=SimpleNamespace( outputs=SimpleNamespace( iter_required_messages=lambda *a, **k: iter(required), - _message_to_trigger={v: v for v in required} - ))) + _message_to_trigger={v: v for v in required}, + ) + ), + ) - assert process_outputs(itask, rtconf) == ['submitted', 'started'] + expect + assert process_outputs(itask, rtconf) == {'submitted', 'started'}.union( + expect + ) def test_skip_mode_validate(caplog, log_filter): diff --git a/tests/unit/test_task_outputs.py b/tests/unit/test_task_outputs.py index 2306e41234..8d150d64c0 100644 --- a/tests/unit/test_task_outputs.py +++ b/tests/unit/test_task_outputs.py @@ -15,16 +15,20 @@ # along with this program. If not, see . from types import SimpleNamespace +from typing import ( + Optional, + Set, +) import pytest from cylc.flow.task_outputs import ( - TASK_OUTPUTS, TASK_OUTPUT_EXPIRED, TASK_OUTPUT_FAILED, - TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUBMIT_FAILED, + TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUTS, TaskOutputs, get_completion_expression, get_trigger_completion_variable_maps, @@ -247,46 +251,66 @@ def test_format_completion_status(): ) -def test_iter_required_outputs(): - """It should yield required outputs only.""" - # this task has three required outputs and one optional output - outputs = TaskOutputs( - tdef( +@pytest.mark.parametrize( + 'required, optional, expected_required, expected_expression', [ + # this task has three required outputs and one optional output + pytest.param( {TASK_OUTPUT_SUCCEEDED, 'x', 'y'}, - {'z'} - ) - ) - assert set(outputs.iter_required_messages()) == { - TASK_OUTPUT_SUCCEEDED, - 'x', - 'y', - } - - # this task does not have any required outputs (besides the implicitly - # required submitted/started outputs) - outputs = TaskOutputs( - tdef( - # Note: validation should prevent this at the config level + {'z'}, + {TASK_OUTPUT_SUCCEEDED, 'x', 'y'}, + None, + id="3-required-1-optional", + ), + # this task does not have any required outputs (besides the implicitly + # required submitted/started outputs) + # Note: validation should prevent this at the config level + pytest.param( {TASK_OUTPUT_SUCCEEDED, 'x', 'y'}, {TASK_OUTPUT_FAILED}, # task may fail - ) - ) - assert set(outputs.iter_required_messages()) == set() - - # the preconditions expiry/submitted are excluded from this logic when - # defined as optional: - outputs = TaskOutputs( - tdef( + set(), + None, + id="no-required-outputs", + ), + # the preconditions expiry/submitted are excluded from this logic when + # defined as optional: + pytest.param( {TASK_OUTPUT_SUCCEEDED, 'x', 'y'}, {TASK_OUTPUT_EXPIRED}, # task may expire - ) - ) - assert outputs._completion_expression == '(succeeded and x and y) or expired' - assert set(outputs.iter_required_messages()) == { - TASK_OUTPUT_SUCCEEDED, - 'x', - 'y', - } + {TASK_OUTPUT_SUCCEEDED, 'x', 'y'}, + '(succeeded and x and y) or expired', + id="expiry-submitted", + ), + # NOTE: a required output might not be required! + # If success is optional, then apparently-required outputs are made + # implicitly optional. See + # https://github.com/cylc/cylc-flow/pull/6505#issuecomment-2517781523 + pytest.param( + {'x'}, + {TASK_OUTPUT_SUCCEEDED}, + set(), + '(x and succeeded) or failed', + id="implicit-optional", + ), + pytest.param( + set(), + {'x', TASK_OUTPUT_SUCCEEDED}, + set(), + 'succeeded or failed', + id="all-optional", + ), + ] +) +def test_iter_required_outputs( + required: Set[str], + optional: Set[str], + expected_required: Set[str], + expected_expression: Optional[str], +): + """It should yield required outputs only.""" + outputs = TaskOutputs(tdef(required, optional)) + if expected_expression: + assert outputs._completion_expression == expected_expression + assert set(outputs.iter_required_messages()) == expected_required def test_iter_required_outputs__disable():