Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed some CWL translator issues #628

Merged
merged 9 commits into from
Jan 19, 2025
2 changes: 1 addition & 1 deletion streamflow/core/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def add_path(self, path: str):
self.paths.add(path)

def __repr__(self):
return f"Storage(mount_point={self.mount_point}, size={self.size}, paths={self.paths})"
return f"Storage(mount_point={self.mount_point}, size={self.size}, bind={self.bind}, paths={self.paths})"

def __add__(self, other: Any) -> Storage:
if not isinstance(other, Storage):
Expand Down
4 changes: 2 additions & 2 deletions streamflow/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import MutableMapping, MutableSequence
from enum import Enum
from enum import IntEnum
from typing import TYPE_CHECKING, TypeVar, cast

from streamflow.core import utils
Expand Down Expand Up @@ -205,7 +205,7 @@ async def save(self, context: StreamFlowContext) -> None:
)


class Status(Enum):
class Status(IntEnum):
WAITING = 0
FIREABLE = 1
RUNNING = 2
Expand Down
102 changes: 42 additions & 60 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import urllib.parse
from collections.abc import MutableMapping, MutableSequence
from enum import Enum
from pathlib import PurePosixPath
from pathlib import Path, PurePosixPath
from types import ModuleType
from typing import Any, cast, get_args

