#!/usr/bin/env nextflow
/*
Stenglein lab metagenomic classification pipeline
implemented in nextflow
Feb 25, 2022
Mark Stenglein
*/
// output usage message
params.help = false
params.h = false
// TODO: command line arg processing and validating
// TODO: check that appropriate refseq files exist (host bt index, nr/nt, etc.)
/*
These fastq files represent the main input to this workflow
See here for info on glob pattern matching:
https://docs.oracle.com/javase/tutorial/essential/io/fileOps.html#glob
This is setup to handle possible multiple directories containing fastq
If fastq are in multiple directories, the directories should be provided as
a comma-separated list (no spaces)
*/
def fastq_dirs = params.fastq_dir.tokenize(',')
// construct list of directories in which to find fastq
fastq_dir_list = []
for (dir in fastq_dirs){
def file_pattern = "${dir}/${params.fastq_pattern}"
fastq_dir_list.add(file_pattern)
}
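/*
For example (hypothetical paths), running with
--fastq_dir /data/run1,/data/run2 and a fastq_pattern of "*_R{1,2}*.fastq*"
builds the glob list:
["/data/run1/*_R{1,2}*.fastq*", "/data/run2/*_R{1,2}*.fastq*"]
*/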
Channel
.fromFilePairs(fastq_dir_list, size: -1, checkIfExists: true, maxDepth: 1)
// .into {samples_ch_qc; samples_ch_trim; samples_ch_count; host_setup_ch}
.into {initial_fastq_subsample_ch; initial_fastq_ch}
// define usage output message
// TODO:
def helpMessage() {
log.info """
Usage:
The typical command for running the pipeline is as follows:
nextflow run main.nf --
Mandatory arguments:
One of:
--host_map_file The path to a file that contains host filtering
info for all datasets or subsets of datasets.
This enables different filtering for different datasets
This file should be a 2-column tab-delimited file
with:
- the first column defining dataset IDs or patterns that will
match dataset IDs
- the second column giving the path of a bowtie index that will be
used to filter out host reads
--host_bt_index The path to a bowtie index that will be used
to filter host reads for all datasets
Optional arguments:
--outdir Output directory into which to place final results files
--help This usage statement.
--h This usage statement.
"""
}
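/*
A sketch of a host_map_file (hypothetical dataset IDs and index paths),
tab-delimited, one mapping per line:
mosquito_sample_1	/refseq/aedes/aedes_bt_index
human_	/refseq/human/grch38_bt_index
Because patterns are matched with the =~ find operator (see below), the
second line would apply to any dataset ID containing "human_", e.g.
human_blood_1 and human_blood_2.
*/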
if (params.help || params.h) {
helpMessage()
exit 0
}
/*
Check input parameters
*/
def check_params () {
// may specify at most one of these 2 host-mapping parameters
if (!params.host_map_file && !params.host_bt_index){
log.info """
Warning: you have not specified either of these two parameters.
No host filtering will be performed.
1. host_map_file (--host_map_file) or
2. host_bt_index (--host_bt_index)
"""
helpMessage()
}
if (params.host_map_file && params.host_bt_index){
log.info """
Error: you may specify only one of these two parameters:
1. host_map_file (--host_map_file) and
2. host_bt_index (--host_bt_index)
"""
helpMessage()
// Stop execution
System.exit(1)
}
if (params.host_map_file) {
host_map_file = file(params.host_map_file)
if (!host_map_file.exists()) {
log.info """
Error: host_map_file ($params.host_map_file) does not exist
"""
helpMessage()
// Stop execution
System.exit(1)
}
}
}
host_map = [:]
/*
This function defines bowtie indexes to be used to do host filtering
This can be one index for all datasets or different indexes for
different datasets.
*/
def setup_host_indexes () {
// Store the bowtie index to be used for each dataset ID or dataset ID pattern
if (params.host_map_file) {
host_map_file = file(params.host_map_file)
// read through the file
allLines = host_map_file.readLines()
for( line : allLines ) {
// split into fields (tokenize() splits on whitespace, including tabs)
fields = line.tokenize()
host_map.put(fields[0], fields[1])
}
}
else {
// because of the way pattern matching works below
// this empty string will match all sample IDs
// and host will be set to params.host_bt_index
host_map.put("" , params.host_bt_index)
}
}
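/*
The resulting map for the two configurations (hypothetical values):
with the example host_map_file above:
host_map = ["mosquito_sample_1": "/refseq/aedes/aedes_bt_index", "human_": "/refseq/human/grch38_bt_index"]
with --host_bt_index /refseq/fly/fly_bt_index:
host_map = ["": "/refseq/fly/fly_bt_index"]  // empty pattern matches every sample ID
*/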
/*
Get the bowtie index for host filtering for one dataset
*/
def get_dataset_host_index (sample_id) {
def index = ""
// first, check for a direct match
if (host_map.containsKey(sample_id)) {
index = host_map.get(sample_id)
}
// if that doesn't work, check for a pattern match
// using groovy's find operator to do pattern matching
// see: https://groovy-lang.org/operators.html#_find_operator
if (!index) {
host_map.each {
key, value ->
// Pattern matching
// TODO: if pattern ($key) is empty, this will evaluate to true
// is this the expected behavior?
if (sample_id =~ /$key/) {
index = value
}
}
}
// no index found: no host filtering will be performed
if (!index) {
log.info "Info: no host-filtering index found for $sample_id. No host filtering will be performed\n"
index = "dummy"
}
return (index)
}
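/*
Example lookups against the hypothetical map above:
get_dataset_host_index("mosquito_sample_1") // exact key match -> aedes index
get_dataset_host_index("human_blood_1")     // no exact key, but =~ /human_/ matches -> grch38 index
get_dataset_host_index("water_control_1")   // no match -> "dummy": host filtering is skipped
*/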
// check parameters
check_params()
// setup indexes for host filtering
setup_host_indexes()
/*
Setup some initial indexes and dictionaries needed by downstream processes.
Only do this once at beginning.
*/
process subsample_input {
label 'lowmem'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/seqtk:1.3--h5bf99c6_3"
} else {
container "quay.io/biocontainers/seqtk:1.3--h5bf99c6_3"
}
when:
params.subsample_fraction
input:
tuple val (sample_id), path(input_fastq) from initial_fastq_subsample_ch
output:
tuple val (sample_id), path("*.ss.fastq*") into subsampled_fastq_ch
script:
def r1 = input_fastq[0]
def r1_ss = r1.name.replaceAll("fastq", "ss.fastq")
def r2 = input_fastq[1]
def r2_ss = input_fastq[1] ? r2.name.replaceAll("fastq", "ss.fastq") : ""
def r1_command = "seqtk sample $r1 ${params.subsample_fraction} | gzip > $r1_ss"
def r2_command = input_fastq[1] ? "seqtk sample $r2 ${params.subsample_fraction} | gzip > $r2_ss" : ""
"""
$r1_command
$r2_command
"""
}
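/*
Note on seqtk sample semantics: a subsample_fraction below 1 is treated as a
fraction of reads, an integer of 1 or more as an absolute read count, e.g.
(hypothetical file name):
seqtk sample sample1_R1.fastq.gz 0.1 | gzip > sample1_R1.ss.fastq.gz  # keep ~10% of reads
Both reads of a pair are kept or dropped together because both seqtk
invocations use the same default random seed.
*/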
/*
Fork fastq input channel
*/
if (params.subsample_fraction) {
subsampled_fastq_ch.into {samples_ch_qc; samples_ch_trim; samples_ch_count; host_setup_ch}
} else {
initial_fastq_ch.into {samples_ch_qc; samples_ch_trim; samples_ch_count; host_setup_ch}
}
/*
Setup some initial indexes and dictionaries needed by downstream processes.
Only do this once at beginning.
*/
process setup_indexes {
output:
val("indexes_complete") into post_index_setup_ch
script:
"""
"""
}
/*
Run fastqc on input fastq
*/
process initial_qc {
label 'lowmem_non_threaded'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/fastqc:0.11.9--0"
} else {
container "quay.io/biocontainers/fastqc:0.11.9--0"
}
input:
tuple val(sample_id), path(initial_fastq) from samples_ch_qc
output:
// path("*.html") into initial_qc_fastqc_html_ch
path("*.zip") into initial_qc_fastqc_zip_ch
script:
"""
fastqc $initial_fastq
"""
}
/*
Count initial fastq
*/
process initial_fastq_count {
label 'lowmem_non_threaded'
publishDir "${params.counts_out_dir}", mode:'link'
input:
tuple val(sample_id), path(initial_fastq) from samples_ch_count
output:
path("${sample_id}_initial_count.txt") into post_count_initial_ch
shell:
// only count the first file because for paired-read data both files
// will have the same # of reads.
// for an explanation of the xargs command used for arithmetic in a pipe, see:
// https://askubuntu.com/questions/1203063/how-can-i-pipe-the-result-of-the-wc-command-into-an-arithmetic-expansion
'''
zcat -f !{initial_fastq[0]} | wc -l | xargs bash -c 'echo $(($0 / 4))' | awk '{print "!{sample_id}" "\tinitial\t" $1}' > "!{sample_id}_initial_count.txt"
'''
}
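/*
A worked example of the arithmetic above: a fastq with 4000 lines holds
1000 reads (4 lines per record), so for a hypothetical dataset "sample1"
the resulting count file contains the single line:
sample1	initial	1000
*/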
/*
Use multiqc to merge initial fastqc reports
*/
process initial_multiqc {
publishDir "${params.outdir}", mode:'link'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/multiqc:1.11--pyhdfd78af_0"
} else {
container "quay.io/biocontainers/multiqc:1.11--pyhdfd78af_0"
}
input:
path("fastqc_zips/*") from initial_qc_fastqc_zip_ch.collect()
output:
path("initial_qc_report.html")
script:
"""
multiqc -n "initial_qc_report.html" -m fastqc fastqc_zips
"""
}
/*
Use cutadapt to trim off adapters and low quality bases
*/
process trim_adapters_and_low_quality {
label 'lowmem_non_threaded'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/cutadapt:3.5--py39h38f01e4_0"
} else {
container "quay.io/biocontainers/cutadapt:3.5--py39h38f01e4_0"
}
input:
tuple val(sample_id), path(initial_fastq) from samples_ch_trim
output:
tuple val(sample_id), path("*_f.fastq") optional true into post_trim_ch
tuple val(sample_id), path("*_f.fastq") optional true into post_trim_count_ch
// TODO: parameterize adapter sequences
script:
// this handles paired-end data, in which case must specify a paired output file
def paired_output = initial_fastq[1] ? "-p ${sample_id}_R2_f.fastq" : ""
def paired_adapters = initial_fastq[1] ? "-A AGATCGGAAGAGC -G GCTCTTCCGATCT -A AGATGTGTATAAGAGACAG -G CTGTCTCTTATACACATCT" : ""
// TODO: don't trim this much for non-amplicon data!
def paired_trimming = initial_fastq[1] ? "-U $params.always_trim_5p_bases -U -${params.always_trim_3p_bases}" : ""
"""
cutadapt \
-a AGATCGGAAGAGC -g GCTCTTCCGATCT -a AGATGTGTATAAGAGACAG -g CTGTCTCTTATACACATCT \
$paired_adapters \
-q 30,30 \
--minimum-length ${params.post_trim_min_length} \
-u ${params.always_trim_5p_bases} \
-u -${params.always_trim_3p_bases} \
$paired_trimming \
-o ${sample_id}_R1_f.fastq \
$paired_output \
$initial_fastq
"""
}
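/*
For paired-end data, the command above resolves to something like this
(hypothetical sample "sample1"; minimum length and per-end trim bases shown
as illustrative values of 30 and 1):
cutadapt \
  -a AGATCGGAAGAGC -g GCTCTTCCGATCT -a AGATGTGTATAAGAGACAG -g CTGTCTCTTATACACATCT \
  -A AGATCGGAAGAGC -G GCTCTTCCGATCT -A AGATGTGTATAAGAGACAG -G CTGTCTCTTATACACATCT \
  -q 30,30 --minimum-length 30 \
  -u 1 -u -1 -U 1 -U -1 \
  -o sample1_R1_f.fastq -p sample1_R2_f.fastq \
  sample1_R1.fastq.gz sample1_R2.fastq.gz
(lowercase cutadapt options apply to R1, uppercase to R2)
*/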
/*
Count post-trimming fastq
*/
process trimmed_fastq_count {
label 'lowmem_non_threaded'
publishDir "${params.counts_out_dir}", mode:'link'
input:
tuple val(sample_id), path(trimmed_fastq) from post_trim_count_ch
output:
path("${sample_id}_trimmed_count.txt") into post_count_trim_ch
shell:
// only count the first file because for paired-read data both files
// will have the same # of reads.
'''
zcat -f !{trimmed_fastq[0]} | wc -l | xargs bash -c 'echo $(($0 / 4))' | awk '{print "!{sample_id}" "\tpost_trimming\t" $1}' > "!{sample_id}_trimmed_count.txt"
'''
}
/*
Collapse duplicate reads - these are likely PCR duplicates, and duplicated in any case
*/
process collapse_duplicate_reads {
label 'lowmem_threaded'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/cd-hit-auxtools:4.8.1--h7d875b9_1"
} else {
container "quay.io/biocontainers/cd-hit-auxtools:4.8.1--h7d875b9_1"
}
input:
// the filter{size()} functionality here checks if fastq is empty,
// which causes cd-hit-dup to choke
// see: https://stackoverflow.com/questions/47401518/nextflow-is-an-input-file-empty
// this means that fastq that are empty at this stage will just stop going through pipeline
// TODO: there shouldn't be an asterisk (spread operator) in this filter step ?
tuple val(sample_id), path(input_fastq) from post_trim_ch.filter{ it[1]*.getAt(0).size() > 0}
output:
tuple val(sample_id), path("*_fu.fastq") optional true into post_collapse_count_ch
tuple val(sample_id), path("*_fu.fastq") optional true into post_collapse_qc_ch
tuple val(sample_id), path("*_fu.fastq") optional true into host_filtering_ch
script:
// this handles paired-end data, in which case must specify a paired output file
def r1 = input_fastq[0]
def r2 = input_fastq[1]
def prefix_param = "-u 30" // same -u setting applies to single- and paired-end input
def paired_input = input_fastq[1] ? "-i2 $r2" : ""
def paired_output = input_fastq[1] ? "-o2 ${sample_id}_R2_fu.fastq" : ""
"""
cd-hit-dup \
-e $params.mismatches_allowed \
$prefix_param \
-i $r1 \
$paired_input \
-o ${sample_id}_R1_fu.fastq \
$paired_output
"""
}
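/*
A note on the filter{} in the input block above, and on its TODO: the intent
is it[1].getAt(0).size() > 0, i.e. "the first fastq file is non-empty".
With the spread operator, it[1]*.getAt(0) instead calls getAt(0) on each
element of the file list and size() is evaluated on the resulting list,
which is presumably why the TODO questions the asterisk.
*/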
/*
Count post-collapse fastq
*/
process collapsed_fastq_count {
label 'lowmem_non_threaded'
publishDir "${params.counts_out_dir}", mode:'link'
input:
tuple val(sample_id), path(collapsed_fastq) from post_collapse_count_ch
output:
path("${sample_id}_collapsed_count.txt") into post_count_collapse_ch
shell:
// only count the first file because for paired-read data both files
// will have the same # of reads.
'''
zcat -f !{collapsed_fastq[0]} | wc -l | xargs bash -c 'echo $(($0 / 4))' | awk '{print "!{sample_id}" "\tpost_collapse\t" $1}' > "!{sample_id}_collapsed_count.txt"
'''
}
/*
Use fastqc to do QC on post-collapsed fastq
*/
process post_collapse_qc {
label 'lowmem_non_threaded'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/fastqc:0.11.9--0"
} else {
container "quay.io/biocontainers/fastqc:0.11.9--0"
}
input:
tuple val(sample_id), path(input_fastq) from post_collapse_qc_ch
output:
// path("*.html") into post_trim_fastqc_html_ch
path("*.zip") into post_trim_fastqc_zip_ch
script:
"""
fastqc $input_fastq
"""
}
/*
Use multiqc to merge post-trimming reports
*/
process post_preprocess_multiqc {
publishDir "${params.outdir}", mode:'link'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/multiqc:1.11--pyhdfd78af_0"
} else {
container "quay.io/biocontainers/multiqc:1.11--pyhdfd78af_0"
}
input:
path("fastqc_zips/*") from post_trim_fastqc_zip_ch.collect()
output:
path("post_trim_qc_report.html")
"""
multiqc -n "post_trim_qc_report.html" -m fastqc -m cutadapt fastqc_zips
"""
}
/*
Use bowtie2 to remove host-derived reads
*/
process host_filtering {
label 'lowmem_threaded'
publishDir "${params.host_filtered_out_dir}", mode:'link'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/bowtie2:2.4.5--py39ha4319a6_1"
} else {
container "quay.io/biocontainers/bowtie2:2.4.5--py39ha4319a6_1"
}
input:
tuple val(sample_id), path(input_fastq) from host_filtering_ch
output:
tuple val(sample_id), path("*_fuh.fastq") optional true into post_host_filter_ch
tuple val(sample_id), path("*_fuh.fastq") optional true into post_host_filter_count_ch
tuple val(sample_id), path("*_fuh.fastq") optional true into post_host_filter_remap_ch
script:
def bt_index = get_dataset_host_index(sample_id)
// handle single-end or paired-end inputs
def r1 = input_fastq[0]
def r2 = input_fastq[1] ? input_fastq[1] : ""
def bowtie_file_input = input_fastq[1] ? "-1 $r1 -2 $r2" : "-U $r1"
def bowtie_file_output = input_fastq[1] ? "--un-conc ${sample_id}_R%_fuh.fastq" : "--un ${sample_id}_R1_fuh.fastq"
if (bt_index == "dummy") {
// this is the case where we don't have a host index specified
// for this sample. Don't do host filtering. Instead, just
// use the ln (link) command to effectively copy the input fastq file
// to the output file
// handle paired end data case
def r2_dummy_cmd = input_fastq[1]? "ln $r2 ${sample_id}_R2_fuh.fastq" : ""
"""
ln $r1 ${sample_id}_R1_fuh.fastq
$r2_dummy_cmd
"""
}
else {
// case where there is a bowtie index specified for host filtering
"""
bowtie2 \
-x "${bt_index}" \
--local \
-q \
$bowtie_file_input \
--sensitive \
--score-min "C,${params.host_bt_min_score},0" \
-p ${task.cpus} \
$bowtie_file_output 2> "${sample_id}.host_filtering_bt.log" > /dev/null
"""
}
}
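/*
With paired input, bowtie2 expands the % in "--un-conc sample1_R%_fuh.fastq"
to the mate number, writing non-host read pairs to sample1_R1_fuh.fastq and
sample1_R2_fuh.fastq (hypothetical sample ID). Reads aligning to the host
index at or above the --score-min threshold are discarded along with the
SAM output (> /dev/null).
*/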
/*
Count post-host-filtering fastq
*/
process host_filtered_fastq_count {
label 'lowmem_non_threaded'
publishDir "${params.counts_out_dir}", mode:'link'
input:
tuple val(sample_id), path(filtered_fastq) from post_host_filter_count_ch
output:
path("${sample_id}_host_filtered_count.txt") into post_count_host_ch
shell:
// only count the first file because for paired-read data both files
// will have the same # of reads.
'''
zcat -f !{filtered_fastq[0]} | wc -l | xargs bash -c 'echo $(($0 / 4))' | awk '{print "!{sample_id}" "\tpost_host_filtered\t" $1}' > "!{sample_id}_host_filtered_count.txt"
'''
}
/*
Collect all the fastq counts from various steps of processing and create a plot / excel summary
*/
process tabulate_fastq_counts {
publishDir "${params.outdir}", mode: 'link'
// singularity info for this process
if (workflow.containerEngine == 'singularity') {
container "library://stenglein-lab/r_taxonomy_tools/r_taxonomy_tools:1.0.0"
}
input:
path(all_count_files) from post_count_initial_ch.concat(post_count_collapse_ch, post_count_trim_ch, post_count_host_ch).collect()
output:
path ("all_read_counts.txt")
path ("filtering_plots.pdf")
script:
"""
Rscript ${params.R_bindir}/process_fastq_counts.R ${params.R_bindir} ${all_count_files}
"""
}
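/*
The collected count files feeding this process are 3-column, tab-delimited
(sample ID, processing stage, read count), so the concatenated input looks
like this (hypothetical numbers):
sample1	initial	1000000
sample1	post_trimming	950000
sample1	post_collapse	700000
sample1	post_host_filtered	50000
*/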
/*
Assemble reads remaining after host filtering
*/
process assemble_remaining_reads {
label 'highmem'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/spades:3.15.4--h95f258a_0"
} else {
container "quay.io/biocontainers/spades:3.15.4--h95f258a_0"
}
// Spades will fail if, for instance, there are very few reads in the post-host-filtered datasets
// this is expected for some kinds of datasets, such as water negative control datasets
// we don't want the whole pipeline to stop in this case, so set errorStrategy to ignore
// in this case, the pipeline simply continues without assembly output for that dataset
// see: https://www.nextflow.io/docs/latest/process.html#errorstrategy
errorStrategy 'ignore'
input:
tuple val(sample_id), path(input_fastq) from post_host_filter_ch
output:
tuple val(sample_id), path("${sample_id}.spades"), path(input_fastq) into post_assembly_ch
script:
// handle single-end or paired-end inputs
def r1 = input_fastq[0]
def r2 = input_fastq[1] ? input_fastq[1] : ""
def spades_input = input_fastq[1] ? "-1 $r1 -2 $r2" : "-s $r1"
// def bowtie_file_output = input_fastq[1] ? "--un-conc ${sample_id}_R%_fuh.fastq" : "--un ${sample_id}_R1_fuh.fastq"
"""
# run spades
spades.py -o ${sample_id}.spades ${spades_input} -t ${task.cpus}
"""
}
/*
Get contigs from assembly, converting into 1-line fasta format
*/
process retrieve_contigs {
publishDir "${params.contigs_out_dir}", mode:'link', pattern:"*.fa"
label 'lowmem'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/seqtk:1.3--h5bf99c6_3"
} else {
container "quay.io/biocontainers/seqtk:1.3--h5bf99c6_3"
}
input:
tuple val(sample_id), path("${sample_id}.spades"), path(input_fastq) from post_assembly_ch
output:
tuple val(sample_id), path("${sample_id}_contigs.fa"), path(input_fastq) into post_contigs_ch
script:
"""
# consolidate assembly output
# this forces contigs fasta into 1-line format
# also remove contigs shorter than minimum_contig_length bases
seqtk seq -A -L ${params.minimum_contig_length} ${sample_id}.spades/contigs.fasta > ${sample_id}_contigs.fa
"""
}
/*
Map reads to contigs so that contigs can be weighted by the # of
reads they account for.
*/
process quantify_read_mapping_to_contigs {
label 'lowmem_threaded'
// publishDir "${params.contigs_out_dir}", mode:'link'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/bowtie2:2.4.5--py39ha4319a6_1"
} else {
container "quay.io/biocontainers/bowtie2:2.4.5--py39ha4319a6_1"
}
input:
tuple val(sample_id), path(contigs), path(input_fastq) from post_contigs_ch
output:
tuple val(sample_id), path("${sample_id}_contig_weights.txt"), path(contigs), path(contig_mapping_sam) into post_contigs_weight_ch
script:
def r1 = input_fastq[0]
"""
bowtie2-build $contigs contig_bt_index
# C,120,1 makes min score 120 -> ~corresponds to 100% identity over ~60 bases
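# (the arithmetic: bowtie2 --local mode awards a match bonus of 2 by default,
# so a constant minimum score of 120 requires roughly 60 perfectly matching bases)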
# TODO: make score configurable?
bowtie2 -x contig_bt_index --local --score-min C,120,1 -q -U $r1 -p ${task.cpus} -S contig_mapping_sam
# count the # of reads that hit each contig
# the tally_sam_subjects script is just a series of piped bash commands: could move it here.
${params.scripts_bindir}/tally_sam_subjects contig_mapping_sam > ${sample_id}_contig_weights.txt
"""
}
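/*
The contig weights file is two-column: contig name and the number of reads
mapping to it, e.g. (hypothetical SPAdes contig names and counts):
NODE_1_length_6423_cov_102.3	5341
NODE_2_length_1288_cov_12.9	210
*/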
/*
Optionally merge contigs and singletons
*/
process merge_contigs_and_singletons {
label 'lowmem_threaded'
// publishDir "${params.contigs_out_dir}", mode:'link'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/samtools:1.17--h00cdaf9_0"
} else {
container "quay.io/biocontainers/samtools:1.17--h00cdaf9_0"
}
input:
tuple val(sample_id), path(contig_weights), path(contigs), path(contig_mapping_sam) from post_contigs_weight_ch
output:
tuple val(sample_id), path(contig_weights), path("${sample_id}_contigs_and_singletons.fasta") into post_contigs_singletons_ch
script:
// classify singletons and contigs - slower but more thorough
if (params.classify_singletons)
"""
# create file of non-aligning aka singleton reads
samtools view -f 4 contig_mapping_sam | samtools fasta > ${sample_id}_non_contig_mapping_reads.fasta
# concatenate contigs and singletons
cat ${sample_id}_non_contig_mapping_reads.fasta $contigs > ${sample_id}_contigs_and_singletons.fasta
"""
// don't classify singletons - only classify contigs
else
"""
# simply create a link of the contigs file named contigs_and_singletons
# TODO: fix this misleading naming scheme in case when not considering
# singletons
ln $contigs ${sample_id}_contigs_and_singletons.fasta
"""
}
/*
Create a channel containing the path to the directory containing a local copy of the blast nt
database. This is so that this directory can be staged (by link) in the work directory.
*/
blast_db_ch = Channel.fromPath( params.local_nt_database_dir, type: 'dir', checkIfExists: true)
/*
This process confirms that the local NT database is valid
*/
process check_local_blast_database {
label 'process_low'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/blast:2.12.0--pl5262h3289130_0"
} else {
container "quay.io/biocontainers/blast:2.12.0--pl5262h3289130_0"
}
input:
path(local_nt_database_dir) from blast_db_ch
output:
// tuple path("${local_nt_database_dir}"), val("${params.local_nt_database_name}") into post_blast_db_check_ch
path(local_nt_database_dir) into post_blast_db_check_ch
script:
def local_nt_database = "${local_nt_database_dir}/${params.local_nt_database_name}"
"""
# check BLAST database
# check for expected .nal file: if not present, output a helpful warning message
if [ ! -f "${local_nt_database}.nal" ]
then
echo "ERROR: it does not appear that ${local_nt_database} is a valid BLAST database."
fi
# check validity of database with blastdbcmd. If not valid, this will error
# and stop pipeline.
blastdbcmd -db ${local_nt_database} -info
"""
}
/*
Create a channel containing the path to the directory containing a local copy of the diamond
database. This is so that this directory can be staged (by link) in the work directory.
*/
diamond_db_ch = Channel.fromPath( params.local_diamond_database_dir, type: 'dir', checkIfExists: true)
/*
This process confirms that the local diamond database is valid
*/
process check_local_diamond_database {
label 'process_low'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/diamond:2.0.14--hdcc8f71_0"
} else {
container "quay.io/biocontainers/diamond:2.0.14--hdcc8f71_0"
}
input:
path(local_diamond_database_dir) from diamond_db_ch
output:
path(local_diamond_database_dir) into post_diamond_db_check_ch
script:
def local_dmd_database = "${local_diamond_database_dir}/${params.local_diamond_database_name}"
"""
# check Diamond database
# check for expected file: if not present, output a helpful warning message
if [ ! -f "${local_dmd_database}" ]
then
echo "ERROR: it does not appear that Diamond database ${local_dmd_database} exists."
fi
# check validity of database with diamond dbinfo. If not valid, this will error
# and stop pipeline.
diamond dbinfo --db ${local_dmd_database}
"""
}
/*
Create a channel containing the path to an (optional) local copy of the
NCBI taxonomy databases
*/
ncbi_tax_ch = Channel.empty()
// if this path was provided as a parameter, then create a channel
// from this path and set a boolean to true to indicate it's an existing
// directory
if (params.ncbi_tax_dir) {
ncbi_tax_ch = Channel.fromPath( params.ncbi_tax_dir )
.map { path -> [ path , true ] }
} else {
// if this path was *not* provided as a parameter, then create a channel
// from a bogus path "ncbi_tax_dir" and set a boolean to false
// to indicate it *doesn't* refer to an existing directory
ncbi_tax_ch = Channel.fromPath( "ncbi_tax_dir" )
.map { path -> [ path , false ] }
}
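/*
Either way the channel emits one 2-element tuple, e.g.:
[/path/to/ncbi_tax_dir, true]  // --ncbi_tax_dir was provided: use the existing files
[ncbi_tax_dir, false]          // placeholder path: triggers the download below
*/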
/*
Download NCBI Taxonomy database files if necessary
*/
process download_ncbi_taxonomy_if_necessary {
label 'process_low'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/curl:7.80.0"
} else {
container "quay.io/biocontainers/curl:7.80.0"
}
input:
tuple path(ncbi_tax_dir), val(existing_db) from ncbi_tax_ch
output:
tuple path (ncbi_tax_dir), val(existing_db) into post_ncbi_tax_download_ch
script:
// if a local directory containing the ncbi taxonomy database is specified,
// check that it exists and contains the expected sqlite db file
existing_db ?
"""
# check that the directory exists
if [ ! -d "${ncbi_tax_dir}" ] ; then
echo "ERROR: BLAST taxonomy directory ${ncbi_tax_dir} (--ncbi_tax_dir) does not exist."
exit 1
fi
# check that appropriate files exist
if [ ! -f "${ncbi_tax_dir}/${params.ncbi_tax_db}" ] ; then
echo "ERROR: required NCBI taxonomy database file ${params.ncbi_tax_db} (--ncbi_tax_db) not in directory ${ncbi_tax_dir} (--ncbi_tax_dir)."
exit 1
fi
""" :
// if tax db doesn't already exist : download the necessary files and keep track of directory
// log.info("Downloading the NCBI Taxonomy database files.")
"""
rm $ncbi_tax_dir
mkdir $ncbi_tax_dir
${params.scripts_bindir}/download_taxonomy_databases
mv *.dmp $ncbi_tax_dir
"""
}
/*
Pre-process NCBI Taxonomy database files
*/
process preprocess_ncbi_taxonomy_files {
label 'process_low'
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/perl:5.26.2"
} else {
container "quay.io/biocontainers/perl:5.26.2"
}
input:
tuple path (ncbi_tax_dir), val(existing_db) from post_ncbi_tax_download_ch
output:
tuple path ("prepped_ncbi_tax_dir"), val(existing_db) into post_ncbi_tax_prep_ch
script:
existing_db ?
// if the path to a local copy of the ncbi taxonomy database is specified, simply link to the directory
"""
echo "nothing to do"
ln -s $ncbi_tax_dir prepped_ncbi_tax_dir
""" :
// deal with the newly downloaded *.dmp files
// pull out the info we need: this will be dumped into a sqlite db in the
// next downstream process
"""
rm -rf prepped_ncbi_tax_dir
mkdir -p prepped_ncbi_tax_dir
${params.scripts_bindir}/preprocess_dmp_files $ncbi_tax_dir prepped_ncbi_tax_dir ${params.scripts_bindir}
"""
}
/*
Setup NCBI Taxonomy databases: use existing installation if available
or download and create new one
*/
process create_ncbi_tax_sqlite_db {
label 'process_low'
publishDir "${params.tax_db_out_dir}", mode:'link', pattern:"*.sqlite3"
// singularity info for this process
if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) {
container "https://depot.galaxyproject.org/singularity/sqlite:3.33.0"
} else {
container "quay.io/biocontainers/sqlite:3.33.0"
}
input:
tuple path (ncbi_tax_dir), val(existing_db) from post_ncbi_tax_prep_ch
output:
path ("${params.ncbi_tax_db}") into post_ncbi_tax_check_ch
script:
// if a local blast_tax_dir is specified, check that it contains the expected files
// and that these are valid SQLite databases
existing_db ?
"""
# validate that a SQLite database is minimally valid
# see: https://stackoverflow.com/questions/3888529/how-to-tell-if-sqlite-database-file-is-valid-or-not
# This sqlite3 command will exit with 0 status (success) if everything looks OK
# and will exit with non-zero exit status otherwise
sqlite3 -batch ${ncbi_tax_dir}/${params.ncbi_tax_db} <<"EOF"
pragma schema_version;