From 9dde0a1cafa5d9f6204f8ef71fbc9d6bb009eba1 Mon Sep 17 00:00:00 2001 From: Raquel Manzano Date: Tue, 16 Aug 2022 18:57:43 +0100 Subject: [PATCH] Pipeline checkpoint, all working until FASTP step. From now on input DNA/RNA will be separated for their corresponding pre-processing steps --- .gitignore | 4 + conf/slurm.config | 27 +- modules/nf-core/modules/fastqc/main.nf | 16 +- .../modules/gatk4/haplotypecaller/main.nf | 4 +- .../modules/gatk4/haplotypecaller/meta.yml | 4 + .../nf-core/modules/gatk4/mergevcfs/main.nf | 1 + .../nf-core/modules/gatk4/mergevcfs/meta.yml | 5 + .../modules/gatk4/splitncigarreads/main.nf | 48 ++ .../nf-core/modules/samtools/stats/main.nf | 5 +- subworkflows/local/input_check.nf | 3 +- subworkflows/local/prepare_genome.nf | 181 +++---- subworkflows/nf-core/markduplicates.nf | 66 +-- workflows/rnadnavar.nf | 499 +++++++++++++----- 13 files changed, 573 insertions(+), 290 deletions(-) diff --git a/.gitignore b/.gitignore index 5124c9a..967a58f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,7 @@ results/ testing/ testing* *.pyc +.idea/ +null/ +test/ +res-test \ No newline at end of file diff --git a/conf/slurm.config b/conf/slurm.config index 4684a69..3e7d936 100644 --- a/conf/slurm.config +++ b/conf/slurm.config @@ -3,15 +3,30 @@ singularity { autoMounts = true } process { - withName: 'NFCORE_RNADNAVAR:RNADNAVAR:GATK4_BEDTOINTERVALLIST' - { - ext.args = '--DROP_MISSING_CONTIGS TRUE' - } executor = 'slurm' - clusterOptions = "--account caldas-sl2-cpu" + clusterOptions = "--account caldas-sl2-cpu --partition cclake" + pollInterval = '1 min' + queueStatInterval = '5 min' } params { + config_profile_name = 'Slurm test profile' + config_profile_description = 'Minimal real dataset to check pipeline function' max_time = 12.h max_cpus = 12 - max_memory = 100.GB + max_memory = 120.GB + + // Input data + input = '/rds/project/rds-upenYb9rdtk/Work/rm889/rna_mutect/nextflow/rnadnavar/assets/samplesheet_mytest.csv' + outdir = 'results' + skip_intervallisttools = true + genome = 'GRCh38' + fasta = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/genome/hg38/chr3.fa' + gtf = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/Homo_sapiens/NCBI/GRCh38Decoy/Annotation/Genes.gencode/gencode.v33.annotation.chr3.gtf' + known_indels = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/Homo_sapiens_assembly38.known_indels.vcf.gz' + known_indels_tbi = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/Homo_sapiens_assembly38.known_indels.vcf.gz.tbi' + pon = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/1000g_pon.hg38.vcf.gz' + pon_tbi = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/1000g_pon.hg38.vcf.gz.tbi' + germline_resource = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/af-only-gnomad.hg38.vcf.gz' + germline_resource_tbi = '/rds/project/rds-upenYb9rdtk/Work/rm889/resources/af-only-gnomad.hg38.vcf.gz.tbi' + nucleotides_per_second = 1000 } diff --git a/modules/nf-core/modules/fastqc/main.nf b/modules/nf-core/modules/fastqc/main.nf index ed6b8c5..f363176 100644 --- a/modules/nf-core/modules/fastqc/main.nf +++ b/modules/nf-core/modules/fastqc/main.nf @@ -24,8 +24,8 @@ process FASTQC { def prefix = task.ext.prefix ?: "${meta.id}" if (meta.single_end) { """ - [ ! -f ${prefix}.fastq.gz ] && ln -s $reads ${prefix}.fastq.gz - fastqc $args --threads $task.cpus ${prefix}.fastq.gz + [ ! -f ${prefix}.bam.gz ] && ln -s $reads ${prefix}.bam.gz + fastqc -f bam $args --threads $task.cpus ${prefix}.bam.gz cat <<-END_VERSIONS > versions.yml "${task.process}": @@ -44,4 +44,16 @@ process FASTQC { END_VERSIONS """ } + + stub: + def prefix = task.ext.prefix ?: "${meta.id}" + """ + touch ${prefix}.html + touch ${prefix}.zip + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + fastqc: \$( fastqc --version | sed -e "s/FastQC v//g" ) + END_VERSIONS + """ } diff --git a/modules/nf-core/modules/gatk4/haplotypecaller/main.nf b/modules/nf-core/modules/gatk4/haplotypecaller/main.nf index 6dd3f69..19cd57b 100644 --- a/modules/nf-core/modules/gatk4/haplotypecaller/main.nf +++ b/modules/nf-core/modules/gatk4/haplotypecaller/main.nf @@ -8,7 +8,7 @@ process GATK4_HAPLOTYPECALLER { 'quay.io/biocontainers/gatk4:4.2.6.1--hdfd78af_0' }" input: - tuple val(meta), path(input), path(input_index), path(intervals) + tuple val(meta), path(input), path(input_index), path(intervals), path(dragstr_model) path fasta path fai path dict @@ -28,6 +28,7 @@ process GATK4_HAPLOTYPECALLER { def prefix = task.ext.prefix ?: "${meta.id}" def dbsnp_command = dbsnp ? "--dbsnp $dbsnp" : "" def interval_command = intervals ? "--intervals $intervals" : "" + def dragstr_command = dragstr_model ? "--dragstr-params-path $dragstr_model" : "" def avail_mem = 3 if (!task.memory) { @@ -42,6 +43,7 @@ process GATK4_HAPLOTYPECALLER { --reference $fasta \\ $dbsnp_command \\ $interval_command \\ + $dragstr_command \\ --tmp-dir . \\ $args diff --git a/modules/nf-core/modules/gatk4/haplotypecaller/meta.yml b/modules/nf-core/modules/gatk4/haplotypecaller/meta.yml index 81851a9..48193d9 100644 --- a/modules/nf-core/modules/gatk4/haplotypecaller/meta.yml +++ b/modules/nf-core/modules/gatk4/haplotypecaller/meta.yml @@ -32,6 +32,10 @@ input: - intervals: type: file description: Bed file with the genomic regions included in the library (optional) + - dragstr_model: + type: file + description: Text file containing the DragSTR model of the used BAM/CRAM file (optional) + pattern: "*.txt" - fasta: type: file description: The reference fasta file diff --git a/modules/nf-core/modules/gatk4/mergevcfs/main.nf b/modules/nf-core/modules/gatk4/mergevcfs/main.nf index 964c1a3..35930a6 100644 --- a/modules/nf-core/modules/gatk4/mergevcfs/main.nf +++ b/modules/nf-core/modules/gatk4/mergevcfs/main.nf @@ -13,6 +13,7 @@ process GATK4_MERGEVCFS { output: tuple val(meta), path('*.vcf.gz'), emit: vcf + tuple val(meta), path("*.tbi") , emit: tbi path "versions.yml" , emit: versions when: diff --git a/modules/nf-core/modules/gatk4/mergevcfs/meta.yml b/modules/nf-core/modules/gatk4/mergevcfs/meta.yml index 8d4123d..3ebce0b 100644 --- a/modules/nf-core/modules/gatk4/mergevcfs/meta.yml +++ b/modules/nf-core/modules/gatk4/mergevcfs/meta.yml @@ -35,6 +35,11 @@ output: type: file description: merged vcf file pattern: "*.vcf.gz" + - tbi: + type: file + description: index files for the merged vcf files + pattern: "*.tbi" + - versions: type: file description: File containing software versions diff --git a/modules/nf-core/modules/gatk4/splitncigarreads/main.nf b/modules/nf-core/modules/gatk4/splitncigarreads/main.nf index 456ec05..9adab27 100644 --- a/modules/nf-core/modules/gatk4/splitncigarreads/main.nf +++ b/modules/nf-core/modules/gatk4/splitncigarreads/main.nf @@ -46,3 +46,51 @@ process GATK4_SPLITNCIGARREADS { END_VERSIONS """ } + +process GATK4_SPLITNCIGARREADS2 { + tag "$meta.id" + label 'process_medium' + + conda (params.enable_conda ? "bioconda::gatk4=4.2.6.1" : null) + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + 'https://depot.galaxyproject.org/singularity/gatk4:4.2.6.1--hdfd78af_0': + 'quay.io/biocontainers/gatk4:4.2.6.1--hdfd78af_0' }" + + input: + tuple val(meta), path(bam), path(bai) + path fasta + path fai + path dict + + output: + tuple val(meta), path('*.bam'), emit: bam + path "versions.yml" , emit: versions + + when: + task.ext.when == null || task.ext.when + + script: + def args = task.ext.args ?: '' + def prefix = task.ext.prefix ?: "${meta.id}" + + def avail_mem = 4 + if (!task.memory) { + log.info '[GATK SplitNCigarReads] Available memory not known - defaulting to 4GB. Specify process memory requirements to change this.' + } else { + avail_mem = task.memory.giga + } + """ + gatk --java-options "-Xmx${avail_mem}g" SplitNCigarReads \\ + --input $bam \\ + --output ${prefix}.bam \\ + --reference $fasta \\ + --tmp-dir . \\ + $args + + cat <<-END_VERSIONS > versions.yml + "${task.process}": + gatk4: \$(echo \$(gatk --version 2>&1) | sed 's/^.*(GATK) v//; s/ .*\$//') + END_VERSIONS + """ +} + diff --git a/modules/nf-core/modules/samtools/stats/main.nf b/modules/nf-core/modules/samtools/stats/main.nf index bbdc324..89b92d7 100644 --- a/modules/nf-core/modules/samtools/stats/main.nf +++ b/modules/nf-core/modules/samtools/stats/main.nf @@ -20,6 +20,7 @@ process SAMTOOLS_STATS { script: def args = task.ext.args ?: '' + def prefix = task.ext.prefix ?: "${meta.id}" def reference = fasta ? "--reference ${fasta}" : "" """ samtools \\ @@ -27,7 +28,7 @@ process SAMTOOLS_STATS { --threads ${task.cpus-1} \\ ${reference} \\ ${input} \\ - > ${input}.stats + > ${prefix}.stats cat <<-END_VERSIONS > versions.yml "${task.process}": @@ -38,7 +39,7 @@ process SAMTOOLS_STATS { stub: def prefix = task.ext.prefix ?: "${meta.id}" """ - touch ${input}.stats + touch ${prefix}.stats cat <<-END_VERSIONS > versions.yml "${task.process}": diff --git a/subworkflows/local/input_check.nf b/subworkflows/local/input_check.nf index 8bc94d2..8b6b0b7 100644 --- a/subworkflows/local/input_check.nf +++ b/subworkflows/local/input_check.nf @@ -37,7 +37,8 @@ def create_fastq_channel(LinkedHashMap row) { exit 1, "ERROR: Please check input samplesheet -> Read 1 BAM file do not exist!\n${row.bam}" } file_meta = [ meta, [ file(row.bam) ] ] - file_meta = [ meta, [ file(row.bam), file(row.bai) ] ] +// file_meta = [ meta, [ file(row.bam), file(row.bai) ] ] TODO: do we need this somewhere else apart from fastqc? + meta.single_end = row.single_end.toBoolean() } else { if (!file(row.fastq_1).exists()) diff --git a/subworkflows/local/prepare_genome.nf b/subworkflows/local/prepare_genome.nf index e4760b5..c56ddb7 100644 --- a/subworkflows/local/prepare_genome.nf +++ b/subworkflows/local/prepare_genome.nf @@ -1,39 +1,50 @@ // -// Uncompress and prepare reference genome files +// PREPARE GENOME // -include { GATK4_CREATESEQUENCEDICTIONARY } from '../../modules/nf-core/modules/gatk4/createsequencedictionary/main' //addParams(options: params.genome_options) -include { GFFREAD } from '../../modules/nf-core/modules/gffread/main' //addParams(options: params.gffread_options) -include { GTF2BED } from '../../modules/local/gtf2bed' //addParams(options: params.genome_options) -include { GUNZIP as GUNZIP_FASTA } from '../../modules/nf-core/modules/gunzip/main' //addParams(options: params.genome_options) -include { GUNZIP as GUNZIP_GENE_BED } from '../../modules/nf-core/modules/gunzip/main' //addParams(options: params.genome_options) -include { GUNZIP as GUNZIP_GFF } from '../../modules/nf-core/modules/gunzip/main' //addParams(options: params.genome_options) -include { GUNZIP as GUNZIP_GTF } from '../../modules/nf-core/modules/gunzip/main' //addParams(options: params.genome_options) -include { SAMTOOLS_FAIDX } from '../../modules/nf-core/modules/samtools/faidx/main' //addParams(options: params.genome_options) -include { STAR_GENOMEGENERATE } from '../../modules/nf-core/modules/star/genomegenerate/main' //addParams(options: params.star_index_options) -include { UNTAR as UNTAR_STAR_INDEX } from '../../modules/nf-core/modules/untar/main' //addParams(options: params.star_untar_options) - +// Initialize channels based on params or indices that were just built +// For all modules here: +// A when clause condition is defined in the conf/modules.config to determine if the module should be run +// Condition is based on params.step and params.tools +// If and extra condition exists, it's specified in comments + +include { BWA_INDEX as BWAMEM1_INDEX } from '../../modules/nf-core/modules/bwa/index/main' +include { BWAMEM2_INDEX } from '../../modules/nf-core/modules/bwamem2/index/main' +include { STAR_GENOMEGENERATE } from '../../modules/nf-core/modules/star/genomegenerate/main' //addParams(options: params.star_index_options) +include { DRAGMAP_HASHTABLE } from '../../modules/nf-core/modules/dragmap/hashtable/main' +include { GATK4_CREATESEQUENCEDICTIONARY } from '../../modules/nf-core/modules/gatk4/createsequencedictionary/main' +include { SAMTOOLS_FAIDX } from '../../modules/nf-core/modules/samtools/faidx/main' +include { TABIX_TABIX as TABIX_DBSNP } from '../../modules/nf-core/modules/tabix/tabix/main' +include { TABIX_TABIX as TABIX_GERMLINE_RESOURCE } from '../../modules/nf-core/modules/tabix/tabix/main' +include { TABIX_TABIX as TABIX_KNOWN_INDELS } from '../../modules/nf-core/modules/tabix/tabix/main' +include { TABIX_TABIX as TABIX_KNOWN_SNPS } from '../../modules/nf-core/modules/tabix/tabix/main' +include { TABIX_TABIX as TABIX_PON } from '../../modules/nf-core/modules/tabix/tabix/main' +include { UNZIP as UNZIP_ALLELES } from '../../modules/nf-core/modules/unzip/main' +include { UNZIP as UNZIP_LOCI } from '../../modules/nf-core/modules/unzip/main' +include { UNZIP as UNZIP_GC } from '../../modules/nf-core/modules/unzip/main' +include { UNZIP as UNZIP_RT } from '../../modules/nf-core/modules/unzip/main' workflow PREPARE_GENOME { take: - prepare_tool_indices + dbsnp // channel: [optional] dbsnp + fasta // channel: [mandatory] fasta + fasta_fai // channel: [optional] fasta_fai + germline_resource // channel: [optional] germline_resource + known_indels // channel: [optional] known_indels + known_snps + pon // channel: [optional] pon + main: ch_versions = Channel.empty() - // - // Uncompress genome fasta file if required - // - if (params.fasta.endsWith('.gz')) { - GUNZIP_FASTA ( - Channel.fromPath(params.fasta).map{ it -> [[id:it[0].baseName], it] } - ) - ch_fasta = GUNZIP_FASTA.out.gunzip.map{ meta, fasta -> [fasta] }.collect() - ch_versions = ch_versions.mix(GUNZIP_FASTA.out.versions) - } else { - ch_fasta = Channel.fromPath(params.fasta).collect() - } + BWAMEM1_INDEX(fasta) // If aligner is bwa-mem + BWAMEM2_INDEX(fasta) // If aligner is bwa-mem2 + DRAGMAP_HASHTABLE(fasta) // If aligner is dragmap + + GATK4_CREATESEQUENCEDICTIONARY(fasta) + SAMTOOLS_FAIDX(fasta.map{ it -> [[id:it[0].getName()], it] }) // // Uncompress GTF annotation file or create from GFF3 if required @@ -68,90 +79,66 @@ workflow PREPARE_GENOME { ch_versions = ch_versions.mix(GFFREAD.out.versions) } - // - // Uncompress exon BED annotation file or create from GTF if required + // Uncompress STAR index or generate from scratch if required // - if (params.exon_bed) { - if (params.exon_bed.endsWith('.gz')) { - GUNZIP_GENE_BED ( - Channel.fromPath(params.exon_bed).map{ it -> [[id:it[0].baseName], it] } + ch_star_index = Channel.empty() + if (params.star_index) { + if (params.star_index.endsWith('.tar.gz')) { + UNTAR_STAR_INDEX ( + Channel.fromPath(params.star_index).map{ it -> [[id:it[0].baseName], it] } ) - ch_gene_bed = GUNZIP_GENE_BED.out.gunzip.map{ meta, bed -> [bed] }.collect() - ch_versions = ch_versions.mix(GUNZIP_GENE_BED.out.versions) + ch_star_index = UNTAR_STAR_INDEX.out.untar.map{ meta, star_index -> [star_index] }.collect() + ch_versions = ch_versions.mix(UNTAR_STAR_INDEX.out.versions) } else { - ch_gene_bed = Channel.fromPath(params.exon_bed).collect() + ch_star_index = Channel.fromPath(params.star_index).collect() } - } else { - ch_exon_bed = GTF2BED ( ch_gtf ).bed.collect() - ch_versions = ch_versions.mix(GTF2BED.out.versions) } - - // Index the genome fasta - ch_fasta_fai = Channel.empty() - if (params.fasta_fai) ch_fasta_fai = Channel.fromPath(params.fasta_fai).collect() - if (!params.fasta_fai) { - SAMTOOLS_FAIDX( - ch_fasta.map{ it -> [[id:it[0].getName()], it]} + else { + STAR_GENOMEGENERATE ( + fasta,ch_gtf ) - ch_fasta_fai = SAMTOOLS_FAIDX.out.fai.map{ meta, fai -> [fai] }.collect() - ch_versions = ch_versions.mix(SAMTOOLS_FAIDX.out.versions) + .index + .set { ch_star_index } + ch_versions = ch_versions.mix(STAR_GENOMEGENERATE.out.versions) } - // Create dictionary file for the genome fasta - ch_fasta_dict = Channel.empty() - if (params.dict) ch_fasta_dict = Channel.fromPath(params.dict).collect() - else ch_fasta_dict = GATK4_CREATESEQUENCEDICTIONARY(ch_fasta).dict - // - // Uncompress STAR index or generate from scratch if required - // - ch_star_index = Channel.empty() - if ('star' in prepare_tool_indices) { - if (params.star_index) { - if (params.star_index.endsWith('.tar.gz')) { - UNTAR_STAR_INDEX ( - Channel.fromPath(params.star_index).map{ it -> [[id:it[0].baseName], it] } - ) - ch_star_index = UNTAR_STAR_INDEX.out.untar.map{ meta, star_index -> [star_index] }.collect() - ch_versions = ch_versions.mix(UNTAR_STAR_INDEX.out.versions) - } else { - ch_star_index = Channel.fromPath(params.star_index).collect() - } - } - else { - STAR_GENOMEGENERATE ( - ch_fasta,ch_gtf - ) - .index - .set { ch_star_index } - ch_versions = ch_versions.mix(STAR_GENOMEGENERATE.out.versions) - } + // the following are flattened and mapped in case the user supplies more than one value for the param + // written for KNOWN_INDELS, but preemptively applied to the rest + // [file1,file2] becomes [[meta1,file1],[meta2,file2]] + // outputs are collected to maintain a single channel for relevant TBI files + TABIX_DBSNP(dbsnp.flatten().map{ it -> [[id:it.baseName], it] }) + TABIX_GERMLINE_RESOURCE(germline_resource.flatten().map{ it -> [[id:it.baseName], it] }) + TABIX_KNOWN_SNPS( known_snps.flatten().map{ it -> [[id:it.baseName], it] } ) + TABIX_KNOWN_INDELS( known_indels.flatten().map{ it -> [[id:it.baseName], it] } ) + TABIX_PON(pon.flatten().map{ it -> [[id:it.baseName], it] }) - //if((!ch_star_index) || getIndexVersion(ch_star_index) != '2.7.4a'){ - // ch_star_index = STAR_GENOMEGENERATE(ch_fasta,ch_gtf).index - // ch_versions = ch_versions.mix(STAR_GENOMEGENERATE.out.versions) - //} - } - emit: - fasta = ch_fasta // path: genome.fasta - fai = ch_fasta_fai // path: genome.fasta.fai - dict = ch_fasta_dict // path: genome.fasta.dict - gtf = ch_gtf // path: genome.gtf - exon_bed = ch_exon_bed // path: exon.bed - star_index = ch_star_index // path: star/index/ - versions = ch_versions.ifEmpty(null) // channel: [ versions.yml ] -} + // Gather versions of all tools used + ch_versions = ch_versions.mix(SAMTOOLS_FAIDX.out.versions) + ch_versions = ch_versions.mix(BWAMEM1_INDEX.out.versions) + ch_versions = ch_versions.mix(BWAMEM2_INDEX.out.versions) + ch_versions = ch_versions.mix(GATK4_CREATESEQUENCEDICTIONARY.out.versions) + ch_versions = ch_versions.mix(TABIX_DBSNP.out.versions) + ch_versions = ch_versions.mix(TABIX_GERMLINE_RESOURCE.out.versions) + ch_versions = ch_versions.mix(TABIX_KNOWN_SNPS.out.versions) + ch_versions = ch_versions.mix(TABIX_KNOWN_INDELS.out.versions) + ch_versions = ch_versions.mix(TABIX_PON.out.versions) -def getIndexVersion( index_path ) { - genomeParameters = new File("$index_path/genomeParameters.txt") - if ( genomeParameters.exists() ) { - for(line: genomeParameters.readLines()){ - if(line.startsWith("versionGenome")){ - return line.split("\t")[1].trim() - } - } - } + emit: + bwa = BWAMEM1_INDEX.out.index // path: bwa/* + bwamem2 = BWAMEM2_INDEX.out.index // path: bwamem2/* + hashtable = DRAGMAP_HASHTABLE.out.hashmap // path: dragmap/* + dbsnp_tbi = TABIX_DBSNP.out.tbi.map{ meta, tbi -> [tbi] }.collect() // path: dbsnb.vcf.gz.tbi + dict = GATK4_CREATESEQUENCEDICTIONARY.out.dict // path: genome.fasta.dict + fasta_fai = SAMTOOLS_FAIDX.out.fai.map{ meta, fai -> [fai] } // path: genome.fasta.fai + star_index = ch_star_index // path: star/index/ + gtf = ch_gtf // path: genome.gtf + germline_resource_tbi = TABIX_GERMLINE_RESOURCE.out.tbi.map{ meta, tbi -> [tbi] }.collect() // path: germline_resource.vcf.gz.tbi + known_snps_tbi = TABIX_KNOWN_SNPS.out.tbi.map{ meta, tbi -> [tbi] }.collect() // path: {known_indels*}.vcf.gz.tbi + known_indels_tbi = TABIX_KNOWN_INDELS.out.tbi.map{ meta, tbi -> [tbi] }.collect() // path: {known_indels*}.vcf.gz.tbi + pon_tbi = TABIX_PON.out.tbi.map{ meta, tbi -> [tbi] }.collect() // path: pon.vcf.gz.tbi + versions = ch_versions // channel: [ versions.yml ] } diff --git a/subworkflows/nf-core/markduplicates.nf b/subworkflows/nf-core/markduplicates.nf index 85b724d..765a044 100644 --- a/subworkflows/nf-core/markduplicates.nf +++ b/subworkflows/nf-core/markduplicates.nf @@ -1,54 +1,40 @@ // -// GATK4 MarkDuplicates, index BAM file and run samtools stats, flagstat and idxstats +// MARKDUPLICATES // +// For all modules here: +// A when clause condition is defined in the conf/modules.config to determine if the module should be run -include { BAM_STATS_SAMTOOLS } from './bam_stats_samtools' -include { GATK4_MARKDUPLICATES } from '../../modules/nf-core/modules/gatk4/markduplicates/main' -include { SAMTOOLS_INDEX } from '../../modules/nf-core/modules/samtools/index/main' +include { GATK4_MARKDUPLICATES } from '../../modules/nf-core/modules/gatk4/markduplicates/main' +include { BAM_TO_CRAM } from './bam_to_cram' workflow MARKDUPLICATES { take: - bam // channel: [ val(meta), [ bam ] ] + bam // channel: [mandatory] meta, bam + fasta // channel: [mandatory] fasta + fasta_fai // channel: [mandatory] fasta_fai + intervals_bed_combined // channel: [optional] intervals_bed main: - ch_versions = Channel.empty() + qc_reports = Channel.empty() - GATK4_MARKDUPLICATES ( - bam - ) - ch_versions = ch_versions.mix(GATK4_MARKDUPLICATES.out.versions.first()) + // Run Markupduplicates + GATK4_MARKDUPLICATES(bam) + + // Convert output to cram + BAM_TO_CRAM(GATK4_MARKDUPLICATES.out.bam.join(GATK4_MARKDUPLICATES.out.bai), Channel.empty(), fasta, fasta_fai, intervals_bed_combined) - // - // Index BAM file and run samtools stats, flagstat and idxstats - // - SAMTOOLS_INDEX ( - GATK4_MARKDUPLICATES.out.bam - ) - ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions.first()) - - GATK4_MARKDUPLICATES.out.bam - .join(SAMTOOLS_INDEX.out.bai, by: [0], remainder: true) - .join(SAMTOOLS_INDEX.out.csi, by: [0], remainder: true) - .map{meta, bam, bai, csi -> - if (bai) [meta, bam, bai] - else [meta, bam, csi]} - .set{ch_bam_bai} - - BAM_STATS_SAMTOOLS ( - ch_bam_bai - ) - ch_versions = ch_versions.mix(BAM_STATS_SAMTOOLS.out.versions.first()) + // Gather all reports generated + qc_reports = qc_reports.mix(GATK4_MARKDUPLICATES.out.metrics, + BAM_TO_CRAM.out.qc) + + // Gather versions of all tools used + ch_versions = ch_versions.mix(GATK4_MARKDUPLICATES.out.versions.first()) + ch_versions = ch_versions.mix(BAM_TO_CRAM.out.versions) emit: - bam = GATK4_MARKDUPLICATES.out.bam // channel: [ val(meta), [ bam ] ] - bam_bai = ch_bam_bai // channel: [ val(meta), [ bam ], [bai or csi] ] - metrics = GATK4_MARKDUPLICATES.out.metrics // channel: [ val(meta), [ metrics ] ] - - bai = SAMTOOLS_INDEX.out.bai // channel: [ val(meta), [ bai ] ] - csi = SAMTOOLS_INDEX.out.csi // channel: [ val(meta), [ csi ] ] - stats = BAM_STATS_SAMTOOLS.out.stats // channel: [ val(meta), [ stats ] ] - flagstat = BAM_STATS_SAMTOOLS.out.flagstat // channel: [ val(meta), [ flagstat ] ] - idxstats = BAM_STATS_SAMTOOLS.out.idxstats // channel: [ val(meta), [ idxstats ] ] - versions = ch_versions // channel: [versions.yml] + cram = BAM_TO_CRAM.out.cram_converted + qc = qc_reports + + versions = ch_versions // channel: [ versions.yml ] } diff --git a/workflows/rnadnavar.nf b/workflows/rnadnavar.nf index 11bcb70..14d5a76 100644 --- a/workflows/rnadnavar.nf +++ b/workflows/rnadnavar.nf @@ -16,6 +16,8 @@ def checkPathParamList = [ params.fasta, params.fasta_fai, params.dict, + params.bwa, + params.bwamem2, params.gtf, params.gff, params.dbsnp, @@ -40,20 +42,66 @@ for (param in checkPathParamList) { // Set input, can either be from --input or from automatic retrieval in lib/WorkflowRnadnavar.groovy ch_input_sample = extract_csv(file(params.input, checkIfExists: true)) -if (params.input) { - ch_input = file(params.input) - } -else { - exit 1, - 'Input samplesheet not specified!' - } +// Fails when wrongful extension for intervals file +if (params.wes && !params.step == 'annotate') { + if (params.intervals && !params.intervals.endsWith("bed")) exit 1, "Target file specified with `--intervals` must be in BED format for targeted data" + else log.warn("Intervals file was provided without parameter `--wes`: Pipeline will assume this is Whole-Genome-Sequencing data.") +} else if (params.intervals && !params.intervals.endsWith("bed") && !params.intervals.endsWith("interval_list")) exit 1, "Intervals file must end with .bed or .interval_list" + +if(params.step == 'mapping' && params.aligner.contains("dragmap") && !(params.skip_tools && params.skip_tools.split(',').contains("baserecalibrator"))){ + log.warn("DragMap was specified as aligner. Base recalibration is not contained in --skip_tools. It is recommended to skip baserecalibration when using DragMap\nhttps://gatk.broadinstitute.org/hc/en-us/articles/4407897446939--How-to-Run-germline-single-sample-short-variant-discovery-in-DRAGEN-mode") +} + +// Fails when missing params for STAR if (!params.star_index && !params.gtf && !params.gff) { exit 1, "GTF|GFF3 file is required to build a STAR reference index! Use option --gtf|--gff to provide a GTF|GFF file." } -// Should PoN be mandatory too? + +// Warns when missing files or params for mutect2 +if(params.tools && params.tools.split(',').contains('mutect2')){ + if(!params.pon){ + log.warn("No Panel-of-normal was specified for Mutect2.\nIt is highly recommended to use one: https://gatk.broadinstitute.org/hc/en-us/articles/5358911630107-Mutect2\nFor more information on how to create one: https://gatk.broadinstitute.org/hc/en-us/articles/5358921041947-CreateSomaticPanelOfNormals-BETA-") + } + if(!params.germline_resource){ + log.warn("If Mutect2 is specified without a germline resource, no filtering will be done.\nIt is recommended to use one: https://gatk.broadinstitute.org/hc/en-us/articles/5358911630107-Mutect2") + } + if(params.pon && params.pon.contains("/Homo_sapiens/GATK/GRCh38/Annotation/GATKBundle/1000g_pon.hg38.vcf.gz")){ + log.warn("The default Panel-of-Normals provided by GATK is used for Mutect2.\nIt is highly recommended to generate one from normal samples that are technical similar to the tumor ones.\nFor more information: https://gatk.broadinstitute.org/hc/en-us/articles/360035890631-Panel-of-Normals-PON-") + } +} + +// Fails when missing resources for baserecalibrator +// Warns when missing resources for haplotypecaller +if(!params.dbsnp && !params.known_indels){ + if (params.step in ['mapping', 'markduplicates', 'prepare_recalibration', 'recalibrate'] && (!params.skip_tools || (params.skip_tools && !params.skip_tools.split(',').contains('baserecalibrator')))){ + log.error "Base quality score recalibration requires at least one resource file. Please provide at least one of `--dbsnp` or `--known_indels`\nYou can skip this step in the workflow by adding `--skip_tools baserecalibrator` to the command." + exit 1 + } + if(params.tools && params.tools.split(',').contains('haplotypecaller')){ + log.warn "If Haplotypecaller is specified, without `--dbsnp` or `--known_indels no filtering will be done. For filtering, please provide at least one of `--dbsnp` or `--known_indels`.\nFor more information see FilterVariantTranches (single-sample, default): https://gatk.broadinstitute.org/hc/en-us/articles/5358928898971-FilterVariantTranches\nFor more information see VariantRecalibration (--joint_germline): https://gatk.broadinstitute.org/hc/en-us/articles/5358906115227-VariantRecalibrator\nFor more information on GATK Best practice germline variant calling: https://gatk.broadinstitute.org/hc/en-us/articles/360035535932-Germline-short-variant-discovery-SNPs-Indels-" + + } +} +if (params.joint_germline && (!params.dbsnp || !params.known_indels || !params.known_snps || params.no_intervals)){ + log.warn "If Haplotypecaller is specified, without `--dbsnp`, `--known_snps`, `--known_indels` or the associated resource labels (ie `known_snps_vqsr`), no variant recalibration will be done. For recalibration you must provide all of these resources.\nFor more information see VariantRecalibration: https://gatk.broadinstitute.org/hc/en-us/articles/5358906115227-VariantRecalibrator \nJoint germline variant calling also requires intervals in order to genotype the samples. As a result, if `--no_intervals` is set to `true` the joint germline variant calling will not be performed." +} + +// Fails when missing tools for variant_calling or annotate +if ((params.step == 'variant_calling' || params.step == 'annotate') && !params.tools) { + log.error "Please specify at least one tool when using `--step ${params.step}`.\nhttps://nf-co.re/sarek/parameters#tools" + exit 1 +} + +// Save AWS IGenomes file containing annotation version +def anno_readme = params.genomes[params.genome]?.readme +if (anno_readme && file(anno_readme).exists()) { + file("${params.outdir}/genome/").mkdirs() + file(anno_readme).copyTo("${params.outdir}/genome/") +} + /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -72,36 +120,93 @@ ch_rnadnavar_logo = Channel.fromPath(file("$projectDir/assets/nf-core- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ -// Validate the input samplesheet.csv and prepare input channels -include { INPUT_CHECK } from '../subworkflows/local/input_check' +// Initialize file channels based on params, defined in the params.genomes[params.genome] scope +dbsnp = params.dbsnp ? Channel.fromPath(params.dbsnp).collect() : Channel.value([]) +known_snps = params.known_snps ? Channel.fromPath(params.known_snps).collect() : Channel.value([]) +fasta = params.fasta ? Channel.fromPath(params.fasta).collect() : Channel.empty() +fasta_fai = params.fasta_fai ? Channel.fromPath(params.fasta_fai).collect() : Channel.empty() +germline_resource = params.germline_resource ? Channel.fromPath(params.germline_resource).collect() : Channel.value([]) //Mutec2 does not require a germline resource, so set to optional input +known_indels = params.known_indels ? Channel.fromPath(params.known_indels).collect() : Channel.value([]) +known_snps = params.known_snps ? Channel.fromPath(params.known_snps).collect() : Channel.value([]) +pon = params.pon ? Channel.fromPath(params.pon).collect() : Channel.value([]) //PON is optional for Mutect2 (but highly recommended) + + +// Create samplesheets to restart from different steps +include { MAPPING_CSV } from '../subworkflows/local/mapping_csv' +include { MARKDUPLICATES_CSV } from '../subworkflows/local/markduplicates_csv' +include { PREPARE_RECALIBRATION_CSV } from '../subworkflows/local/prepare_recalibration_csv' +include { RECALIBRATE_CSV } from '../subworkflows/local/recalibrate_csv' +include { VARIANTCALLING_CSV } from '../subworkflows/local/variantcalling_csv' + // Build the genome index and other reference files -include { PREPARE_GENOME } from '../subworkflows/local/prepare_genome' -// Annotate variants using snpEff or VEP or both -include { ANNOTATE } from '../subworkflows/local/annotate' +include { PREPARE_GENOME } from '../subworkflows/local/prepare_genome' +// Build intervals if needed +include { PREPARE_INTERVALS } from '../subworkflows/local/prepare_intervals' -/* -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - IMPORT NF-CORE MODULES/SUBWORKFLOWS -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -*/ +// Convert BAM files to FASTQ files +include { ALIGNMENT_TO_FASTQ as ALIGNMENT_TO_FASTQ_INPUT } from '../subworkflows/nf-core/alignment_to_fastq' +include { ALIGNMENT_TO_FASTQ as ALIGNMENT_TO_FASTQ_UMI } from '../subworkflows/nf-core/alignment_to_fastq' + +// Run FASTQC +include { RUN_FASTQC } from '../subworkflows/nf-core/run_fastqc' + +// TRIM/SPLIT FASTQ Files +include { FASTP } from '../modules/nf-core/modules/fastp/main' + +// Create umi consensus bams from fastq +include { CREATE_UMI_CONSENSUS } from '../subworkflows/nf-core/fgbio_create_umi_consensus/main' + +// Map input reads to reference genome +include { GATK4_MAPPING } from '../subworkflows/nf-core/gatk4/mapping/main' + +// Merge and index BAM files (optional) +include { MERGE_INDEX_BAM } from '../subworkflows/nf-core/merge_index_bam' + +include { SAMTOOLS_CONVERT as SAMTOOLS_CRAMTOBAM } from '../modules/nf-core/modules/samtools/convert/main' +include { SAMTOOLS_CONVERT as SAMTOOLS_CRAMTOBAM_RECAL } from '../modules/nf-core/modules/samtools/convert/main' + +include { SAMTOOLS_CONVERT as SAMTOOLS_BAMTOCRAM } from '../modules/nf-core/modules/samtools/convert/main' +include { SAMTOOLS_CONVERT as SAMTOOLS_BAMTOCRAM_VARIANTCALLING} from '../modules/nf-core/modules/samtools/convert/main' + +// Mark Duplicates (+QC) +include { MARKDUPLICATES } from '../subworkflows/nf-core/gatk4/markduplicates/main' + +// Mark Duplicates SPARK (+QC) +include { MARKDUPLICATES_SPARK } from '../subworkflows/nf-core/gatk4/markduplicates_spark/main' + +// Convert to CRAM (+QC) +include { BAM_TO_CRAM } from '../subworkflows/nf-core/bam_to_cram' + +// QC on CRAM +include { CRAM_QC } from '../subworkflows/nf-core/cram_qc' + +// Create recalibration tables +include { PREPARE_RECALIBRATION } from '../subworkflows/nf-core/gatk4/prepare_recalibration/main' + +// Create recalibration tables SPARK +include { PREPARE_RECALIBRATION_SPARK } from '../subworkflows/nf-core/gatk4/prepare_recalibration_spark/main' + +// Create recalibrated cram files to use for variant calling (+QC) +include { RECALIBRATE } from '../subworkflows/nf-core/gatk4/recalibrate/main' + +// Create recalibrated cram files to use for variant calling (+QC) +include { RECALIBRATE_SPARK } from '../subworkflows/nf-core/gatk4/recalibrate_spark/main' + +// Variant calling on tumor/normal pair TODO: add tumour only for next version +include { PAIR_VARIANT_CALLING } from '../subworkflows/local/pair_variant_calling' + +include { VCF_QC } from '../subworkflows/nf-core/vcf_qc' + +// Annotation +include { ANNOTATE } from '../subworkflows/local/annotate' + +// REPORTING VERSIONS OF SOFTWARE USED +include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main' + +// MULTIQC +include { MULTIQC } from '../modules/nf-core/modules/multiqc/main' -// Basic QC -include { FASTQC } from '../modules/nf-core/modules/fastqc/main' -include { MULTIQC } from '../modules/nf-core/modules/multiqc/main' -// Variant Calling: GATK -include { GATK4_BASERECALIBRATOR } from '../modules/nf-core/modules/gatk4/baserecalibrator/main' -include { GATK4_BEDTOINTERVALLIST } from '../modules/nf-core/modules/gatk4/bedtointervallist/main' -include { GATK4_INTERVALLISTTOOLS } from '../modules/nf-core/modules/gatk4/intervallisttools/main' -include { GATK4_HAPLOTYPECALLER } from '../modules/nf-core/modules/gatk4/haplotypecaller/main' -include { GATK4_MERGEVCFS } from '../modules/nf-core/modules/gatk4/mergevcfs/main' -include { GATK4_INDEXFEATUREFILE } from '../modules/nf-core/modules/gatk4/indexfeaturefile/main' -include { GATK4_VARIANTFILTRATION } from '../modules/nf-core/modules/gatk4/variantfiltration/main' -include { SAMTOOLS_INDEX } from '../modules/nf-core/modules/samtools/index/main' -include { TABIX_TABIX as TABIX } from '../modules/nf-core/modules/tabix/tabix/main' - -// Software track -include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/dumpsoftwareversions/main' /* ======================================================================================== @@ -110,9 +215,10 @@ include { CUSTOM_DUMPSOFTWAREVERSIONS } from '../modules/nf-core/modules/custom/ */ include { ALIGN_STAR } from '../subworkflows/nf-core/align_star' // Align reads to genome and sort and index the alignment file -include { MARKDUPLICATES } from '../subworkflows/nf-core/markduplicates' // Mark duplicates in the BAM file include { SPLITNCIGAR } from '../subworkflows/nf-core/splitncigar' // Splits reads that contain Ns in their cigar string -include { RECALIBRATE } from '../subworkflows/nf-core/recalibrate' // Estimate and correct systematic bias +include { SPLITNCIGAR2 } from '../subworkflows/nf-core/splitncigar2' // Splits reads that contain Ns in their cigar string - without intervals option +// TODO not needed? +// include { RECALIBRATE } from '../subworkflows/nf-core/recalibrate' // Estimate and correct systematic bias /* ======================================================================================== @@ -125,6 +231,8 @@ def prepareToolIndices = params.aligner def seq_platform = params.seq_platform ? params.seq_platform : [] def seq_center = params.seq_center ? params.seq_center : [] +// Info required for completion email and summary +def multiqc_report = [] /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -132,9 +240,6 @@ def seq_center = params.seq_center ? params.seq_center : [] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ -// Info required for completion email and summary -def multiqc_report = [] - workflow RNADNAVAR { // To gather all QC reports for MultiQC @@ -142,104 +247,185 @@ workflow RNADNAVAR { // To gather used softwares versions for MultiQC ch_versions = Channel.empty() - // - // SUBWORKFLOW: Uncompress and prepare reference genome files - // - PREPARE_GENOME ( - prepareToolIndices - ) - ch_genome_bed = Channel.from([id:'genome.bed']).combine(PREPARE_GENOME.out.exon_bed) + // Build indices if needed + PREPARE_GENOME( + dbsnp, + fasta, + fasta_fai, + germline_resource, + known_indels, + known_snps, + pon) + + // Gather built indices or get them from the params + bwa = params.fasta ? params.bwa ? Channel.fromPath(params.bwa).collect() : PREPARE_GENOME.out.bwa : [] + bwamem2 = params.fasta ? params.bwamem2 ? Channel.fromPath(params.bwamem2).collect() : PREPARE_GENOME.out.bwamem2 : [] + dragmap = params.fasta ? params.dragmap ? Channel.fromPath(params.dragmap).collect() : PREPARE_GENOME.out.hashtable : [] + dict = params.fasta ? params.dict ? Channel.fromPath(params.dict).collect() : PREPARE_GENOME.out.dict : [] + fasta_fai = params.fasta ? params.fasta_fai ? Channel.fromPath(params.fasta_fai).collect() : PREPARE_GENOME.out.fasta_fai : [] + dbsnp_tbi = params.dbsnp ? params.dbsnp_tbi ? Channel.fromPath(params.dbsnp_tbi).collect() : PREPARE_GENOME.out.dbsnp_tbi : Channel.value([]) + germline_resource_tbi = params.germline_resource ? params.germline_resource_tbi ? Channel.fromPath(params.germline_resource_tbi).collect() : PREPARE_GENOME.out.germline_resource_tbi : [] + known_indels_tbi = params.known_indels ? params.known_indels_tbi ? Channel.fromPath(params.known_indels_tbi).collect() : PREPARE_GENOME.out.known_indels_tbi : Channel.value([]) + known_snps_tbi = params.known_snps ? params.known_snps_tbi ? Channel.fromPath(params.known_snps_tbi).collect() : PREPARE_GENOME.out.known_snps_tbi : Channel.value([]) + pon_tbi = params.pon ? params.pon_tbi ? Channel.fromPath(params.pon_tbi).collect() : PREPARE_GENOME.out.pon_tbi : [] + + // Gather index for mapping given the chosen aligner + ch_map_index = params.aligner == "bwa-mem" ? bwa : + params.aligner == "bwa-mem2" ? bwamem2 : + dragmap + + // known_sites is made by grouping both the dbsnp and the known snps/indels resources + // Which can either or both be optional + known_sites_indels = dbsnp.concat(known_indels).collect() + known_sites_indels_tbi = dbsnp_tbi.concat(known_indels_tbi).collect() + + known_sites_snps = dbsnp.concat(known_snps).collect() + known_sites_snps_tbi = dbsnp_tbi.concat(known_snps_tbi).collect() + + // Build intervals if needed + PREPARE_INTERVALS(fasta_fai) + + // Intervals for speed up preprocessing/variant calling by spread/gather + intervals_bed_combined = params.no_intervals ? Channel.value([]) : PREPARE_INTERVALS.out.intervals_bed_combined // [interval.bed] all intervals in one file + intervals_for_preprocessing = params.wes ? intervals_bed_combined : [] // For QC during preprocessing, we don't need any intervals (MOSDEPTH doesn't take them for WGS) + + intervals = PREPARE_INTERVALS.out.intervals_bed // [interval, num_intervals] multiple interval.bed files, divided by useful intervals for scatter/gather + intervals_bed_gz_tbi = PREPARE_INTERVALS.out.intervals_bed_gz_tbi // [interval_bed, tbi, num_intervals] multiple interval.bed.gz/.tbi files, divided by useful intervals for scatter/gather + + // Gather used softwares versions ch_versions = ch_versions.mix(PREPARE_GENOME.out.versions) + ch_versions = ch_versions.mix(PREPARE_INTERVALS.out.versions) + + // PREPROCESSING + + if (params.step == 'mapping') { + + // Figure out if input is bam or fastq + ch_input_sample.branch{ + bam: it[0].data_type == "bam" + fastq: it[0].data_type == "fastq" + }.set{ch_input_sample_type} + + // convert any bam input to fastq + // Fasta are not needed when converting bam to fastq -> [] + ALIGNMENT_TO_FASTQ_INPUT(ch_input_sample_type.bam, []) + + // gather fastq (inputed or converted) + // Theorically this could work on mixed input (fastq for one sample and bam for another) + // But not sure how to handle that with the samplesheet + // Or if we really want users to be able to do that + ch_input_fastq = ch_input_sample_type.fastq.mix(ALIGNMENT_TO_FASTQ_INPUT.out.reads) + + // STEP 0: QC & TRIM + // `--skip_tools fastqc` to skip fastqc + // trim only with `--trim_fastq` + // additional options to be set up + + // QC + if (!(params.skip_tools && params.skip_tools.split(',').contains('fastqc'))) { + RUN_FASTQC(ch_input_fastq) + + ch_reports = ch_reports.mix(RUN_FASTQC.out.fastqc_zip.collect{meta, logs -> logs}) + ch_versions = ch_versions.mix(RUN_FASTQC.out.versions) + } + + // UMI consensus calling + if (params.umi_read_structure) { + CREATE_UMI_CONSENSUS( + ch_input_fastq, + fasta, + ch_map_index, + umi_read_structure, + params.group_by_umi_strategy + ) + + bamtofastq = CREATE_UMI_CONSENSUS.out.consensusbam.map{meta, bam -> [meta,bam,[]]} + + // convert back to fastq for further preprocessing + ALIGNMENT_TO_FASTQ_UMI(bamtofastq, []) + + ch_reads_fastp = ALIGNMENT_TO_FASTQ_UMI.out.reads + + // Gather used softwares versions + ch_versions = ch_versions.mix(ALIGNMENT_TO_FASTQ_UMI.out.versions) + ch_versions = ch_versions.mix(CREATE_UMI_CONSENSUS.out.versions) + } else { + ch_reads_fastp = ch_input_fastq + } + + // Trimming and/or splitting + if (params.trim_fastq || params.split_fastq > 0) { + + save_trimmed_fail = false + save_merged = false + FASTP(ch_reads_fastp, save_trimmed_fail, save_merged) + + ch_reports = ch_reports.mix( + FASTP.out.json.collect{meta, json -> json}, + FASTP.out.html.collect{meta, html -> html} + ) + + if(params.split_fastq){ + ch_reads_to_map = FASTP.out.reads.map{ key, reads -> + + read_files = reads.sort{ a,b -> a.getName().tokenize('.')[0] <=> b.getName().tokenize('.')[0] }.collate(2) + [[ + data_type:key.data_type, + id:key.id, + numLanes:key.numLanes, + patient: key.patient, + read_group:key.read_group, + sample:key.sample, + sex:key.sex, + size:read_files.size(), + status:key.status, + ], + read_files] + }.transpose() + }else{ + ch_reads_to_map = FASTP.out.reads + } + + ch_versions = ch_versions.mix(FASTP.out.versions) + } else { + ch_reads_to_map = ch_reads_fastp + } + + // STEP 1: MAPPING READS TO REFERENCE GENOME + // reads will be sorted + ch_reads_to_map = ch_reads_to_map.map{ meta, reads -> + // update ID when no multiple lanes or splitted fastqs + new_id = meta.size * meta.numLanes == 1 ? meta.sample : meta.id + + [[ + data_type: meta.data_type, + id: new_id, + numLanes: meta.numLanes, + patient: meta.patient, + read_group: meta.read_group, + sample: meta.sample, + sex: meta.sex, + size: meta.size, + status: meta.status, + ], + reads] + } + + // + // Logic to separate DNA from RNA samples, DNA samples will be aligned with bwa, and RNA samples with star + // + ch_reads_to_map.branch{ + normal: it[0].status == 0 + tumor: it[0].status == 1 + rna: it[0].status == 2 + }.set{ch_reads_to_map_status} + + + + + + } - // - // SUBWORKFLOW: Read in samplesheet, validate and stage input files - // - INPUT_CHECK ( - ch_input - ) - ch_versions = ch_versions.mix(INPUT_CHECK.out.versions) - - // - // MODULE: Run FastQC - // - // QC TODO Can I do this for BAM - for now only returning version and not running - FASTQC ( - INPUT_CHECK.out.reads - ) - ch_versions = ch_versions.mix(FASTQC.out.versions.first()) - - CUSTOM_DUMPSOFTWAREVERSIONS ( - ch_versions.unique().collectFile(name: 'collated_versions.yml') - ) - // TODO Concat fastq files if necessary - not added because at the moment I am using bams - // - // MODULE: MultiQC - // - workflow_summary = WorkflowRnadnavar.paramsSummaryMultiqc(workflow, summary_params) - ch_workflow_summary = Channel.value(workflow_summary) - - ch_multiqc_files = Channel.empty() - ch_multiqc_files = ch_multiqc_files.mix(Channel.from(ch_multiqc_config)) - ch_multiqc_files = ch_multiqc_files.mix(ch_multiqc_custom_config.collect().ifEmpty([])) - ch_multiqc_files = ch_multiqc_files.mix(ch_workflow_summary.collectFile(name: 'workflow_summary_mqc.yaml')) - ch_multiqc_files = ch_multiqc_files.mix(CUSTOM_DUMPSOFTWAREVERSIONS.out.mqc_yml.collect()) - ch_multiqc_files = ch_multiqc_files.mix(FASTQC.out.zip.collect{it[1]}.ifEmpty([])) - - MULTIQC ( - ch_multiqc_files.collect() - ) - multiqc_report = MULTIQC.out.report.toList() - ch_versions = ch_versions.mix(MULTIQC.out.versions) - - // - // MODULE: Prepare the interval list from the GTF file using GATK4 BedToIntervalList - // - ch_interval_list = Channel.empty() - GATK4_BEDTOINTERVALLIST( - ch_genome_bed, - PREPARE_GENOME.out.dict - ) - ch_interval_list = GATK4_BEDTOINTERVALLIST.out.interval_list - ch_versions = ch_versions.mix(GATK4_BEDTOINTERVALLIST.out.versions.first().ifEmpty(null)) - - // - // MODULE: Scatter one interval-list into many interval-files using GATK4 IntervalListTools - // - ch_interval_list_split = Channel.empty() - if (!params.skip_intervallisttools) { - GATK4_INTERVALLISTTOOLS( - ch_interval_list - ) - ch_interval_list_split = GATK4_INTERVALLISTTOOLS.out.interval_list.map{ meta, bed -> [bed] }.flatten() - } - else ch_interval_list_split = ch_interval_list - - // - // SUBWORKFLOW: Perform read alignment using STAR aligner - // - ch_genome_bam = Channel.empty() - ch_genome_bam_index = Channel.empty() - ch_samtools_stats = Channel.empty() - ch_samtools_flagstat = Channel.empty() - ch_samtools_idxstats = Channel.empty() - ch_star_multiqc = Channel.empty() - ch_aligner_pca_multiqc = Channel.empty() - ch_aligner_clustering_multiqc = Channel.empty() - - // TODO: MISSING ALIGNMENT HERE - - // SUBWORKFLOW: Mark duplicates with GATK4 - // - ch_genome_bam = INPUT_CHECK.out.reads // TODO: temporary solution to BAM/FASTQ inputs - MARKDUPLICATES ( - ch_genome_bam - ) - ch_genome_bam = MARKDUPLICATES.out.bam_bai - - //Gather QC reports - ch_reports = ch_reports.mix(MARKDUPLICATES.out.stats.collect{it[1]}.ifEmpty([])) - ch_reports = ch_reports.mix(MARKDUPLICATES.out.metrics.collect{it[1]}.ifEmpty([])) - ch_versions = ch_versions.mix(MARKDUPLICATES.out.versions.first().ifEmpty(null)) } /* @@ -301,13 +487,14 @@ def extract_csv(csv_file) { System.exit(1) } } - + // keep count of the number of samples sample_count_all = 0 sample_count_normal = 0 sample_count_tumor = 0 + sample_count_rna = 0 Channel.from(csv_file).splitCsv(header: true) - //Retrieves number of lanes by grouping together by patient and sample and counting how many entries there are for this combination + // Retrieves number of lanes by grouping together by patient and sample and counting how many entries there are for this combination .map{ row -> sample_count_all++ if (!(row.patient && row.sample)){ @@ -333,6 +520,7 @@ def extract_csv(csv_file) { // If no sex specified, sex is not considered // sex is only mandatory for somatic CNV + // TODO remove as this is only specific to sarek, no CN is done in here if (row.sex) meta.sex = row.sex.toString() else meta.sex = 'NA' @@ -341,13 +529,14 @@ def extract_csv(csv_file) { else meta.status = 0 if (meta.status == 0) sample_count_normal++ - else sample_count_tumor++ - + else if (meta.status == 1) sample_count_tumor++ // TODO check if elif is valid in here + else sample_count_rna++ + // TODO: think about what other condition we will have here now // Two checks for ensuring that the pipeline stops with a meaningful error message if // 1. the sample-sheet only contains normal-samples, but some of the requested tools require tumor-samples, and // 2. the sample-sheet only contains tumor-samples, but some of the requested tools require normal-samples. if ((sample_count_normal == sample_count_all) && params.tools) { // In this case, the sample-sheet contains no tumor-samples - def tools_tumor = ['ascat', 'controlfreec', 'mutect2', 'msisensorpro'] + def tools_tumor = ['sage', 'controlfreec', 'mutect2', 'strelka2'] // This will be applied to tumour DNA and tumour RNA def tools_tumor_asked = [] tools_tumor.each{ tool -> if (params.tools.split(',').contains(tool)) tools_tumor_asked.add(tool) @@ -356,8 +545,9 @@ def extract_csv(csv_file) { log.error('The sample-sheet only contains normal-samples, but the following tools, which were requested with "--tools", expect at least one tumor-sample : ' + tools_tumor_asked.join(", ")) System.exit(1) } + // TODO no need to do anything with the germline - can this be removed? } else if ((sample_count_tumor == sample_count_all) && params.tools) { // In this case, the sample-sheet contains no normal/germline-samples - def tools_requiring_normal_samples = ['ascat', 'deepvariant', 'haplotypecaller', 'msisensorpro'] + def tools_requiring_normal_samples = ['ascat', 'deepvariant', 'haplotypecaller'] def requested_tools_requiring_normal_samples = [] tools_requiring_normal_samples.each{ tool_requiring_normal_samples -> if (params.tools.split(',').contains(tool_requiring_normal_samples)) requested_tools_requiring_normal_samples.add(tool_requiring_normal_samples) @@ -491,6 +681,33 @@ def extract_csv(csv_file) { } } } +// Parse first line of a FASTQ file, return the flowcell id and lane number. +def flowcellLaneFromFastq(path) { + // expected format: + // xx:yy:FLOWCELLID:LANE:... (seven fields) + // or + // FLOWCELLID:LANE:xx:... (five fields) + def line + path.withInputStream { + InputStream gzipStream = new java.util.zip.GZIPInputStream(it) + Reader decoder = new InputStreamReader(gzipStream, 'ASCII') + BufferedReader buffered = new BufferedReader(decoder) + line = buffered.readLine() + } + assert line.startsWith('@') + line = line.substring(1) + def fields = line.split(':') + String fcid + + if (fields.size() >= 7) { + // CASAVA 1.8+ format, from https://support.illumina.com/help/BaseSpace_OLH_009008/Content/Source/Informatics/BS/FileFormat_FASTQ-files_swBS.htm + // "@::::::: :::" + fcid = fields[2] + } else if (fields.size() == 5) { + fcid = fields[0] + } + return fcid +}