Expand Down Expand Up @@ -82,7 +82,12 @@
OnlyNonNullTransformer,
ValueFromTransformer,
)
from streamflow.cwl.utils import LoadListing, SecondaryFile, resolve_dependencies
from streamflow.cwl.utils import (
LoadListing,
SecondaryFile,
process_embedded_tool,
resolve_dependencies,
)
from streamflow.cwl.workflow import CWLWorkflow
from streamflow.deployment.utils import get_binding_config
from streamflow.log_handler import logger
Expand Down Expand Up @@ -250,10 +255,12 @@ def _create_command_output_processor(
port_target: Target | None,
port_type: (
str
| cwl_utils.parser.InputSchema
| cwl_utils.parser.OutputSchema
| MutableSequence[
str,
cwl_utils.parser.OutputSchema,
cwl_utils.parser.InputSchema,
]
),
cwl_element: (
Expand All @@ -267,7 +274,7 @@ def _create_command_output_processor(
optional: bool = False,
) -> CommandOutputProcessor:
# Array type: -> MapCommandOutputProcessor
if isinstance(port_type, get_args(cwl_utils.parser.OutputArraySchema)):
if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)):
return CWLMapCommandOutputProcessor(
name=port_name,
workflow=workflow,
Expand All @@ -284,7 +291,7 @@ def _create_command_output_processor(
),
)
# Enum type: -> create command output processor
elif isinstance(port_type, get_args(cwl_utils.parser.OutputEnumSchema)):
elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)):
# Process InlineJavascriptRequirement
requirements = context["hints"] | context["requirements"]
expression_lib, full_js = _process_javascript_requirement(requirements)
Expand Down Expand Up @@ -312,7 +319,7 @@ def _create_command_output_processor(
optional=optional,
)
# Record type: -> ObjectCommandOutputProcessor
elif isinstance(port_type, get_args(cwl_utils.parser.OutputRecordSchema)):
elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)):
# Process InlineJavascriptRequirement
requirements = context["hints"] | context["requirements"]
expression_lib, full_js = _process_javascript_requirement(requirements)
Expand Down Expand Up @@ -622,6 +629,7 @@ def _create_token_processor(
force_deep_listing=force_deep_listing,
only_propagate_secondary_files=only_propagate_secondary_files,
),
optional=optional,
)
# Enum type: -> create output processor
elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)):
Expand Down Expand Up @@ -679,6 +687,7 @@ def _create_token_processor(
)
for port_type in port_type.fields
},
optional=optional,
)
elif isinstance(port_type, MutableSequence):
optional = "null" in port_type
Expand Down Expand Up @@ -1075,7 +1084,7 @@ def _get_load_listing(
def _get_path(element_id: str) -> str:
path = element_id
if "#" in path:
path = path.split("#")[-1]
path = path.split("#")[0]
if path.startswith("file://"):
path = urllib.parse.unquote(path[7:])
return path
Expand Down Expand Up @@ -1406,6 +1415,9 @@ def __init__(
| cwl_utils.parser.Workflow
) = cwl_definition
self.cwl_inputs: MutableMapping[str, Any] = cwl_inputs

if cwl_inputs_path is not None:
cwl_inputs_path = _get_path(Path(cwl_inputs_path).resolve().as_uri())
self.cwl_inputs_path: str | None = cwl_inputs_path
self.default_map: MutableMapping[str, Any] = {}
self.deployment_map: MutableMapping[str, DeployStep] = {}
Expand Down Expand Up @@ -1504,6 +1516,7 @@ def _handle_optional_input_variables(
cwl_element: cwl_utils.parser.WorkflowStep,
inner_cwl_element: cwl_utils.parser.Process,
cwl_name_prefix: str,
inner_cwl_name_prefix: str,
default_ports: MutableMapping[str, Port],
name_prefix: str,
step_name: str,
Expand All @@ -1512,17 +1525,13 @@ def _handle_optional_input_variables(
inner_input_ports, outer_input_ports = set(), set()
# Get inner CWL object input names
for element_input in inner_cwl_element.inputs:
inner_cwl_name_prefix = utils.get_name(
name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True
)
global_name = utils.get_name(
step_name, inner_cwl_name_prefix, element_input.id
)
port_name = posixpath.relpath(global_name, step_name)
inner_input_ports.add(port_name)
# Get WorkflowStep input names
for element_input in cwl_element.in_:
step_name = utils.get_name(name_prefix, cwl_name_prefix, cwl_element.id)
cwl_step_name = utils.get_name(
name_prefix, cwl_name_prefix, cwl_element.id, preserve_cwl_prefix=True
)
Expand Down Expand Up @@ -2113,9 +2122,30 @@ def _translate_workflow_step(
utils.get_name(step_name, cwl_step_name, n)
for n in cwl_element.scatter or []
]

# Process inner element
run_command, inner_cwl_name_prefix, inner_context = process_embedded_tool(
cwl_element=cwl_element,
step_name=step_name,
name_prefix=name_prefix,
cwl_name_prefix=cwl_name_prefix,
context=context,
)

# Handle optional input variables
default_ports = {}
self._handle_optional_input_variables(
cwl_element=cwl_element,
inner_cwl_element=run_command,
cwl_name_prefix=cwl_name_prefix,
inner_cwl_name_prefix=inner_cwl_name_prefix,
default_ports=default_ports,
name_prefix=name_prefix,
step_name=step_name,
workflow=workflow,
)
# Process inputs
input_ports = {}
default_ports = {}
value_from_transformers = {}
input_dependencies = {}
for element_input in cwl_element.in_:
Expand Down Expand Up @@ -2558,62 +2588,14 @@ def _translate_workflow_step(
)
# Update output ports with the internal ones
self.output_ports |= internal_output_ports
# Process inner element
run_command = cwl_element.run
if cwl_utils.parser.is_process(run_command):
run_command.cwlVersion = context["version"]
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
if ":" in run_command.id.split("#")[-1]:
cwl_step_name = utils.get_name(
name_prefix,
cwl_name_prefix,
cwl_element.id,
preserve_cwl_prefix=True,
)
inner_cwl_name_prefix = (
step_name
if context["version"] == "v1.0"
else posixpath.join(cwl_step_name, "run")
)
else:
inner_cwl_name_prefix = utils.get_name(
name_prefix,
cwl_name_prefix,
run_command.id,
preserve_cwl_prefix=True,
)
else:
run_command = cwl_element.loadingOptions.fetcher.urljoin(
cwl_element.loadingOptions.fileuri, run_command
)
run_command = cwl_utils.parser.load_document_by_uri(
run_command, loadingOptions=cwl_element.loadingOptions
)
cwl_utils.parser.utils.convert_stdstreams_to_files(run_command)
inner_cwl_name_prefix = (
utils.get_name(posixpath.sep, posixpath.sep, run_command.id)
if "#" in run_command.id
else posixpath.sep
)
context = {**context, **{"version": run_command.cwlVersion}}
self._recursive_translate(
workflow=workflow,
cwl_element=run_command,
context=context
context=inner_context
| {"requirements": {k: v for k, v in requirements.items() if k != "Loop"}},
name_prefix=step_name,
cwl_name_prefix=inner_cwl_name_prefix,
)
# Handle optional input variables
self._handle_optional_input_variables(
cwl_element=cwl_element,
inner_cwl_element=run_command,
cwl_name_prefix=cwl_name_prefix,
default_ports=default_ports,
name_prefix=name_prefix,
step_name=step_name,
workflow=workflow,
)
# Update output ports with the external ones
self.output_ports |= external_output_ports

Expand Down
56 changes: 53 additions & 3 deletions streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import cwl_utils.expression
import cwl_utils.parser
import cwl_utils.parser.utils
from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT

from streamflow.core.context import StreamFlowContext
Expand Down Expand Up @@ -84,7 +85,8 @@ async def _get_contents(
):
if (cwl_version not in ("v1.0", "v.1.1")) and size > CONTENT_LIMIT:
raise WorkflowExecutionException(
f"Cannot read contents from files larger than {CONTENT_LIMIT / 1024}kB"
f"Cannot read contents from files larger than "
f"{CONTENT_LIMIT / 1024}kB: file {str(path)} is {size / 1024}kB"
)
return await path.read_text(n=CONTENT_LIMIT)

Expand Down Expand Up @@ -606,10 +608,10 @@ async def get_file_token(
def get_name(
name_prefix: str,
cwl_name_prefix: str,
element_id: str,
element_id: Any,
preserve_cwl_prefix: bool = False,
) -> str:
name = element_id.split("#")[-1]
name = (element_id if isinstance(element_id, str) else element_id.id).split("#")[-1]
return (
posixpath.join(posixpath.sep, name)
if preserve_cwl_prefix
Expand Down Expand Up @@ -663,6 +665,54 @@ class LoadListing(Enum):
deep_listing = 2


def process_embedded_tool(
    cwl_element: cwl_utils.parser.WorkflowStep,
    cwl_name_prefix: str,
    step_name: str,
    name_prefix: str,
    context: MutableMapping[str, Any],
):
    """Resolve the ``run`` field of a workflow step into a process object.

    The ``run`` field is either an inline process definition or a reference
    that must be joined with the step's file URI and loaded as a document.
    In both cases standard streams are converted to files on the resulting
    process before it is returned.

    :param cwl_element: the workflow step whose ``run`` field is resolved
    :param cwl_name_prefix: CWL name prefix forwarded to ``get_name``
    :param step_name: name of the step; used as the inner CWL name prefix
        for inline processes when ``context["version"]`` is ``"v1.0"``
    :param name_prefix: name prefix forwarded to ``get_name``
    :param context: translation context; it is copied, never mutated
    :return: a ``(process, inner_cwl_name_prefix, inner_context)`` tuple,
        where ``inner_context`` is a shallow copy of ``context`` whose
        ``"version"`` entry follows the loaded document when ``run`` was
        a reference
    """
    inner_process = cwl_element.run
    inner_context = dict(context)

    if not cwl_utils.parser.is_process(inner_process):
        # ``run`` is a reference: resolve it against the file that declares
        # the step, then load the referenced document.
        loading_options = cwl_element.loadingOptions
        document_uri = loading_options.fetcher.urljoin(
            loading_options.fileuri, inner_process
        )
        inner_process = cwl_utils.parser.load_document_by_uri(
            document_uri, loadingOptions=cwl_element.loadingOptions
        )
        cwl_utils.parser.utils.convert_stdstreams_to_files(inner_process)
        if "#" in inner_process.id:
            inner_prefix = get_name(posixpath.sep, posixpath.sep, inner_process.id)
        else:
            inner_prefix = posixpath.sep
        # Only a separately loaded document can declare its own version; an
        # inline process is assigned ``context["version"]`` below instead.
        inner_context["version"] = inner_process.cwlVersion
        return inner_process, inner_prefix, inner_context

    # Inline process: it takes the version of the enclosing context.
    inner_process.cwlVersion = context["version"]
    cwl_utils.parser.utils.convert_stdstreams_to_files(inner_process)

    if ":" not in inner_process.id.split("#")[-1]:
        inner_prefix = get_name(
            name_prefix,
            cwl_name_prefix,
            inner_process.id,
            preserve_cwl_prefix=True,
        )
        return inner_process, inner_prefix, inner_context

    # The fragment of the process id contains ":"; derive the prefix from the
    # step itself rather than from the process id.
    cwl_step_name = get_name(
        name_prefix,
        cwl_name_prefix,
        cwl_element.id,
        preserve_cwl_prefix=True,
    )
    if context["version"] == "v1.0":
        inner_prefix = step_name
    else:
        inner_prefix = posixpath.join(cwl_step_name, "run")
    return inner_process, inner_prefix, inner_context


async def process_secondary_files(
context: StreamFlowContext,
cwl_version: str,
Expand Down
10 changes: 8 additions & 2 deletions streamflow/data/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import logging
import os
import posixpath
from collections.abc import MutableMapping, MutableSequence
Expand Down Expand Up @@ -101,7 +102,9 @@ def __init__(self, context: StreamFlowContext):
self.context: StreamFlowContext = context

def __repr__(self):
return self._node_repr(next(iter(self._filesystem.children.values())), 0)
return "\n".join(
self._node_repr(node, 0) for node in self._filesystem.children.values()
)

def _node_repr(self, node: _RemotePathNode, level: int) -> str:
tree = level * "\t" + "|-- " + repr(node) + "\n"
Expand Down Expand Up @@ -353,7 +356,8 @@ async def transfer_data(
src_path, context=self.context, location=src_location
).resolve()
) is None:
logger.info(f"Remote file system: {repr(self.path_mapper)}")
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Remote file system: {repr(self.path_mapper)}")
raise WorkflowExecutionException(
f"Error retrieving realpath for {src_path} on location {src_location} "
f"while transferring it to {dst_path} on deployment {dst_connector.deployment_name}"
Expand Down Expand Up @@ -420,6 +424,8 @@ async def transfer_data(
self.register_relation(src_data_location, dst_data_location)
# Otherwise, raise an exception
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Remote file system: {repr(self.path_mapper)}")
raise WorkflowExecutionException(
f"No data locations found for path {src_path} "
f"while trying to map {dst_path} on {dst_location}"
Expand Down
Loading
Loading