- Intuitive syntax: dataflow-like flow/task composing syntax similar to function calls, inspired by nextflow's DSL2.
- Pure Python: no DSL; import/compose/modify Task/Flow Python objects at will.
- Extensible and interactive due to the dynamic nature of Python.
- Task cache.
- ...
- Concurrent: tasks run implicitly in parallel in the asyncio event loop (see the sketch after this list).
- Distributable: uses Dask distributed as the task executor; can be deployed locally, on a cluster, or in the cloud.
- Hybrid execution model:
  - Build flows locally in Python or through the web UI.
  - Schedule/monitor flow execution on a remote server through Python or the web UI.
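As a sketch of the concurrency model promised above, the following hypothetical flow (the names `slow_double` and `parallel_flow` are made up; the API calls are the ones used in the examples below) builds two independent branches, which can be scheduled concurrently in the event loop:

```python
from flowsaber.api import *
import time

@task
def slow_double(x):
    time.sleep(1)  # simulate expensive work
    return x * 2

@flow
def parallel_flow(num):
    a = slow_double(num)  # branch 1
    b = slow_double(num)  # branch 2, no data dependency on branch 1
    return mix(a, b) | view  # merge both branches and print the results

# The two branches are independent, so their task runs
# can proceed in parallel once the flow is executed.
run(parallel_flow(Channel.values(1, 2, 3)))
```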
Install via pip:

```bash
pip install flowsaber
```
A minimal working example covering most features and usages of `flowsaber`:
```python
from flowsaber.api import *

@task
def add(self, num):  # self is optional
    return num + 1

@task
def multiply(num1, num2):
    return num1 * num2

@shell
def write(num):
    """echo {num} > {num}.txt"""
    return '*.txt'

@task
def read(f: File):
    return open(str(f)).readlines()

@flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
        | write | read | flatten \
        | map_(lambda x: int(x.strip())) \
        | view

num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)

run(workflow)
```
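To make the dataflow semantics concrete, here is a plain-Python sketch of what the pipeline above computes for a single input value (an illustration only, not flowsaber API; channels, files, and concurrency are ignored):

```python
# Plain-Python equivalent of the pipeline above for one input value.
def sub_flow_py(num):
    # add | map_(x ** 2) | add
    return (num + 1) ** 2 + 1

def my_flow_py(num):
    # [sub_flow, sub_flow] | multiply
    product = sub_flow_py(num) * sub_flow_py(num)
    # write dumps the value to a .txt file; read loads it back as lines
    lines = [f"{product}\n"]
    # flatten | map_(int(x.strip())) | view
    return [int(x.strip()) for x in lines]

print(my_flow_py(1))  # [25]
```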
The following is a bioinformatics workflow, a rewritten version of the snakemake tutorial:
```python
from flowsaber.api import *

@shell
def bwa(self, fa: File, fastq: File):  # inputs are automatically converted if type-annotated
    """bwa mem -t {self.config.cpu} {fa} {fastq} | samtools view -Sb - > {fastq.stem}.bam"""
    return "*.bam"  # in a ShellTask, a returned str is treated as a File pattern and globbed

@shell
def sort(bam: File):  # self is optional in case you don't need to access the current task
    """samtools sort -o {sorted_bam} {bam}"""
    sorted_bam = f"{bam.stem}.sorted.bam"
    return sorted_bam

@shell(publish_dirs=["results/vcf"])
def call(fa: File, bams: list):  # in case you need to write some Python code
    """samtools mpileup -g -f {fa} {bam_files} | bcftools call -mv - > all.vcf"""
    bam_files = ' '.join(str(bam) for bam in bams)
    return "all.vcf"

@task
def stats(vcf: File):
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    from pysam import VariantFile

    quals = [record.qual for record in VariantFile(str(vcf))]
    plt.hist(quals)
    plt.savefig("report.svg")

@flow
def call_vcf_flow():
    def _call(bams):  # a task is a normal function, use Python as you wish
        return call(fa, bams)

    context = flowsaber.context
    fa = Channel.value(context.fa)
    fastq = Channel.values(*context.fastq)

    bam1 = bwa(fa, fastq)  # the channel is automatically cloned
    bam2 = bwa(fa, fastq)
    mix(bam1, bam2) | sort | collect | _call | stats

prefix = 'tests/test_flow/snamke-demo.nosync/data'
with flowsaber.context({
        "fa": f'{prefix}/genome.fa',
        "fastq": [f'{prefix}/samples/{sample}' for sample in ['A.fastq', 'B.fastq', 'C.fastq']]
}):
    # resolve dependencies
    workflow = call_vcf_flow()
run(workflow)
```
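Note how each `@shell` task's docstring serves as a command template whose placeholders, such as `{bam}` and `{sorted_bam}`, are filled from the task's arguments and local variables. A rough plain-Python illustration of that rendering (not flowsaber internals):

```python
from pathlib import Path

# Hypothetical rendering of the `sort` task's command template for one input.
bam = Path("A.bam")
sorted_bam = f"{bam.stem}.sorted.bam"  # local variable defined in the task body
template = "samtools sort -o {sorted_bam} {bam}"
command = template.format(sorted_bam=sorted_bam, bam=bam)
print(command)  # samtools sort -o A.sorted.bam A.bam
```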
Both the server and an agent need to be running in the background before submitting flowruns.

In a bash shell:

```bash
flowsaber server
```

In another bash shell:

```bash
flowsaber agent --server "http://127.0.0.1:8000" --id test
```
In a Python script or IPython console:
```python
from flowsaber.api import *
import sys

@task
def add(num):
    print("This is a message sent by print to stdout in task")
    print("This is a message sent by print to stderr in task", file=sys.stderr)
    a = 1
    for i in range(10000000):
        a += 1
    return num + 1

@flow
def myflow(num):
    return num | add | add | view | add | view

num_ch = Channel.values(*list(range(10)))
f = myflow(num_ch)

run(f, server_address="http://127.0.0.1:8000", agent_id="test")
```
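For comparison, dropping `server_address` and `agent_id` runs the same flow in the local process, exactly as in the earlier examples:

```python
# Run the same flow locally instead of through the server/agent.
f = myflow(Channel.values(*range(10)))
run(f)
```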
Run the test suite with:

```bash
python -m pytest tests -s -o log_cli=True -vvvv
```
- PBS/Torque executor.
- More cache modes.
- Support running on cloud platforms.
- Run CWL scripts; convert between CWL and flowsaber flows.