Skip to content

Commit

Permalink
Fixed optional input when the input type is list or record
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jan 4, 2025
1 parent c2a08e2 commit 6267b03
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
2 changes: 2 additions & 0 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ def _create_token_processor(
force_deep_listing=force_deep_listing,
only_propagate_secondary_files=only_propagate_secondary_files,
),
optional=optional,
)
# Enum type: -> create output processor
elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)):
Expand Down Expand Up @@ -684,6 +685,7 @@ def _create_token_processor(
)
for port_type in port_type.fields
},
optional=optional,
)
elif isinstance(port_type, MutableSequence):
optional = "null" in port_type
Expand Down
27 changes: 21 additions & 6 deletions streamflow/deployment/connector/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,7 @@ async def _populate_instance(self, name: str) -> None:
f"in deployment {self.deployment_name}: [{returncode}]: {stdout}"
)
# Get inner location mount points
if self._wraps_local():
if False and self._wraps_local():
fs_mounts = {
disk.device: disk.mountpoint
for disk in psutil.disk_partitions(all=True)
Expand All @@ -1793,7 +1793,7 @@ async def _populate_instance(self, name: str) -> None:
location=self._inner_location.location,
command=[
"cat",
"/proc/1/mountinfo",
"/proc/self/mountinfo",
],
capture_output=True,
)
Expand All @@ -1803,13 +1803,25 @@ async def _populate_instance(self, name: str) -> None:
for line in stdout.splitlines()
if line.split(" - ")[1].split()[0] not in FS_TYPES_TO_SKIP
}
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Host mount points: {fs_mounts}")
fs_host_mounts = {
line.split()[4]: line.split()[3] for line in stdout.splitlines()
}
else:
raise WorkflowExecutionException(
f"FAILED retrieving volume mounts from `/proc/1/mountinfo` "
f"FAILED retrieving volume mounts from `/proc/self/mountinfo` "
f"in deployment {self.connector.deployment_name}: [{returncode}]: {stdout}"
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Host mount points: {fs_mounts}")

def _get_host_mount(path):
for mnt_point, root in fs_host_mounts.values():
if root == path:
logger.debug(f"host mount of {root} is {mnt_point}")
return mnt_point
logger.debug(f"host mount of {path} is {path}")
return path

# Get the list of bind mounts for the container instance
stdout, returncode = await self.run(
location=location,
Expand Down Expand Up @@ -1843,7 +1855,10 @@ async def _populate_instance(self, name: str) -> None:
else None
)
if host_mount is not None:
binds[dst] = host_mount
# if dst == os.path.join(os.sep, "tmp", "streamflow"): # fixme
# logger.debug(f"HARDCODED: replaced {host_mount} with {dst}")
# host_mount = dst

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
binds[dst] = _get_host_mount(host_mount)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Container binds: {binds}")
else:
Expand Down

0 comments on commit 6267b03

Please sign in to comment.