Skip to content

Commit

Permalink
Integrate uvloop async engine
Browse files Browse the repository at this point in the history
This commit integrates the `uvloop` asynchronous engine as a drop-in
replacement for `asyncio` when Python runs on top of the `Cython`
library. The `uvloop` library is faster than the standard async engine,
improving StreamFlow performance epsecially on HTC workflows.
  • Loading branch information
GlassOfWhiskey committed Oct 25, 2024
1 parent 723bac0 commit af22c0c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
6 changes: 2 additions & 4 deletions examples/mpi/streamflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ workflows:
bindings:
- step: /compile
target:
deployment: k8s-mpi
deployment: helm-mpi
service: openmpi
- step: /execute
target:
deployment: k8s-mpi
deployment: helm-mpi
locations: 2
service: openmpi
deployments:
Expand All @@ -28,7 +28,6 @@ deployments:
type: helm
config:
chart: environment/helm/openmpi
kubeconfig: ~/.kube/config-streamflow
releaseName: openmpi-rel
workdir: /tmp
k8s-mpi:
Expand All @@ -37,5 +36,4 @@ deployments:
files:
- environment/k8s/secrets.yaml
- environment/k8s/deployment.yaml
kubeconfig: ~/.kube/config-streamflow
workdir: /tmp
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ kubernetes_asyncio==31.1.0
psutil==6.1.0
referencing==0.35.1
rdflib==7.0.0
uvloop==0.21.0
yattag==1.16.0
19 changes: 13 additions & 6 deletions streamflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import asyncio
import logging
import os
import platform
import sys
import uuid
import uvloop
from typing import Any, MutableMapping

from streamflow import VERSION, report
Expand Down Expand Up @@ -245,16 +247,21 @@ def build_context(config: MutableMapping[str, Any]) -> StreamFlowContext:
def main(args):
try:
args = parser.parse_args(args)
if sys.platform != "win32" and platform.python_implementation() == "CPython":
logger.info("CPython detected: using uvloop EventLoop implementation")
engine = uvloop
else:
engine = asyncio
if args.context == "ext":
asyncio.run(_async_ext(args))
engine.run(_async_ext(args))
elif args.context == "list":
asyncio.run(_async_list(args))
engine.run(_async_list(args))
elif args.context == "plugin":
asyncio.run(_async_plugin(args))
engine.run(_async_plugin(args))
elif args.context == "prov":
asyncio.run(_async_prov(args))
engine.run(_async_prov(args))
elif args.context == "report":
asyncio.run(_async_report(args))
engine.run(_async_report(args))
elif args.context == "run":
if args.quiet:
logger.setLevel(logging.WARNING)
Expand All @@ -266,7 +273,7 @@ def main(args):
logger.handlers = []
logger.addHandler(colored_stream_handler)
logger.addFilter(HighlitingFilter())
asyncio.run(_async_run(args))
engine.run(_async_run(args))
elif args.context == "schema":
load_extensions()
print(SfSchema().dump(args.version, args.pretty))
Expand Down

0 comments on commit af22c0c

Please sign in to comment.