Skip to content

Commit

Permalink
This commit fixes several CWL translator issues.
Browse files Browse the repository at this point in the history
 - the retrieval of the step name. Before this commit, it was expected that the `out` attribute of `WorkflowStep` was always a list of `string`. Instead, it is a list of `Any`, following the typing hint of the `cwl_utils.parser.WorkflowStep` constructor. In particular, it can have `WorkflowStepOutput` object elements.
 - the creation of the `CommandOutputProcessor` when a schema is defined. Before this commit, the `CommandInputRecordSchema` was not included in the `RecordSchema` case. Similar for `EnumSchema` and `ArraySchema` cases, respectively.
  • Loading branch information
LanderOtto committed Dec 18, 2024
1 parent da3f532 commit 44e2c9a
Showing 1 changed file with 76 additions and 59 deletions.
135 changes: 76 additions & 59 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def _create_command_output_processor(
optional: bool = False,
) -> CommandOutputProcessor:
# Array type: -> MapCommandOutputProcessor
if isinstance(port_type, get_args(cwl_utils.parser.OutputArraySchema)):
if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)):
return CWLMapCommandOutputProcessor(
name=port_name,
workflow=workflow,
Expand All @@ -284,7 +284,7 @@ def _create_command_output_processor(
),
)
# Enum type: -> create command output processor
elif isinstance(port_type, get_args(cwl_utils.parser.OutputEnumSchema)):
elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)):
# Process InlineJavascriptRequirement
requirements = context["hints"] | context["requirements"]
expression_lib, full_js = _process_javascript_requirement(requirements)
Expand Down Expand Up @@ -312,7 +312,7 @@ def _create_command_output_processor(
optional=optional,
)
# Record type: -> ObjectCommandOutputProcessor
elif isinstance(port_type, get_args(cwl_utils.parser.OutputRecordSchema)):
elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)):
# Process InlineJavascriptRequirement
requirements = context["hints"] | context["requirements"]
expression_lib, full_js = _process_javascript_requirement(requirements)
Expand Down Expand Up @@ -1504,6 +1504,7 @@ def _handle_optional_input_variables(
cwl_element: cwl_utils.parser.WorkflowStep,
inner_cwl_element: cwl_utils.parser.Process,
cwl_name_prefix: str,
inner_cwl_name_prefix: str,
default_ports: MutableMapping[str, Port],
name_prefix: str,
step_name: str,
Expand All @@ -1512,17 +1513,13 @@ def _handle_optional_input_variables(
inner_input_ports, outer_input_ports = set(), set()
# Get inner CWL object input names
for element_input in inner_cwl_element.inputs:
inner_cwl_name_prefix = utils.get_name(
name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True
)
global_name = utils.get_name(
step_name, inner_cwl_name_prefix, element_input.id
)
port_name = posixpath.relpath(global_name, step_name)
inner_input_ports.add(port_name)
# Get WorkflowStep input names
for element_input in cwl_element.in_:
step_name = utils.get_name(name_prefix, cwl_name_prefix, cwl_element.id)
cwl_step_name = utils.get_name(
name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True
)
Expand Down Expand Up @@ -2113,9 +2110,61 @@ def _translate_workflow_step(
utils.get_name(step_name, cwl_step_name, n)
for n in cwl_element.scatter or []
]

# Process inner element
run_command = cwl_element.run
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = utils.get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = utils.get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
inner_context = context
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
utils.get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
inner_context = {**context, **{"version": run_command.cwlVersion}}

# Handle optional input variables
default_ports = {}
self._handle_optional_input_variables(
cwl_element=cwl_element,
inner_cwl_element=run_command,
cwl_name_prefix=cwl_name_prefix,
inner_cwl_name_prefix=inner_cwl_name_prefix,
default_ports=default_ports,
name_prefix=name_prefix,
step_name=step_name,
workflow=workflow,
)
# Process inputs
input_ports = {}
default_ports = {}
value_from_transformers = {}
input_dependencies = {}
for element_input in cwl_element.in_:
Expand Down Expand Up @@ -2335,7 +2384,15 @@ def _translate_workflow_step(
external_output_ports = {}
internal_output_ports = {}
for element_output in cwl_element.out:
global_name = utils.get_name(step_name, cwl_step_name, element_output)
global_name = utils.get_name(
step_name,
cwl_step_name,
(
element_output
if isinstance(element_output, str)
else element_output.id
),
)
port_name = posixpath.relpath(global_name, step_name)
# Retrieve or create output port
if global_name not in self.output_ports:
Expand Down Expand Up @@ -2546,7 +2603,15 @@ def _translate_workflow_step(
# Add skip ports if there is a condition
if cwl_condition:
for element_output in cwl_element.out:
global_name = utils.get_name(step_name, cwl_step_name, element_output)
global_name = utils.get_name(
step_name,
cwl_step_name,
(
element_output
if isinstance(element_output, str)
else element_output.id
),
)
port_name = posixpath.relpath(global_name, step_name)
skip_port = (
external_output_ports[global_name]
Expand All @@ -2558,62 +2623,14 @@ def _translate_workflow_step(
)
# Update output ports with the internal ones
self.output_ports |= internal_output_ports
# Process inner element
run_command = cwl_element.run
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = utils.get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = utils.get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
utils.get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
context = {**context, **{"version": run_command.cwlVersion}}
self._recursive_translate(
workflow=workflow,
cwl_element=run_command,
context=context
context=inner_context
| {"requirements": {k: v for k, v in requirements.items() if k != "Loop"}},
name_prefix=step_name,
cwl_name_prefix=inner_cwl_name_prefix,
)
# Handle optional input variables
self._handle_optional_input_variables(
cwl_element=cwl_element,
inner_cwl_element=run_command,
cwl_name_prefix=cwl_name_prefix,
default_ports=default_ports,
name_prefix=name_prefix,
step_name=step_name,
workflow=workflow,
)
# Update output ports with the external ones
self.output_ports |= external_output_ports

Expand Down

0 comments on commit 44e2c9a

Please sign in to comment.