Nextflow patterns
Nextflow patterns
1 Basic Patterns
1.1 Channel duplication
P:需要在兩個(gè)或多個(gè)進(jìn)程中使用相同的通道作為輸入
S:使用into運(yùn)算符創(chuàng)建源通道的兩個(gè)(或更多)副本。然后,使用新通道作為流程的輸入。
代碼:
Channel.fromPath('prots/*_?.fa').into { prot1_ch; prot2_ch }process foo {input: file x from prot1_chscript:"""echo your_command --input $x""" }process bar {input: file x from prot2_chscript:"""your_command --input $x""" }2 Scatter executions
2.1 Process per file path
P:需要為每個(gè)匹配 glob 模式的文件執(zhí)行一個(gè)任務(wù)
S:使用Channel.fromPath方法創(chuàng)建一個(gè)通道,發(fā)出與 glob 模式匹配的所有文件。然后,使用通道作為執(zhí)行任務(wù)的流程的輸入。
代碼:
Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }process foo {input:file x from samples_chscript:"""your_command --input $x""" }2.2 Process per file chunk
P:需要將一個(gè)或多個(gè)輸入文件拆分為塊并為每個(gè)文件執(zhí)行一項(xiàng)任務(wù)
S:使用splitText運(yùn)算符將文件拆分為給定大小的塊。然后將結(jié)果通道用作執(zhí)行任務(wù)的流程的輸入
代碼:
Channel.fromPath('poem.txt').splitText(by: 5).set{ chunks_ch }process foo {echo trueinput:file x from chunks_chscript:"""rev $x | rev""" }2.3 Process per file pairs
P:需要將文件處理到按對(duì)分組的目錄中
S:使用Channel.fromFilePairs方法創(chuàng)建一個(gè)通道,該通道發(fā)出與 glob 模式匹配的文件對(duì)。該模式必須匹配成對(duì)文件名中的公共前綴。匹配文件作為元組發(fā)出,其中第一個(gè)元素是匹配文件的分組鍵,第二個(gè)元素是文件對(duì)本身。
代碼:
Channel.fromFilePairs('reads/*_{1,2}.fq.gz').set { samples_ch }process foo {input:set sampleId, file(reads) from samples_chscript:"""your_command --sample $sampleId --reads $reads""" }- 自定義分組策略
需要時(shí),可以定義自定義分組策略。一個(gè)常見(jiàn)的用例是對(duì)齊 BAM 文件 ( sample1.bam) 隨附的索引文件。困難在于索引有時(shí)會(huì)被調(diào)用sample1.bai,有時(shí)sample1.bam.bai取決于所使用的軟件。下面的例子可以適應(yīng)這兩種情況。
代碼:
Channel.fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/.bam|.bai$/,'') }.set { samples_ch }process foo {input:set sampleId, file(bam) from samples_chscript:"""your_command --sample $sampleId --bam ${sampleId}.bam""" }2.4 Process per file range
P:需要在具有共同索引范圍的兩個(gè)或更多系列文件上執(zhí)行任務(wù)
S:使用from方法定義重復(fù)執(zhí)行任務(wù)的范圍,然后將其與map運(yùn)算符鏈接以將每個(gè)索引與相應(yīng)的輸入文件相關(guān)聯(lián)。最后使用結(jié)果通道作為過(guò)程的輸入
代碼:
Channel.from(1..23).map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }.set { pairs_ch }process foo {tag "$sampleId"input:set sampleId, file(indels), file(snps) from pairs_ch"""echo foo_command --this $indels --that $snps""" }2.5 Process per CSV record
P:需要為一個(gè)或多個(gè) CSV 文件中的每條記錄執(zhí)行一項(xiàng)任務(wù)
S:使用splitCsv運(yùn)算符逐行讀取 CSV 文件,然后使用map運(yùn)算符返回每行所需字段的元組,并使用該file函數(shù)將任何字符串路徑轉(zhuǎn)換為文件路徑對(duì)象。最后使用結(jié)果通道作為過(guò)程的輸入
index.csv
| FC816RLABXX | read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gz | read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz |
| FC812MWABXX | read/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gz | read110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz |
| FC81DE8ABXX | read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gz | read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz |
| FC81DB5ABXX | read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gz | read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz |
| FC819P0ABXX | read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gz | read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz |
代碼:
params.index = 'index.csv'Channel.fromPath(params.index).splitCsv(header:true).map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }.set { samples_ch }process foo {input:set sampleId, file(read1), file(read2) from samples_chscript:"""echo your_command --sample $sampleId --reads $read1 $read2""" }2.6 Process per file output
P:工作流中的任務(wù)一次生成兩個(gè)或更多文件。下游任務(wù)需要獨(dú)立處理這些文件中的每一個(gè)
S:使用flatten運(yùn)算符將上游進(jìn)程的輸出轉(zhuǎn)換為單獨(dú)發(fā)送到每個(gè)文件的通道。然后將此通道用作下游過(guò)程的輸入
代碼:
process foo {output:file '*.txt' into foo_chscript:'''echo Hello there! > file1.txtecho What a beautiful day > file2.txtecho I wish you are having fun1 > file3.txt''' }process bar {input:file x from foo_ch.flatten()script:"""cat $x""" }3 Gather results
3.1 Process all outputs altogether
P:需要完全處理上游任務(wù)的所有輸出
S:使用collect運(yùn)算符收集上游任務(wù)產(chǎn)生的所有輸出,并將它們作為唯一輸出發(fā)出。然后使用結(jié)果通道作為過(guò)程的輸入輸入
代碼:
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }process foo {input:file x from samples_choutput:file 'file.fq' into unzipped_chscript:"""< $x zcat > file.fq""" }process bar {echo trueinput:file '*.fq' from unzipped_ch.collect()"""cat *.fq""" }3.2 Process outputs into groups
P:需要在同一批次中處理文件名中具有匹配鍵的所有文件
S:使用map運(yùn)算符將每個(gè)文件關(guān)聯(lián)一個(gè)從文件名中提取的鍵。然后將結(jié)果通道與groupTuple運(yùn)算符鏈接起來(lái),將所有具有匹配鍵的文件組合在一起。最后使用結(jié)果通道作為過(guò)程的輸入。
代碼:
Channel.fromPath('reads/*').map { file ->def key = file.name.toString().tokenize('_').get(0)return tuple(key, file)}.groupTuple().set{ groups_ch }process foo {input:set key, file(samples) from groups_chscript:"""echo your_command --batch $key --input $samples""" }3.3 Collect outputs into a file
P:需要將上游進(jìn)程生成的所有輸出文件連接到一個(gè)文件中
S:使用collectFile運(yùn)算符將所有輸出文件合并為一個(gè)文件
代碼:
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }process foo {input:file x from samples_choutput:file 'file.fq' into unzipped_chscript:"""< $x zcat > file.fq""" }unzipped_ch.collectFile().println{ it.text }4 Organize outputs
4.1 Store process outputs
P:需要將一個(gè)或多個(gè)進(jìn)程的輸出存儲(chǔ)到您選擇的目錄結(jié)構(gòu)中
S:使用publishDir指令設(shè)置一個(gè)自定義目錄,在該目錄中需要提供流程輸出
代碼:
params.reads = 'reads/*{1,2}.fq.gz' params.outdir = 'my-results'Channel.fromFilePairs(params.reads).set{ samples_ch }process foo {publishDir "$params.outdir/$sampleId"input:set sampleId, file(samples) from samples_choutput:file '*.fq'script:"""< ${samples[0]} zcat > sample_1.fq< ${samples[1]} zcat > sample_2.fq""" }4.2 Store outputs matching a glob pattern
P:工作流中的任務(wù)會(huì)創(chuàng)建下游任務(wù)所需的許多輸出文件??筛鶕?jù)文件名將其中一些文件存儲(chǔ)到單獨(dú)的目錄中
S:使用兩個(gè)或多個(gè)publishDir指令將輸出文件存儲(chǔ)到單獨(dú)的存儲(chǔ)路徑中。對(duì)于每個(gè)指令,使用選項(xiàng)指定一個(gè)不同的 glob 字符串,pattern僅將與提供的模式匹配的文件存儲(chǔ)到每個(gè)目錄中
代碼:
Channel.fromFilePairs(params.reads, flat: true).set{ samples_ch }process foo {publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'publishDir "$params.outdir/$sampleId/", pattern: '*.fq'input:set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_choutput:file "*"script:"""< sample1.fq.gz zcat > sample1.fq< sample2.fq.gz zcat > sample2.fqawk '{s++}END{print s/4}' sample1.fq > sample1_counts.txtawk '{s++}END{print s/4}' sample2.fq > sample2_counts.txthead -n 50 sample1.fq > sample1_outlook.txthead -n 50 sample2.fq > sample2_outlook.txt""" }4.3 Rename process outputs
P:需要將進(jìn)程的輸出存儲(chǔ)到一個(gè)目錄中,為文件指定一個(gè)您選擇的名稱
S:publishDir 允許在過(guò)程輸出存儲(chǔ)在所選擇的目錄。指定saveAs參數(shù)為每個(gè)文件提供您選擇的名稱,證明自定義規(guī)則作為閉包(closure)
代碼:
process foo {publishDir 'results', saveAs: { filename -> "foo_$filename" }output:file '*.txt''''touch this.txttouch that.txt''' }- 若保存在子目錄中:
可以使用相同的模式將特定文件存儲(chǔ)在不同的目錄中,具體取決于實(shí)際名稱
代碼:
process foo {publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }output:file '*''''touch this.txttouch that.zip''' }5 Other
5.1 Get process work directory
P:需要當(dāng)前任務(wù)工作目錄的顯式路徑
S:使用$PWDBash 變量或pwd命令檢索任務(wù)工作目錄路徑
代碼:
process foo {echo truescript:"""echo foo task path: \$PWD""" }process bar {echo truescript:'''echo bar task path: $PWD''' }注意:$當(dāng)命令腳本包含在雙引號(hào)字符中時(shí),請(qǐng)確保使用轉(zhuǎn)義變量占位符,如上在雙引號(hào)中的$前加上了\來(lái)進(jìn)行了轉(zhuǎn)義
- 使用以下命令為相同的腳本提供一些輸入文件,以防止進(jìn)程被執(zhí)行
5.2 Ignore failing process
P:預(yù)期任務(wù)在特定條件下會(huì)失敗,由此希望忽略失敗并繼續(xù)執(zhí)行工作流中的剩余任務(wù)
S:使用 process指令 errorStrategy 'ignore'忽略錯(cuò)誤條件
代碼:
process foo {errorStrategy 'ignore'script:'''echo This is going to fail!exit 1''' }process bar {script:'''echo OK''' }5.3 Mock dependency
P:需要同步兩個(gè)沒(méi)有直接輸入輸出關(guān)系的進(jìn)程bar的執(zhí)行,以便該進(jìn)程僅在 process 完成后執(zhí)行foo
S:將foo產(chǎn)生標(biāo)志值的通道添加到進(jìn)程的輸出中。然后將此通道用作進(jìn)程的輸入以bar在其他進(jìn)程完成時(shí)觸發(fā)其執(zhí)行
代碼:
Channel.fromPath('.data/reads/*.fq.gz').set{ reads_ch }process foo {output:val true into done_chscript:"""your_command_here""" }process bar {input:val flag from done_chfile fq from reads_chscript:"""other_commad_here --reads $fq""" }6 Advanced patterns
6.1 Conditional resources definition
P:工作流中的任務(wù)需要使用一定量的計(jì)算資源,例如。內(nèi)存取決于一個(gè)或多個(gè)輸入文件的大小或名稱。
S:使用閉包以動(dòng)態(tài)方式聲明資源需求,例如memory,cpus等。閉包使用size流程定義中聲明的輸入的文件屬性(例如等),來(lái)計(jì)算所需的資源量。
代碼:
Channel.fromPath('reads/*_1.fq.gz').set { reads_ch }process foo {memory { reads.size() < 70.KB ? 1.GB : 5.GB }input:file reads from reads_ch"""your_command_here --in $reads""" }6.2 Conditional process executions
P:兩個(gè)不同的任務(wù)需要以互斥的方式執(zhí)行,那么第三個(gè)任務(wù)應(yīng)該對(duì)前一次執(zhí)行的結(jié)果進(jìn)行后處理。
S:使用when語(yǔ)句有條件地執(zhí)行兩個(gè)不同的進(jìn)程。每個(gè)進(jìn)程聲明自己的輸出通道。然后使用mix運(yùn)算符創(chuàng)建一個(gè)新通道,該通道將發(fā)出兩個(gè)進(jìn)程產(chǎn)生的輸出,并將其用作第三個(gè)進(jìn)程的輸入。
代碼:
params.flag = falseprocess foo {output:file 'x.txt' into foo_chwhen:!params.flagscript:'''echo foo > x.txt''' }process bar {output:file 'x.txt' into bar_chwhen:params.flagscript:'''echo bar > x.txt''' }process omega {echo trueinput:file x from foo_ch.mix(bar_ch)script:"""cat $x""" }可根據(jù)flag的不同來(lái)選擇不同的進(jìn)程執(zhí)行,執(zhí)行foo和omega:
nextflow run patterns/conditional-process.nf執(zhí)行bar和omega:
nextflow run patterns/conditional-process.nf --flag結(jié)果:
peng@sin-try2:~/patterns$ nextflow run conditional-process.nf N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process.nf` [soggy_hypatia] - revision: 1b07ba0b38 executor > local (2) [82/ea867d] process > foo [100%] 1 of 1 ? [- ] process > bar - [38/be26fb] process > omega (1) [100%] 1 of 1 ? foopeng@sin-try2:~/patterns$ nextflow run conditional-process.nf --flag N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process.nf` [astonishing_goldwasser] - revision: 1b07ba0b38 executor > local (2) [- ] process > foo - [7f/158b8c] process > bar [100%] 1 of 1 ? [05/b06073] process > omega (1) [100%] 1 of 1 ? bar- 有條件地正常(使用數(shù)據(jù))或作為空通道創(chuàng)建輸入 通道。使用各個(gè)輸入通道的過(guò)程僅在通道被填充時(shí)才會(huì)執(zhí)行。每個(gè)進(jìn)程仍然聲明自己的輸出通道。然后使用mix運(yùn)算符創(chuàng)建一個(gè)新通道,該通道將發(fā)出兩個(gè)進(jìn)程產(chǎn)生的輸出,并將其用作第三個(gè)進(jìn)程的輸入
代碼:
params.flag = false(foo_inch, bar_inch) = ( params.flag? [ Channel.empty(), Channel.from(1,2,3) ]: [ Channel.from(4,5,6), Channel.empty() ] )process foo {input:val(f) from foo_inchoutput:file 'x.txt' into foo_chscript:"""echo $f > x.txt""" }process bar {input:val(b) from bar_inchoutput:file 'x.txt' into bar_chscript:"""echo $b > x.txt""" }process omega {echo trueinput:file x from foo_ch.mix(bar_ch)script:"""cat $x""" }運(yùn)行結(jié)果:
peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process2.nf` [naughty_minsky] - revision: 296937c5d2 executor > local (6) [0b/8183d2] process > foo (3) [100%] 3 of 3 ? [- ] process > bar - [e8/e9334f] process > omega (3) [100%] 3 of 3 ? 654peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf --flag N E X T F L O W ~ version 21.04.0-edge Launching `conditional-process2.nf` [disturbed_goldwasser] - revision: 296937c5d2 executor > local (6) [- ] process > foo - [aa/907692] process > bar (3) [100%] 3 of 3 ? [c1/e15b2f] process > omega (3) [100%] 3 of 3 ? 1236.3 Skip process execution
P:工作流程中有兩個(gè)連續(xù)的任務(wù),當(dāng)指定了可選標(biāo)志時(shí),不應(yīng)執(zhí)行第一個(gè)任務(wù),其輸入由第二個(gè)任務(wù)處理。
S:使用條件表達(dá)式中創(chuàng)建的空通道,在指定可選參數(shù)時(shí)跳過(guò)第一個(gè)執(zhí)行流程。然后將第二進(jìn)程的輸入定義為第一進(jìn)程的輸出(當(dāng)被執(zhí)行時(shí))與輸入信道的mix
代碼:
params.skip = false params.input = "$baseDir/sample.fq.gz"Channel.fromPath(params.input).set{ input_ch }(foo_ch, bar_ch) = ( params.skip? [Channel.empty(), input_ch]: [input_ch, Channel.empty()] )process foo {input:file x from foo_choutput:file('*.fastq') into optional_chscript:"""< $x zcat > ${x.simpleName}.fastq""" }process bar {echo trueinput:file x from bar_ch.mix(optional_ch)"""echo your_command --input $x""" }6.4 Feedback loop☆
P:需要重復(fù)執(zhí)行一個(gè)或多個(gè)任務(wù),使用輸出作為新迭代的輸入,直到達(dá)到特定條件(i=o=i…)
S:使用迭代循環(huán)中最后一個(gè)進(jìn)程的輸出作為第一個(gè)進(jìn)程的輸入。使用Channel.create方法顯式創(chuàng)建輸出通道。然后將過(guò)程輸入定義為初始輸入和過(guò)程輸出的mix,該過(guò)程輸出應(yīng)用于定義終止條件的until運(yùn)算符
代碼;
params.input = 'hello.txt'condition = { it.readLines().size()>3 } feedback_ch = Channel.create() input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )process foo {input:file x from input_choutput:file 'foo.txt' into foo_chscript:"""cat $x > foo.txt""" }process bar {input:file x from foo_choutput:file 'bar.txt' into feedback_chfile 'bar.txt' into result_chscript:"""cat $x > bar.txtecho World >> bar.txt""" }result_ch.last().println { "Result:\n${it.text.indent(' ')}" }6.5 Optional input
P:一個(gè)或多個(gè)進(jìn)程有一個(gè)可選的輸入文件
S:使用特殊的文件名來(lái)標(biāo)記文件參數(shù)的缺失
代碼:
params.inputs = 'prots/*{1,2,3}.fa' params.filter = 'NO_FILE'prots_ch = Channel.fromPath(params.inputs) opt_file = file(params.filter)process foo {input:file seq from prots_chfile opt from opt_filescript:def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''"""your_commad --input $seq $filter""" }6.6 Optional output
P:在某些情況下,工作流中的任務(wù)預(yù)計(jì)不會(huì)創(chuàng)建輸出文件
S:將此類輸出聲明為optional文件
代碼:
process foo {output:file 'foo.txt' optional true into foo_chscript:'''your_command''' }6.7 Execute when empty
P:如果通道為空,您需要執(zhí)行一個(gè)過(guò)程
S:使用ifEmpty運(yùn)算符發(fā)出標(biāo)記值以觸發(fā)流程的執(zhí)行
代碼:
process foo {input:val x from ch.ifEmpty { 'EMPTY' }when:x == 'EMPTY'script:'''your_command''' }總結(jié)
以上是生活随笔為你收集整理的Nextflow patterns的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 区分度评估指标-KS
- 下一篇: MC34063在扩展后的降压应用