Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jan 18, 2025
1 parent e2ce8a0 commit e40222a
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 95 deletions.
3 changes: 1 addition & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
sphinx==8.1.3
sphinx-jsonschema==1.19.1
sphinx-rtd-theme==3.0.2
pygments==2.18.0
sphinx-rtd-theme==3.0.2
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ antlr4-python3-runtime==4.13.2
asyncssh==2.19.0
bcrypt==4.2.1
cachetools==5.5.0
cwl-utils@git+https://github.com/common-workflow-language/cwl-utils@33a706b
cwl-utils==0.36
importlib-metadata==8.5.0
Jinja2==3.1.5
jsonschema==4.23.0
Expand Down
3 changes: 2 additions & 1 deletion streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ async def _get_contents(
):
if (cwl_version not in ("v1.0", "v.1.1")) and size > CONTENT_LIMIT:
raise WorkflowExecutionException(
f"Cannot read contents from files larger than {CONTENT_LIMIT / 1024}kB: file {str(path)} is {size} kB"
f"Cannot read contents from files larger than "
f"{CONTENT_LIMIT / 1024}kB: file {str(path)} is {size / 1024} kB"
)
return await path.read_text(n=CONTENT_LIMIT)

Expand Down
109 changes: 20 additions & 89 deletions streamflow/deployment/connector/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,55 +80,21 @@ async def _get_storage_from_binds(
)


async def _resolve_bind(
container_connector: ContainerConnector, binds: MutableSequence[str] | None
) -> MutableSequence[str] | None:
src_paths, dst_paths = zip(*(v.split(":") for v in binds))
return (
[
f"{src}:{dst}"
for src, dst in zip(
await container_connector._resolve_paths(src_paths),
dst_paths,
)
]
if binds is not None
else None
def _parse_mount(mount: str) -> tuple[str, str]:
source = next(
part[4:]
for part in mount.split(",")
if part.startswith("src=") or part.startswith("source=")
)


async def _resolve_mount(
container_connector: ContainerConnector, mounts: MutableSequence[str] | None
) -> MutableSequence[str] | None:
if mounts is not None:
src = []
dst = []
flag = []
for mount in mounts:
flag.append([])
for part in mount.split(","):
if part.startswith("src="):
src.append(part[4:])
elif part.startswith("source="):
src.append(part[7:])
elif part.startswith("dst="):
dst.append(part[4:])
elif part.startswith("dest="):
dst.append(part[5:])
elif part.startswith("destination="):
dst.append(part[12:])
elif part.startswith("target="):
dst.append(part[7:])
else:
flag[-1].append(part)
resolved_src = await container_connector._resolve_paths(src)
if resolved_src != src and logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Resolved source path {src} to {resolved_src}")
return [
f"src={s},dst={d},{','.join(f)}" for s, d, f in zip(resolved_src, dst, flag)
]
else:
return None
destination = next(
part[4:]
for part in mount.split(",")
if part.startswith("dst=")
or part.startswith("dest=")
or part.startswith("destination=")
or part.startswith("target=")
)
return source, destination


class ContainerInstance:
Expand Down Expand Up @@ -570,18 +536,6 @@ async def _prepare_volumes(
command=["mkdir", "-p"] + sources,
)

async def _resolve_paths(self, paths: MutableSequence[str]) -> MutableSequence[str]:
if self._wraps_local():
return [os.path.realpath(p) for p in paths]
else:
return (
await self.connector.run(
location=self._inner_location.location,
command=["readlink", "-f", *paths],
capture_output=True,
)
)[0].split("\n")

def _wraps_local(self) -> bool:
return self._inner_location.local

Expand Down Expand Up @@ -1101,9 +1055,6 @@ async def deploy(self, external: bool) -> None:
await self._check_docker_installed()
# If the deployment is not external, deploy the container
if not external:
self.volume = await _resolve_bind(self, self.volume)
self.mount = await _resolve_mount(self, self.mount)

await self._prepare_volumes(self.volume, self.mount)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Using Docker {await self._get_docker_version()}.")
Expand Down Expand Up @@ -1781,20 +1732,19 @@ async def _populate_instance(self, name: str) -> None:
f"in deployment {self.deployment_name}: [{returncode}]: {stdout}"
)
# Get inner location mount points
if False and self._wraps_local():
if self._wraps_local():
fs_mounts = {
disk.device: disk.mountpoint
for disk in psutil.disk_partitions(all=True)
if disk.fstype not in FS_TYPES_TO_SKIP
and os.access(disk.mountpoint, os.R_OK)
}
fs_host_mounts = {} # todo
else:
stdout, returncode = await self.connector.run(
location=self._inner_location.location,
command=[
"cat",
"/proc/self/mountinfo",
"/proc/1/mountinfo",
],
capture_output=True,
)
Expand All @@ -1804,17 +1754,13 @@ async def _populate_instance(self, name: str) -> None:
for line in stdout.splitlines()
if line.split(" - ")[1].split()[0] not in FS_TYPES_TO_SKIP
}
fs_host_mounts = {
line.split()[4]: line.split()[3] for line in stdout.splitlines()
}
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Host mount points: {fs_mounts}")
else:
raise WorkflowExecutionException(
f"FAILED retrieving volume mounts from `/proc/self/mountinfo` "
f"FAILED retrieving volume mounts from `/proc/1/mountinfo` "
f"in deployment {self.connector.deployment_name}: [{returncode}]: {stdout}"
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Host mount points: {fs_mounts}")

# Get the list of bind mounts for the container instance
stdout, returncode = await self.run(
location=location,
Expand Down Expand Up @@ -1848,18 +1794,7 @@ async def _populate_instance(self, name: str) -> None:
else None
)
if host_mount is not None:
# Get host mount point if the `root` is defined (see man proc -> mountinfo)
binds[dst] = next(
(
mnt_point
for mnt_point, root in fs_host_mounts.items()
if root == host_mount
),
host_mount,
) # Return `mnt_point` if a `root` is equal to `host_mount` else return `host_mount`
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Host mount of {host_mount} is {binds[dst]}")

binds[dst] = host_mount
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Container binds: {binds}")
else:
Expand Down Expand Up @@ -1892,10 +1827,6 @@ async def deploy(self, external: bool) -> None:
if not external:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Using {await self._get_singularity_version()}.")

self.bind = await _resolve_bind(self, self.bind)
self.mount = await _resolve_mount(self, self.mount)

await self._prepare_volumes(self.bind, self.mount)
instance_name = random_name()
deploy_command = [
Expand Down
4 changes: 2 additions & 2 deletions streamflow/deployment/connector/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ def __init__(
mount_point=disk.mountpoint,
size=shutil.disk_usage(disk.mountpoint).free / 2**20,
)
except (FileNotFoundError, PermissionError) as err:
except PermissionError as pe:
logger.warning(
f"Skippping Storage on partition {disk.device} on {disk.mountpoint} "
f"for deployment {self.deployment_name}: {err}"
f"for deployment {self.deployment_name}: {pe}"
)
self._hardware: Hardware = Hardware(
cores=float(psutil.cpu_count()),
Expand Down

0 comments on commit e40222a

Please sign in to comment.