diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 9c53149..bb8336a 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -45,13 +45,19 @@ body: label: Workflow Execution description: Where are you running the workflow? options: - - EPI2ME Desktop application - - Command line - - EPI2ME cloud agent + - EPI2ME Desktop (Local) + - EPI2ME Desktop (Cloud) + - Command line (Local) + - Command line (Cluster) - Other (please describe) validations: required: true - + - type: input + id: other-workflow-execution + attributes: + label: Other workflow execution + description: If "Other", please describe + placeholder: Tell us where / how you are running the workflow. - type: markdown attributes: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2f0bb9f..e950f47 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: always_run: true pass_filenames: false additional_dependencies: - - epi2melabs>=0.0.52 + - epi2melabs==0.0.57 - id: build_models name: build_models entry: datamodel-codegen --strict-nullable --base-class workflow_glue.results_schema_helpers.BaseModel --use-schema-description --disable-timestamp --input results_schema.yml --input-file-type openapi --output bin/workflow_glue/results_schema.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a64fc66..5971c69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [v1.1.2] +### Changed +- Updated Ezcharts to v0.11.2. + ## [v1.1.1] ### Changed - The name of the column with run IDs from `run_ids` to `run_id`. diff --git a/README.md b/README.md index fab3162..87ad2c2 100644 --- a/README.md +++ b/README.md @@ -43,37 +43,64 @@ ARM processor support: True ## Install and run - -These are instructions to install and run the workflow on command line. You can also access the workflow via the [EPI2ME application](https://labs.epi2me.io/downloads/). -The workflow uses [Nextflow](https://www.nextflow.io/) to manage compute and software resources, therefore nextflow will need to be installed before attempting to run the workflow. - -The workflow can currently be run using either [Docker](https://www.docker.com/products/docker-desktop) or -[Singularity](https://docs.sylabs.io/guides/3.0/user-guide/index.html) to provide isolation of -the required software. Both methods are automated out-of-the-box provided -either docker or singularity is installed. This is controlled by the [`-profile`](https://www.nextflow.io/docs/latest/config.html#config-profiles) parameter as exemplified below. - -It is not required to clone or download the git repository in order to run the workflow. -More information on running EPI2ME workflows can be found on our [website](https://labs.epi2me.io/wfindex). - -The following command can be used to obtain the workflow. This will pull the repository in to the assets folder of nextflow and provide a list of all parameters available for the workflow as well as an example command: +These are instructions to install and run the workflow on command line. +You can also access the workflow via the +[EPI2ME Desktop application](https://labs.epi2me.io/downloads/). 
+ +The workflow uses [Nextflow](https://www.nextflow.io/) to manage +compute and software resources, +therefore Nextflow will need to be +installed before attempting to run the workflow. + +The workflow can currently be run using either +[Docker](https://www.docker.com/products/docker-desktop) +or [Singularity](https://docs.sylabs.io/guides/3.0/user-guide/index.html) +to provide isolation of the required software. +Both methods are automated out-of-the-box provided +either Docker or Singularity is installed. +This is controlled by the +[`-profile`](https://www.nextflow.io/docs/latest/config.html#config-profiles) +parameter as exemplified below. + +It is not required to clone or download the git repository +in order to run the workflow. +More information on running EPI2ME workflows can +be found on our [website](https://labs.epi2me.io/wfindex). + +The following command can be used to obtain the workflow. +This will pull the repository in to the assets folder of +Nextflow and provide a list of all parameters +available for the workflow as well as an example command: ``` -nextflow run epi2me-labs/wf-cas9 –help +nextflow run epi2me-labs/wf-cas9 --help ``` -A demo dataset is provided for testing of the workflow. It can be downloaded using: +To update a workflow to the latest version on the command line use +the following command: ``` -wget https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-cas9/wf-cas9-demo.tar.gz \ - && tar -xvf wf-cas9-demo.tar.gz +nextflow pull epi2me-labs/wf-cas9 ``` -The workflow can be run with the demo data using: + +A demo dataset is provided for testing of the workflow. +It can be downloaded and unpacked using the following commands: +``` +wget https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-cas9/wf-cas9-demo.tar.gz +tar -xzvf wf-cas9-demo.tar.gz +``` +The workflow can then be run with the downloaded demo data using: ``` nextflow run epi2me-labs/wf-cas9 \ - --fastq wf-cas9-demo/fastq/ \ - --reference_genome wf-cas9-demo/grch38/grch38_chr19_22.fa.gz \ - --targets wf-cas9-demo/targets.bed + --fastq 'wf-cas9-demo/fastq/sample_1' \ + --full_report \ + --reference_genome 'wf-cas9-demo/grch38/grch38_chr19_22.fa.gz' \ + --targets 'wf-cas9-demo/targets.bed' \ + -profile standard ``` -For further information about running a workflow on the cmd line see https://labs.epi2me.io/wfquickstart/ + +For further information about running a workflow on +the command line see https://labs.epi2me.io/wfquickstart/ + @@ -145,13 +172,6 @@ input_reads.fastq ─── input_directory ─── input_directory | threads | integer | Number of CPU threads to use per workflow task. | The total CPU resource used by the workflow is constrained by the executor configuration. | 8 | -### Miscellaneous Options - -| Nextflow parameter name | Type | Description | Help | Default | -|--------------------------|------|-------------|------|---------| -| disable_ping | boolean | Enable to prevent sending a workflow ping. 
| | False | - - diff --git a/bin/workflow_glue/__init__.py b/bin/workflow_glue/__init__.py index 30febca..3949151 100755 --- a/bin/workflow_glue/__init__.py +++ b/bin/workflow_glue/__init__.py @@ -3,6 +3,7 @@ import glob import importlib import os +import sys from .util import _log_level, get_main_logger # noqa: ABS101 @@ -11,15 +12,17 @@ _package_name = "workflow_glue" -def get_components(): +def get_components(allowed_components=None): """Find a list of workflow command scripts.""" logger = get_main_logger(_package_name) path = os.path.dirname(os.path.abspath(__file__)) - components = list() + components = dict() for fname in glob.glob(os.path.join(path, "*.py")): name = os.path.splitext(os.path.basename(fname))[0] if name in ("__init__", "util"): continue + if allowed_components is not None and name not in allowed_components: + continue # leniently attempt to import module try: @@ -34,7 +37,7 @@ def get_components(): try: req = "main", "argparser" if all(callable(getattr(mod, x)) for x in req): - components.append(name) + components[name] = mod except Exception: pass return components @@ -42,6 +45,8 @@ def get_components(): def cli(): """Run workflow entry points.""" + logger = get_main_logger(_package_name) + logger.info("Bootstrapping CLI.") parser = argparse.ArgumentParser( 'wf-glue', parents=[_log_level()], @@ -56,16 +61,21 @@ def cli(): help='additional help', dest='command') subparsers.required = True - # all component demos, plus some others - components = [ - f'{_package_name}.{comp}' for comp in get_components()] - for module in components: - mod = importlib.import_module(module) + # importing everything can take time, try to shortcut + if len(sys.argv) > 1: + components = get_components(allowed_components=[sys.argv[1]]) + if not sys.argv[1] in components: + logger.warn("Importing all modules, this may take some time.") + components = get_components() + else: + components = get_components() + + # add all module parsers to main CLI + for name, module in components.items(): p = subparsers.add_parser( - module.split(".")[-1], parents=[mod.argparser()]) - p.set_defaults(func=mod.main) + name.split(".")[-1], parents=[module.argparser()]) + p.set_defaults(func=module.main) - logger = get_main_logger(_package_name) args = parser.parse_args() logger.info("Starting entrypoint.") diff --git a/bin/workflow_glue/check_bam_headers_in_dir.py b/bin/workflow_glue/check_bam_headers_in_dir.py index d582de1..199e056 100755 --- a/bin/workflow_glue/check_bam_headers_in_dir.py +++ b/bin/workflow_glue/check_bam_headers_in_dir.py @@ -8,11 +8,6 @@ from .util import get_named_logger, wf_parser # noqa: ABS101 -def get_sq_lines(xam_file): - """Extract the `@SQ` lines from the header of a XAM file.""" - return pysam.AlignmentFile(xam_file, check_sq=False).header["SQ"] - - def main(args): """Run the entry point.""" logger = get_named_logger("checkBamHdr") @@ -27,10 +22,26 @@ def main(args): # Set `is_unaligned` accordingly. If there are mixed headers (either with some files # containing `@SQ` lines and some not or with different files containing different # `@SQ` lines), set `mixed_headers` to `True`. + # Also check if there is the SO line, to validate whether the file is (un)sorted. 
first_sq_lines = None mixed_headers = False + sorted_xam = False for xam_file in target_files: - sq_lines = get_sq_lines(xam_file) + # get the `@SQ` and `@HD` lines in the header + with pysam.AlignmentFile(xam_file, check_sq=False) as f: + # compare only the SN/LN/M5 elements of SQ to avoid labelling XAM with + # same reference but different SQ.UR as mixed_header (see CW-4842) + sq_lines = [{ + "SN": sq["SN"], + "LN": sq["LN"], + "M5": sq.get("M5"), + } for sq in f.header.get("SQ", [])] + hd_lines = f.header.get("HD") + # Check if it is sorted. + # When there is more than one BAM, merging/sorting + # will happen regardless of this flag. + if hd_lines is not None and hd_lines.get('SO') == 'coordinate': + sorted_xam = True if first_sq_lines is None: # this is the first file first_sq_lines = sq_lines @@ -46,13 +57,15 @@ def main(args): # write `is_unaligned` and `mixed_headers` out so that they can be set as env. # variables sys.stdout.write( - f"IS_UNALIGNED={int(is_unaligned)};MIXED_HEADERS={int(mixed_headers)}" + f"IS_UNALIGNED={int(is_unaligned)};" + + f"MIXED_HEADERS={int(mixed_headers)};" + + f"IS_SORTED={int(sorted_xam)}" ) logger.info(f"Checked (u)BAM headers in '{args.input_path}'.") def argparser(): """Argument parser for entrypoint.""" - parser = wf_parser("check_bam_headers") + parser = wf_parser("check_bam_headers_in_dir") parser.add_argument("input_path", type=Path, help="Path to target directory") return parser diff --git a/bin/workflow_glue/check_sample_sheet.py b/bin/workflow_glue/check_sample_sheet.py index 62e3483..8fec3f0 100755 --- a/bin/workflow_glue/check_sample_sheet.py +++ b/bin/workflow_glue/check_sample_sheet.py @@ -38,6 +38,7 @@ def main(args): barcodes = [] aliases = [] sample_types = [] + analysis_groups = [] allowed_sample_types = [ "test_sample", "positive_control", "negative_control", "no_template_control" ] @@ -49,6 +50,21 @@ def main(args): try: encoding = determine_codec(args.sample_sheet) with open(args.sample_sheet, "r", encoding=encoding) as f: + try: + # Excel files don't throw any error until here + csv.Sniffer().sniff(f.readline()) + f.seek(0) # return to initial position again + except Exception as e: + # Excel fails with UniCode error + sys.stdout.write( + "The sample sheet doesn't seem to be a CSV file.\n" + "The sample sheet has to be a CSV file.\n" + "Please verify that the sample sheet is a CSV file.\n" + f"Parsing error: {e}" + ) + + sys.exit() + csv_reader = csv.DictReader(f) n_row = 0 for row in csv_reader: @@ -76,6 +92,10 @@ def main(args): sample_types.append(row["type"]) except KeyError: pass + try: + analysis_groups.append(row["analysis_group"]) + except KeyError: + pass except Exception as e: sys.stdout.write(f"Parsing error: {e}") sys.exit() @@ -121,6 +141,14 @@ def main(args): sys.stdout.write( f"Sample sheet requires at least 1 of {required_type}") sys.exit() + if analysis_groups: + # if there was a "analysis_group" column, make sure it had values for all + # samples + if not all(analysis_groups): + sys.stdout.write( + "if an 'analysis_group' column exists, it needs values in each row" + ) + sys.exit() logger.info(f"Checked sample sheet {args.sample_sheet}.") diff --git a/bin/workflow_glue/check_xam_index.py b/bin/workflow_glue/check_xam_index.py new file mode 100755 index 0000000..f9f631e --- /dev/null +++ b/bin/workflow_glue/check_xam_index.py @@ -0,0 +1,43 @@ +"""Validate a single (u)BAM file index.""" + +from pathlib import Path +import sys + +import pysam + +from .util import get_named_logger, wf_parser # noqa: ABS101 + + +def 
validate_xam_index(xam_file): + """Use fetch to validate the index. + + Invalid indexes will fail the call with a ValueError: + ValueError: fetch called on bamfile without index + """ + with pysam.AlignmentFile(xam_file, check_sq=False) as alignments: + try: + alignments.fetch() + has_valid_index = True + except ValueError: + has_valid_index = False + return has_valid_index + + +def main(args): + """Run the entry point.""" + logger = get_named_logger("checkBamIdx") + + # Check if a XAM has a valid index + has_valid_index = validate_xam_index(args.input_xam) + # write `has_valid_index` out so that they can be set as env. + sys.stdout.write( + f"HAS_VALID_INDEX={int(has_valid_index)}" + ) + logger.info(f"Checked (u)BAM index for: '{args.input_xam}'.") + + +def argparser(): + """Argument parser for entrypoint.""" + parser = wf_parser("check_xam_index") + parser.add_argument("input_xam", type=Path, help="Path to target XAM") + return parser diff --git a/bin/workflow_glue/configure_igv.py b/bin/workflow_glue/configure_igv.py new file mode 100755 index 0000000..db158a4 --- /dev/null +++ b/bin/workflow_glue/configure_igv.py @@ -0,0 +1,205 @@ +"""Create an IGV config file.""" + +from itertools import zip_longest +import json +import sys + +from .util import get_named_logger, wf_parser # noqa: ABS101 + + +def parse_fnames(fofn): + """Parse list with filenames and return them grouped as ref-, XAM-, or VCF-related. + + :param fofn: File with list of file names (one per line) + :return: dict of reference-related filenames (with keys 'ref', 'fai', and '.gzi' and + `None` as default values); lists of XAM- and VCF-related filenames + """ + ref_extensions = [".fasta", ".fasta.gz", ".fa", ".fa.gz", ".fna", ".fna.gz"] + ref_dict = {} + xams = [] + xam_indices = [] + vcfs = [] + vcf_indices = [] + with open(fofn, "r") as f: + for line in f: + fname = line.strip() + if any(fname.endswith(ext) for ext in ref_extensions): + ref_dict["ref"] = fname + elif fname.endswith(".fai"): + ref_dict["fai"] = fname + elif fname.endswith(".gzi"): + ref_dict["gzi"] = fname + elif fname.endswith(".bam") or fname.endswith(".cram"): + xams.append(fname) + elif fname.endswith(".bai") or fname.endswith(".crai"): + xam_indices.append(fname) + elif fname.endswith(".vcf") or fname.endswith(".vcf.gz"): + vcfs.append(fname) + elif fname.endswith(".csi") or fname.endswith(".tbi"): + vcf_indices.append(fname) + # do some sanity checks + if "ref" not in ref_dict: + raise ValueError( + "No reference file (i.e. file ending in one of " + f"{ref_extensions} was found)." + ) + ref = ref_dict["ref"] + if (gzi := ref_dict.get("gzi")) is not None: + # since we got a '.gzi' index, make sure that the reference is actually + # compressed + if not ref_dict["ref"].endswith(".gz"): + raise ValueError( + f"Found GZI reference index '{gzi}', but the reference file " + f"'{ref}' appears not to be compressed." 
+ ) + if xam_indices: + if len(xams) != len(xam_indices): + raise ValueError("Got different number of XAM and XAM index files.") + if vcf_indices: + if len(vcfs) != len(vcf_indices): + raise ValueError("Got different number of VCF and VCF index files.") + if xams and vcfs: + if len(xams) != len(vcfs): + raise ValueError("Got different number of XAM and VCF files.") + # if we got XAM or VCF indices, pair them up with their corresponding files (and + # otherwise with `None`) + xams_with_indices = zip_longest(xams, xam_indices) + vcfs_with_indices = zip_longest(vcfs, vcf_indices) + return ref_dict, xams_with_indices, vcfs_with_indices + + +def get_reference_options(ref, fai=None, gzi=None): + """Create dict with IGV reference options. + + :param ref: reference file name + :param fai: name reference `.fai` index file + :param gzi: name of `.gzi` index file for a compressed reference + :return: dict with reference options + """ + # initialise the options dict and add the index attributes later + ref_opts = { + "id": "ref", + "name": "ref", + "wholeGenomeView": False, + "fastaURL": ref, + } + if fai is not None: + ref_opts["indexURL"] = fai + if gzi is not None: + ref_opts["compressedIndexURL"] = gzi + return ref_opts + + +def get_alignment_track(xam, xai=None, extra_opts=None): + """Create dict with options for IGV alignment track. + + :param xam: name of XAM file to be displayed + :param xai: name of XAM index file + :param extra_opts: dict of extra options for the alignment track + :return: dict with alignment track options + """ + alignment_track_dict = { + "name": xam, + "type": "alignment", + "format": xam.split(".")[-1], + "url": xam, + } + # add the XAM index if present + if xai is not None: + alignment_track_dict["indexURL"] = xai + alignment_track_dict.update(extra_opts or {}) + return alignment_track_dict + + +def get_variant_track(vcf, index=None, extra_opts=None): + """Create dict with options for IGV variant track. 
+ + :param vcf: name of VCF file to be displayed + :param index: name of VCF index file (ending in `.csi` or `.tbi`) + :param extra_opts: dict of extra options for the variant track + :return: dict with variant track options + """ + variant_track_dict = { + "name": vcf, + "type": "variant", + "format": "vcf", + "url": vcf, + } + # add the VCF index if we got an index extension + if index is not None: + variant_track_dict["indexURL"] = index + variant_track_dict.update(extra_opts or {}) + return variant_track_dict + + +def main(args): + """Run the entry point.""" + logger = get_named_logger("configIGV") + + # parse the FOFN + ref_dict, xams_with_indices, vcfs_with_indices = parse_fnames(args.fofn) + + # initialise the IGV options dict with the reference options + json_dict = {"reference": get_reference_options(**ref_dict)} + + # if we got JSON files with extra options for the alignment / variant tracks, read + # them + extra_alignment_opts = {} + if args.extra_alignment_opts is not None: + with open(args.extra_alignment_opts, "r") as f: + extra_alignment_opts = json.load(f) + extra_variant_opts = {} + if args.extra_variant_opts is not None: + with open(args.extra_variant_opts, "r") as f: + extra_variant_opts = json.load(f) + + # now add the alignment and variant tracks + json_dict["tracks"] = [] + # we use `zip_longest` to make sure that variant and alignment tracks from the same + # sample are added after each other + for (vcf, vcf_index), (xam, xam_index) in zip_longest( + vcfs_with_indices, xams_with_indices, fillvalue=(None, None) + ): + if vcf is not None: + # add a variant track for the VCF + json_dict["tracks"].append( + get_variant_track(vcf, vcf_index, extra_variant_opts) + ) + if xam is not None: + # add an alignment track for the XAM + json_dict["tracks"].append( + get_alignment_track(xam, xam_index, extra_alignment_opts) + ) + + if args.locus is not None: + json_dict["locus"] = args.locus + + json.dump(json_dict, sys.stdout, indent=4) + + logger.info("Printed IGV config JSON to STDOUT.") + + +def argparser(): + """Argument parser for entrypoint.""" + parser = wf_parser("configure_igv") + parser.add_argument( + "--fofn", + required=True, + help=( + "File with list of names of reference / XAM / VCF files and indices " + "(one filename per line)" + ), + ) + parser.add_argument( + "--locus", + help="Locus string to set initial genomic coordinates to display in IGV", + ) + parser.add_argument( + "--extra-alignment-opts", + help="JSON file with extra options for alignment tracks", + ) + parser.add_argument( + "--extra-variant-opts", + help="JSON file with extra options for variant tracks", + ) + return parser diff --git a/bin/workflow_glue/get_max_depth_locus.py b/bin/workflow_glue/get_max_depth_locus.py new file mode 100755 index 0000000..b873ea9 --- /dev/null +++ b/bin/workflow_glue/get_max_depth_locus.py @@ -0,0 +1,61 @@ +"""Find max depth window in a `mosdepth` regions BED file and write as locus string.""" + +from pathlib import Path +import sys + +import pandas as pd + +from .util import get_named_logger, wf_parser # noqa: ABS101 + + +def main(args): + """Run the entry point.""" + logger = get_named_logger("getMaxDepth") + + # read the regions BED file + df = pd.read_csv( + args.depths_bed, sep="\t", header=None, names=["ref", "start", "end", "depth"] + ) + + # get the window with the largest depth + ref, start, end, depth = df.loc[df["depth"].idxmax()] + + # get the length of the reference of that window + ref_length = df.query("ref == @ref")["end"].iloc[-1] + + # show the 
whole reference in case it's shorter than the desired locus size + if ref_length < args.locus_size: + start = 1 + end = ref_length + else: + # otherwise, show a region of the desired size around the window + half_size = args.locus_size // 2 + mid = (start + end) // 2 + start = mid - half_size + end = mid + half_size + # check if the region starts below `1` or ends beyond the end of the reference + if start < 1: + start = 1 + end = args.locus_size + if end > ref_length: + start = ref_length - args.locus_size + end = ref_length + + # write depth and locus string + sys.stdout.write(f"{depth}\t{ref}:{start}-{end}") + + logger.info("Wrote locus with maximum depth to STDOUT.") + + +def argparser(): + """Argument parser for entrypoint.""" + parser = wf_parser("get_max_depth_locus") + parser.add_argument( + "depths_bed", + type=Path, + help="path to mosdepth regions depth file (can be compressed)", + ) + parser.add_argument( + "locus_size", type=int, help="size of the locus in basepairs (e.g. '2000')" + ) + return parser diff --git a/bin/workflow_glue/report.py b/bin/workflow_glue/report.py index eba92da..19b5f6f 100755 --- a/bin/workflow_glue/report.py +++ b/bin/workflow_glue/report.py @@ -1,5 +1,6 @@ #!/usr/bin/env python """Create workflow report.""" +from math import pi from pathlib import Path from dominate.tags import h6, p @@ -10,15 +11,13 @@ from ezcharts.components.reports.labs import LabsReport from ezcharts.components.theme import LAB_head_resources from ezcharts.layout.snippets import DataTable, Grid, Tabs -from ezcharts.plots import AxisLabelFormatter, util +from ezcharts.plots import util from ezcharts.util import get_named_logger import pandas as pd from .util import wf_parser # noqa: ABS101 # Setup simple globals -WORKFLOW_NAME = 'wf-cas9' -REPORT_TITLE = f'{WORKFLOW_NAME} report' Colors = util.Colors @@ -57,6 +56,9 @@ def argparser(): parser.add_argument( "--off_target_hotspots", required=False, default=None, type=Path, help="Tiled background coverage") + parser.add_argument( + "--wf_version", default='unknown', + help="version of the executed workflow") return parser @@ -90,23 +92,16 @@ def target_table_and_plots(report, target_coverages, target_summaries): plot = ezc.lineplot( df_target, + title=target, x=chrom, y='coverage', hue='strand', - markers=False) - - fmt = AxisLabelFormatter(sci_notation=False) - - for s in plot.series: - s.showSymbol = False # Add this to ezcharts (markers) - - plot.xAxis.axisLabel = dict(formatter=fmt) + marker=False) - plot.xAxis.min = df_target[chrom].min() - plot.title = dict(text=target) - plot.legend = dict( - orient='horizontal', top=30, icon='rect') - plot.xAxis.axisLabel.rotate = 30 + plot._fig.x_range.start = df_target[chrom].min() + plot._fig.xaxis.major_label_orientation = 35 * (pi / 180) + plot._fig.xaxis.major_label_standoff = 5 + plot._fig.add_layout(plot._fig.legend[0], 'right') EZChart(plot, theme='epi2melabs', height='250px') @@ -206,12 +201,11 @@ def tiled_coverage_hist(report, tiled_coverage_tsv): with Grid(columns=n_columns): for i, (sample_id, df) in enumerate(df_all.groupby('sample_id')): plot = ezc.histplot( - df, bins=5, hue='target_status', stat='proportion') - plot.xAxis.name = 'coverage' - plot.yAxis.name = 'Proportion of reads' - plot.title = dict(text=sample_id) - plot.xAxis.axisLabel = dict(rotate=30) - plot.legend = dict(orient='horizontal', top=30) + df, title=sample_id, bins=5, hue='target_status', stat='proportion') + plot._fig.xaxis.axis_label = 'coverage' + plot._fig.yaxis.axis_label = 'Proportion of reads' + 
plot._fig.xaxis.major_label_orientation = 30 * (pi / 180) + # legend not working EZChart(plot, theme='epi2melabs', height='250px') @@ -254,8 +248,8 @@ def main(args): logger.info('Building report') report = LabsReport( - REPORT_TITLE, WORKFLOW_NAME, args.params, args.versions, - head_resources=[*LAB_head_resources]) + 'Workflow Cas9', 'wf-cas9', args.params, args.versions, + args.wf_version, head_resources=[*LAB_head_resources]) with report.add_section('Introduction', 'Intro'): p(''' @@ -276,8 +270,6 @@ def main(args): coverage_summary_table(report, args.coverage_summary) target_table_and_plots(report, args.target_coverage, args.target_summary) - # plot_target_coverage(report, args.target_coverage) - # make_target_summary_table(report, args.target_summary) tiled_coverage_hist(report, args.tile_coverage) diff --git a/docs/04_install_and_run.md b/docs/04_install_and_run.md index 142407e..4d78e50 100644 --- a/docs/04_install_and_run.md +++ b/docs/04_install_and_run.md @@ -1,31 +1,57 @@ - -These are instructions to install and run the workflow on command line. You can also access the workflow via the [EPI2ME application](https://labs.epi2me.io/downloads/). -The workflow uses [Nextflow](https://www.nextflow.io/) to manage compute and software resources, therefore nextflow will need to be installed before attempting to run the workflow. +These are instructions to install and run the workflow on command line. +You can also access the workflow via the +[EPI2ME Desktop application](https://labs.epi2me.io/downloads/). -The workflow can currently be run using either [Docker](https://www.docker.com/products/docker-desktop) or -[Singularity](https://docs.sylabs.io/guides/3.0/user-guide/index.html) to provide isolation of -the required software. Both methods are automated out-of-the-box provided -either docker or singularity is installed. This is controlled by the [`-profile`](https://www.nextflow.io/docs/latest/config.html#config-profiles) parameter as exemplified below. +The workflow uses [Nextflow](https://www.nextflow.io/) to manage +compute and software resources, +therefore Nextflow will need to be +installed before attempting to run the workflow. -It is not required to clone or download the git repository in order to run the workflow. -More information on running EPI2ME workflows can be found on our [website](https://labs.epi2me.io/wfindex). +The workflow can currently be run using either +[Docker](https://www.docker.com/products/docker-desktop) +or [Singularity](https://docs.sylabs.io/guides/3.0/user-guide/index.html) +to provide isolation of the required software. +Both methods are automated out-of-the-box provided +either Docker or Singularity is installed. +This is controlled by the +[`-profile`](https://www.nextflow.io/docs/latest/config.html#config-profiles) +parameter as exemplified below. -The following command can be used to obtain the workflow. This will pull the repository in to the assets folder of nextflow and provide a list of all parameters available for the workflow as well as an example command: +It is not required to clone or download the git repository +in order to run the workflow. +More information on running EPI2ME workflows can +be found on our [website](https://labs.epi2me.io/wfindex). +The following command can be used to obtain the workflow. 
+This will pull the repository in to the assets folder of +Nextflow and provide a list of all parameters +available for the workflow as well as an example command: + +``` +nextflow run epi2me-labs/wf-cas9 --help ``` -nextflow run epi2me-labs/wf-cas9 –help +To update a workflow to the latest version on the command line use +the following command: ``` -A demo dataset is provided for testing of the workflow. It can be downloaded using: +nextflow pull epi2me-labs/wf-cas9 ``` -wget https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-cas9/wf-cas9-demo.tar.gz \ - && tar -xvf wf-cas9-demo.tar.gz + +A demo dataset is provided for testing of the workflow. +It can be downloaded and unpacked using the following commands: ``` -The workflow can be run with the demo data using: +wget https://ont-exd-int-s3-euwst1-epi2me-labs.s3.amazonaws.com/wf-cas9/wf-cas9-demo.tar.gz +tar -xzvf wf-cas9-demo.tar.gz +``` +The workflow can then be run with the downloaded demo data using: ``` nextflow run epi2me-labs/wf-cas9 \ - --fastq wf-cas9-demo/fastq/ \ - --reference_genome wf-cas9-demo/grch38/grch38_chr19_22.fa.gz \ - --targets wf-cas9-demo/targets.bed + --fastq 'wf-cas9-demo/fastq/sample_1' \ + --full_report \ + --reference_genome 'wf-cas9-demo/grch38/grch38_chr19_22.fa.gz' \ + --targets 'wf-cas9-demo/targets.bed' \ + -profile standard ``` -For further information about running a workflow on the cmd line see https://labs.epi2me.io/wfquickstart/ \ No newline at end of file + +For further information about running a workflow on +the command line see https://labs.epi2me.io/wfquickstart/ diff --git a/docs/06_input_parameters.md b/docs/06_input_parameters.md index 5ef5445..88ad7c4 100644 --- a/docs/06_input_parameters.md +++ b/docs/06_input_parameters.md @@ -31,10 +31,3 @@ | threads | integer | Number of CPU threads to use per workflow task. | The total CPU resource used by the workflow is constrained by the executor configuration. | 8 | -### Miscellaneous Options - -| Nextflow parameter name | Type | Description | Help | Default | -|--------------------------|------|-------------|------|---------| -| disable_ping | boolean | Enable to prevent sending a workflow ping. | | False | - - diff --git a/lib/common.nf b/lib/common.nf new file mode 100644 index 0000000..3a8568d --- /dev/null +++ b/lib/common.nf @@ -0,0 +1,57 @@ +import groovy.json.JsonBuilder + +process getParams { + label "wf_common" + cpus 1 + memory "2 GB" + output: + path "params.json" + script: + def paramsJSON = new JsonBuilder(params).toPrettyString() + """ + # Output nextflow params object to JSON + echo '$paramsJSON' > params.json + """ +} + +process configure_igv { + publishDir "${params.out_dir}/", mode: 'copy', pattern: 'igv.json', enabled: params.containsKey("igv") && params.igv + label "wf_common" + cpus 1 + memory "2 GB" + input: + // the python script will work out what to do with all the files based on their + // extensions + path "file-names.txt" + val locus_str + val aln_extra_opts + val var_extra_opts + output: path "igv.json" + script: + // the locus argument just makes sure that the initial view in IGV shows something + // interesting + String locus_arg = locus_str ? "--locus $locus_str" : "" + // extra options for alignment tracks + def aln_opts_json_str = \ + aln_extra_opts ? new JsonBuilder(aln_extra_opts).toPrettyString() : "" + String aln_extra_opts_arg = \ + aln_extra_opts ? "--extra-alignment-opts extra-aln-opts.json" : "" + // extra options for variant tracks + def var_opts_json_str = \ + var_extra_opts ? 
new JsonBuilder(var_extra_opts).toPrettyString() : "" + String var_extra_opts_arg = \ + var_extra_opts ? "--extra-vcf-opts extra-var-opts.json" : "" + """ + # write out JSON files with extra options for the alignment and variant tracks + echo '$aln_opts_json_str' > extra-aln-opts.json + echo '$var_opts_json_str' > extra-var-opts.json + + workflow-glue configure_igv \ + --fofn file-names.txt \ + $locus_arg \ + $aln_extra_opts_arg \ + $var_extra_opts_arg \ + > igv.json + """ +} + diff --git a/lib/ingress.nf b/lib/ingress.nf index 0e492f4..2931357 100644 --- a/lib/ingress.nf +++ b/lib/ingress.nf @@ -24,29 +24,42 @@ def is_target_file(Path file, List extensions) { /** - * Take a channel of the shape `[meta, reads, path-to-stats-dir | null]` and extract the - * run IDs from the `run_ids` file in the stats directory into the metamap. If the path - * to the stats dir is `null`, add an empty list. + * Take a channel of the shape `[meta, reads, path-to-stats-dir | null]` (or + * `[meta, [reads, index], path-to-stats-dir | null]` in the case of XAM) and extract the + * run IDs and basecall model, from the `run_ids` and `basecaller` files in the stats + * directory, into the metamap. If the path to the stats dir is `null`, add an empty list. * * @param ch: input channel of shape `[meta, reads, path-to-stats-dir | null]` - * @return: channel with a list of run IDs added to the metamap + * @return: channel with lists of run IDs and basecall models added to the metamap */ -def add_run_IDs_to_meta(ch) { +def add_run_IDs_and_basecall_models_to_meta(ch, boolean allow_multiple_basecall_models) { // HashSet for all observed run_ids Set ingressed_run_ids = new HashSet() // extract run_ids from fastcat stats / bamstats results and add to metadata as well // as `ingressed_run_ids` ch = ch | map { meta, reads, stats -> - ArrayList run_ids = [] if (stats) { run_ids = stats.resolve("run_ids").splitText().collect { it.strip() } ingressed_run_ids += run_ids + + basecall_models = \ + stats.resolve("basecallers").splitText().collect { it.strip() } + // check if we got more than one basecall model and set reads + stats to + // `null` for that sample unless `allow_multiple_basecall_models` + if ((basecall_models.size() > 1) && !allow_multiple_basecall_models) { + log.warn "Found multiple basecall models for sample " + \ + "'$meta.alias': ${basecall_models.join(", ")}. The sample's " + \ + "reads were discarded." + reads = reads instanceof List ? [null, null] : null + stats = null + } + // `meta + [...]` returns a new map which is handy to avoid any + // modifying-maps-in-closures weirdness + // See https://github.com/nextflow-io/nextflow/issues/2660 + meta = meta + [run_ids: run_ids, basecall_models: basecall_models] } - // `meta + [...]` returns a new map which is handy to avoid any - // modifying-maps-in-closures weirdness - // See https://github.com/nextflow-io/nextflow/issues/2660 - [meta + [run_ids: run_ids], reads, stats] + [meta, reads, stats] } // put run_ids somewhere global for trivial access later // bit grim but decouples ingress metadata from workflow main.nf @@ -58,6 +71,53 @@ def add_run_IDs_to_meta(ch) { } +/** + * Take a channel of the shape `[meta, reads, path-to-stats-dir | null]` and do the + * following: + * - For `fastcat`, extract the number of reads from the `n_seqs` file. + * - For `bamstats`, extract the number of primary alignments and unmapped reads from + * the `bamstats.flagstat.tsv` file. + * Then, add add these metrics to the meta map. 
If the path to the stats dir is `null`, + * set the values to 0 when adding them. + * + * @param ch: input channel of shape `[meta, reads, path-to-stats-dir | null]` + * @return: channel with a list of number of reads added to the metamap + */ +def add_number_of_reads_to_meta(ch, String input_type_format) { + // extract reads from fastcat stats / bamstats results and add to metadata + ch = ch | map { meta, reads, stats -> + // Check that stats directory is present. + if (stats) { + if (input_type_format == "fastq") { + // Stats from fastcat + Integer n_seqs = stats.resolve("n_seqs").splitText()[0] as Integer + // `meta + [...]` returns a new map which is handy to avoid any + // modifying-maps-in-closures weirdness + // See https://github.com/nextflow-io/nextflow/issues/2660 + [meta + [n_seqs: n_seqs], reads, stats] + } else { + // or bamstats + ArrayList stats_csv = stats.resolve("bamstats.flagstat.tsv").splitCsv(header: true, sep:'\t') + // get primary alignments and unmapped and sum them + Integer n_primary = stats_csv["primary"].collect{it as Integer}.sum() + Integer n_unmapped = stats_csv["unmapped"].collect{it as Integer}.sum() + // `meta + [...]` returns a new map which is handy to avoid any + // modifying-maps-in-closures weirdness + // See https://github.com/nextflow-io/nextflow/issues/2660 + [meta + [n_primary: n_primary, n_unmapped: n_unmapped], reads, stats] + } + } else { + // return defaults if stats is not there + if (input_type_format == "fastq") { + [meta + [n_seqs: null], reads, stats] + } else { + [meta + [n_primary: null, n_unmapped: null], reads, stats] + } + } + } + return ch +} + /** * Take a map of input arguments, find valid FASTQ inputs, and return a channel * with elements of `[metamap, seqs.fastq.gz | null, path-to-fastcat-stats | null]`. @@ -76,6 +136,10 @@ def add_run_IDs_to_meta(ch) { * - "fastcat_extra_args": string with extra arguments to pass to `fastcat` * - "required_sample_types": list of required sample types in the sample sheet * - "watch_path": boolean whether to use `watchPath` and run in streaming mode + * - "fastq_chunk": null or a number of reads to place into chunked FASTQ files + * - "allow_multiple_basecall_models": emit data of samples that had more than one + * basecall model; if this is `false`, such samples will be emitted as `[meta, null, + * null]` * @return: channel of `[Map(alias, barcode, type, ...), Path|null, Path|null]`. 
* The first element is a map with metadata, the second is the path to the * `.fastq.gz` file with the (potentially concatenated) sequences and the third is @@ -86,7 +150,14 @@ def add_run_IDs_to_meta(ch) { def fastq_ingress(Map arguments) { // check arguments - Map margs = parse_arguments(arguments, ["fastcat_extra_args": ""]) + Map margs = parse_arguments( + "fastq_ingress", arguments, + [ + "fastcat_extra_args": "", + "fastq_chunk": null, + ] + ) + margs["fastq_chunk"] ?= 0 // cant pass null through channel ArrayList fq_extensions = [".fastq", ".fastq.gz", ".fq", ".fq.gz"] @@ -96,20 +167,55 @@ def fastq_ingress(Map arguments) def ch_result if (margs.stats) { // run fastcat regardless of input type - ch_result = fastcat(input.files.mix(input.dirs), margs["fastcat_extra_args"]) + ch_result = fastcat(input.files.mix(input.dirs), margs, "FASTQ") } else { // run `fastcat` only on directories and rename / compress single files - ch_result = fastcat(input.dirs, margs["fastcat_extra_args"]) - | mix( - input.files - | move_or_compress_fq_file + ch_dir = fastcat(input.dirs, margs, "FASTQ") + .map { meta, path, stats -> [meta, path] } + def ch_file + if (margs["fastq_chunk"] > 0) { + ch_file = split_fq_file(input.files, margs["fastq_chunk"]) + } else { + ch_file = move_or_compress_fq_file(input.files) + } + ch_result = ch_dir + | mix(ch_file) | map { meta, path -> [meta, path, null] } - ) } - // add sample sheet entries without barcode dirs to the results channel and extract - // the run IDs into the metamaps before returning - ch_result = ch_result.mix(input.missing.map { [*it, null] }) - return add_run_IDs_to_meta(ch_result) + // TODO: xam_ingress mixes in a .no_files channel here. Do we need to do the same? + + // The above may have returned a channel with multiple fastqs if chunking + // is enabled. Flatten this and add a groupKey to meta information which + // states the number of sibling files. This can be later used as the key + // for .groupTuple() on a channel in order to get all results for a sample + // We don't decorate "alias" with a count because that messes up downstream + // serialisation. + // Mix in the missing files from the sample sheet + // Add in a unique key for every emission + def ch_spread_result = ch_result + .mix (input.missing.map { meta, files -> [meta, files, null] }) + .map { meta, files, stats -> + // new `arity: '1..*'` would be nice here + files = files instanceof List ? files : [files] + def new_keys = [ + "group_key": groupKey(meta["alias"], files.size()), + "n_fastq": files.size()] + def grp_index = (0.. + def new_keys = [ + "group_index": "${meta["alias"]}_${grp_i}"] + [meta + new_keys, files, stats] + } + + // add number of reads, run IDs, and basecall models to meta + def ch_final = add_number_of_reads_to_meta(ch_spread_result, "fastq") + ch_final = add_run_IDs_and_basecall_models_to_meta( + ch_final, margs.allow_multiple_basecall_models + ) + return ch_final } @@ -118,7 +224,8 @@ def fastq_ingress(Map arguments) * with elements of `[metamap, reads.bam | null, path-to-bamstats-results | null]`. * The second item is `null` for sample sheet entries without a matching barcode * directory or samples containing only uBAM files when `keep_unaligned` is `false`. - * The last item is `null` if `bamstats` was not run (it is only run when `stats: true`). + * The last item is `null` if `bamstats` was not run (it is only run when `stats: + * true`). 
* * @param arguments: map with arguments containing * - "input": path to either: (i) input (u)BAM file, (ii) top-level directory @@ -129,6 +236,9 @@ def fastq_ingress(Map arguments) * - "analyse_unclassified": boolean whether to keep unclassified reads * - "stats": boolean whether to run `bamstats` * - "keep_unaligned": boolean whether to include uBAM files + * - "return_fastq": boolean whether to convert to FASTQ (this will always run + * `fastcat`) + * - "fastcat_extra_args": string with extra arguments to pass to `fastcat` * - "required_sample_types": list of required sample types in the sample sheet * - "watch_path": boolean whether to use `watchPath` and run in streaming mode * @return: channel of `[Map(alias, barcode, type, ...), Path|null, Path|null]`. @@ -142,7 +252,16 @@ def fastq_ingress(Map arguments) def xam_ingress(Map arguments) { // check arguments - Map margs = parse_arguments(arguments, ["keep_unaligned": false]) + Map margs = parse_arguments( + "xam_ingress", arguments, + [ + "keep_unaligned": false, + "return_fastq": false, + "fastcat_extra_args": "", + "fastq_chunk": null, + ] + ) + margs["fastq_chunk"] ?= 0 // cant pass null through channel // we only accept BAM or uBAM for now (i.e. no SAM or CRAM) ArrayList xam_extensions = [".bam", ".ubam"] @@ -153,11 +272,33 @@ def xam_ingress(Map arguments) ch_result = input.dirs | map { meta, path -> [meta, get_target_files_in_dir(path, xam_extensions)] } | mix(input.files) + | map{ + // If there is more than one BAM in each folder we ignore + // the indices. For single BAM we add it as a string to the + // metadata for later use. If then the BAM returns as position + // sorted, the index will be used. + meta, paths -> + boolean is_array = paths instanceof ArrayList + String src_xam + String src_xai + // Using `.uri` or `.Uri()` leads to S3 paths to be prefixed with `s3:///` + // instead of `s3://`, causing the workflow to not find the index file. + // `.toUriString()` returns the correct path. + if (!is_array){ + src_xam = paths.toUriString() + def xai = file(paths.toUriString() + ".bai") + if (xai.exists()){ + src_xai = xai.toUriString() + } + } + [meta + [src_xam: src_xam, src_xai: src_xai], paths] + } | checkBamHeaders - | map { meta, paths, is_unaligned_env, mixed_headers_env -> + | map { meta, paths, is_unaligned_env, mixed_headers_env, is_sorted_env -> // convert the env. variables from strings ('0' or '1') into bools boolean is_unaligned = is_unaligned_env as int as boolean boolean mixed_headers = mixed_headers_env as int as boolean + boolean is_sorted = is_sorted_env as int as boolean // throw an error if there was a sample with mixed headers if (mixed_headers) { error "Found mixed headers in (u)BAM files of sample '${meta.alias}'." @@ -165,7 +306,7 @@ def xam_ingress(Map arguments) // add `is_unaligned` to the metamap (note the use of `+` to create a copy of // `meta` to avoid modifying every item in the channel; // https://github.com/nextflow-io/nextflow/issues/2660) - [meta + [is_unaligned: is_unaligned], paths] + [meta + [is_unaligned: is_unaligned, is_sorted: is_sorted], paths] } | branch { meta, paths -> // set `paths` to `null` for uBAM samples if unallowed (they will be added to @@ -176,13 +317,12 @@ def xam_ingress(Map arguments) } // get the number of files (`paths` can be a list, a single path, or `null`) int n_files = paths instanceof List ? paths.size() : (paths ? 1 : 0) - // Preparations finished; we can do the branching now. There will be 3 branches + // Preparations finished; we can do the branching now. 
There will be 5 branches // depending on the number of files per sample and whether the reads are already // aligned: - // * no_op_needed: no need to do anything; just add to the final results channel - // downstream - // - no files - // - a single unaligned file + // * no files: no need to do anything + // * indexed: a single sorted and indexed BAM file. Index will be validated. + // * to_index: a single sorted, but not indexed, BAM file // * to_catsort: `samtools cat` into `samtools sort` // - a single aligned file // - more than one unaligned file @@ -191,26 +331,135 @@ def xam_ingress(Map arguments) // open file descriptors) // * to_merge: flatMap > sort > group > merge // - between 1 and `N_OPEN_FILES_LIMIT` aligned files - no_op_needed: (n_files == 0) || (n_files == 1 && meta["is_unaligned"]) + no_files: n_files == 0 + indexed: \ + n_files == 1 && (meta["is_unaligned"] || meta["is_sorted"]) && meta["src_xai"] + to_index: \ + n_files == 1 && (meta["is_unaligned"] || meta["is_sorted"]) && !meta["src_xai"] to_catsort: \ (n_files == 1) || (n_files > N_OPEN_FILES_LIMIT) || meta["is_unaligned"] to_merge: true } + if (margs["return_fastq"]) { + // only run samtools fastq on samples with at least one file + ch_to_fastq = ch_result.indexed.mix( + ch_result.to_index, + ch_result.to_merge, + ch_result.to_catsort + ) + // TODO: this is largely similar to fastq_ingress, should be refactored + + // input.missing: sample sheet entries without barcode dirs + def ch_spread_result = input.missing + .mix(ch_result.no_files) // TODO: we don't have this in fastq_ingress? + .map { meta, files -> [meta, files, null] } + .mix( + fastcat(ch_to_fastq, margs, "BAM") + ) + .map { meta, files, stats -> + // new `arity: '1..*'` would be nice here + files = files instanceof List ? files : [files] + def new_keys = [ + "group_key": groupKey(meta["alias"], files.size()), + "n_fastq": files.size()] + def grp_index = (0.. + def new_keys = [ + "group_index": "${meta["alias"]}_${grp_i}"] + [meta + new_keys, files, stats] + } + .map { meta, path, stats -> + [meta.findAll { it.key !in ['is_sorted', 'src_xam', 'src_xai'] }, path, stats] + } + + // add number of reads, run IDs, and basecall models to meta + def ch_final = add_number_of_reads_to_meta(ch_spread_result, "fastq") + ch_final = add_run_IDs_and_basecall_models_to_meta( + ch_final, margs.allow_multiple_basecall_models + ) + return ch_final + } + // deal with samples with few-enough files for `samtools merge` first ch_merged = ch_result.to_merge | flatMap { meta, paths -> paths.collect { [meta, it] } } | sortBam | groupTuple | mergeBams + | map{ + meta, bam, bai -> + [meta + [src_xam: null, src_xai: null], bam, bai] + } // now handle samples with too many files for `samtools merge` ch_catsorted = ch_result.to_catsort | catSortBams + | map{ + meta, bam, bai -> + [meta + [src_xam: null, src_xai: null], bam, bai] + } + + // Validate the index of the input BAM. + // If the input BAM index is invalid, regenerate it. + // First separate the BAM from the null input channels. + ch_to_validate = ch_result.indexed + | map{ + meta, paths -> + def bai = paths && meta.src_xai ? 
file(meta.src_xai) : null + [meta, paths, bai] + } + | branch { + meta, paths, bai -> + to_validate: paths && bai + no_op_needed: true + } + + // Validate non-null files with index + ch_validated = ch_to_validate.to_validate + | validateIndex + | branch { + meta, bam, bai, has_valid_index_env -> + boolean has_valid_index = has_valid_index_env as int as boolean + // Split if it is a valid index + valid_idx: has_valid_index + return [meta, bam, bai] + invalid_idx: true + return [meta, bam] + } + + // Create channel for no_op needed (null channels and valid indexes) + ch_no_op = ch_validated.valid_idx + | mix(ch_to_validate.no_op_needed) + + // Re-index sorted-not-indexed BAM file + ch_indexed = ch_result.to_index + | mix( ch_validated.invalid_idx ) + | samtools_index + | map{ + meta, bam, bai -> + [meta + [src_xai: null], bam, bai] + } - ch_result = input.missing + // Add extra null for the missing index to input.missing + // as well as the missing metadata. + // input.missing: sample sheet entries without barcode dirs + ch_missing = input.missing | mix( - ch_result.no_op_needed, + ch_result.no_files, + ) + | map{ + meta, paths -> + [meta + [src_xam: null, src_xai: null, is_sorted: false], paths, null] + } + + // Combine all possible inputs + ch_result = ch_missing | mix( + ch_no_op, + ch_indexed, ch_merged, ch_catsorted, ) @@ -218,20 +467,142 @@ def xam_ingress(Map arguments) // run `bamstats` if requested if (margs["stats"]) { // branch and run `bamstats` only on the non-`null` paths - ch_result = ch_result.branch { meta, path -> + ch_result = ch_result.branch { meta, path, index -> has_reads: path is_null: true } - ch_bamstats = bamstats(ch_result.has_reads) - ch_result = add_run_IDs_to_meta(ch_bamstats) | mix(ch_result.is_null) + ch_bamstats = bamstats(ch_result.has_reads, margs) + + // the channel comes from xam_ingress also have the BAM index in it. + // Handle this by placing them in a nested array, maintaining the structure + // from fastq_ingress. We do not use variable name as assigning variable + // name with a tuple not matching (e.g. meta, bam, bai, stats <- [meta, bam, stats] ) + // causes the workflow to crash. + ch_result = ch_bamstats + | map{ + it[3] ? [it[0], [it[1], it[2]], it[3]] : it + } + | map{ + it.flatten() + } + | mix( + ch_result.is_null.map{it + [null]} + ) } else { // add `null` instead of path to `bamstats` results dir - ch_result = ch_result | map { meta, bam -> [meta, bam, null] } + ch_result = ch_result | map { meta, bam, bai -> [meta, bam, bai, null] } + } + + // Remove metadata that are unnecessary downstream: + // meta.src_xai: not needed, as it will be part of the channel as a file + // meta.is_sorted: if data are aligned, they will also be sorted/indexed + // + // The output meta can contain the following flags: + // [ + // barcode: always present + // type: always present + // run_id: always present, but can be empty (i.e. 
`[]`) + // alias: always present + // n_primary: always present, but can be `null` + // n_unmapped: always present, but can be `null` + // is_unaligned: present if there is a (u)BAM file + // ] + // also, add number of reads, run IDs, and basecall models to meta + ch_result = add_number_of_reads_to_meta( + ch_result + | map{ + meta, bam, bai, stats -> + [meta.findAll { it.key !in ['is_sorted'] }, [bam, bai], stats] + }, + "xam" + ) + ch_result = add_run_IDs_and_basecall_models_to_meta( + ch_result, margs.allow_multiple_basecall_models + ) + | map{ + it.flatten() + } + // Final check to ensure that src_xam/src_xai is not an s3 + // path. If so, drop it. We check src_xam also for src_xai + // as, the latter is irrelevant if the former is in s3. + | map{ + meta, bam, bai, stats -> + def xam = meta.src_xam + def xai = meta.src_xai + if (meta.src_xam){ + xam = meta.src_xam.startsWith('s3://') ? null : meta.src_xam + xai = meta.src_xam.startsWith('s3://') ? null : meta.src_xai + } + [ meta + [src_xam: xam, src_xai: xai], bam, bai, stats ] } + return ch_result } +process fastcat { + label "ingress" + label "wf_common" + cpus 4 + memory "2 GB" + input: + tuple val(meta), path(input_src, stageAs: "input_src") + val fcargs + val src + output: + tuple val(meta), + path("fastq_chunks/*.fastq.gz"), // TODO: change this to use new arity: '1..*' + path("fastcat_stats") + script: + Integer lines_per_chunk = fcargs["fastq_chunk"] != 0 ? fcargs["fastq_chunk"] * 4 : null + def input_src = src == "FASTQ" + ? "input_src" + : """<( + samtools cat -b <(find . -name 'input_src*') | \ + samtools fastq - -n -T '*' -o - -0 - + )""" + def stats_args = fcargs["per_read_stats"] ? "-r >(bgzip -c > fastcat_stats/per-read-stats.tsv.gz)" : "" + """ + mkdir fastcat_stats + mkdir fastq_chunks + + # Save file as compressed fastq + fastcat \ + -s ${meta["alias"]} \ + -f fastcat_stats/per-file-stats.tsv \ + -i fastcat_stats/per-file-runids.tsv \ + -l fastcat_stats/per-file-basecallers.tsv \ + --histograms histograms \ + $stats_args \ + ${fcargs["fastcat_extra_args"]} \ + $input_src \ + | if [ "${fcargs["fastq_chunk"]}" = "0" ]; then + bgzip -@ $task.cpus > fastq_chunks/seqs.fastq.gz + else + split -l $lines_per_chunk -d --additional-suffix=.fastq.gz --filter='bgzip -@ $task.cpus > \$FILE' - fastq_chunks/seqs_; + fi + + mv histograms/* fastcat_stats + + # get n_seqs from per-file stats - need to sum them up + awk 'NR==1{for (i=1; i<=NF; i++) {ix[\$i] = i}} NR>1 {c+=\$ix["n_seqs"]} END{print c}' \ + fastcat_stats/per-file-stats.tsv > fastcat_stats/n_seqs + # get unique run IDs (we add `-F '\\t'` as `awk` uses any stretch of whitespace + # as field delimiter per default and thus ignores empty columns) + awk -F '\\t' ' + NR==1 {for (i=1; i<=NF; i++) {ix[\$i] = i}} + # only print run_id if present + NR>1 && \$ix["run_id"] != "" {print \$ix["run_id"]} + ' fastcat_stats/per-file-runids.tsv | sort | uniq > fastcat_stats/run_ids + # get unique basecall models + awk -F '\\t' ' + NR==1 {for (i=1; i<=NF; i++) {ix[\$i] = i}} + # only print basecall model if present + NR>1 && \$ix["basecaller"] != "" {print \$ix["basecaller"]} + ' fastcat_stats/per-file-basecallers.tsv | sort | uniq > fastcat_stats/basecallers + """ +} + process checkBamHeaders { label "ingress" label "wf_common" @@ -239,13 +610,12 @@ process checkBamHeaders { memory "2 GB" input: tuple val(meta), path("input_dir/reads*.bam") output: - // set the two env variables by `eval`-ing the output of the python script - // checking the XAM headers tuple( val(meta), 
path("input_dir/reads*.bam", includeInputs: true), env(IS_UNALIGNED), env(MIXED_HEADERS), + env(IS_SORTED), ) script: """ @@ -255,32 +625,60 @@ process checkBamHeaders { } +process validateIndex { + label "ingress" + label "wf_common" + cpus 1 + memory "2 GB" + input: tuple val(meta), path("reads.bam"), path("reads.bam.bai") + output: + // set the two env variables by `eval`-ing the output of the python script + // checking the XAM headers + tuple( + val(meta), + path("reads.bam", includeInputs: true), + path("reads.bam.bai", includeInputs: true), + env(HAS_VALID_INDEX) + ) + script: + """ + workflow-glue check_xam_index reads.bam > env.vars + source env.vars + """ +} + + +// Sort FOFN for samtools merge to ensure samtools sort breaks ties deterministically. +// Uses -c to ensure matching RG.IDs across multiple inputs are not unnecessarily modified to avoid collisions. process mergeBams { label "ingress" label "wf_common" cpus 3 memory "4 GB" - input: tuple val(meta), path("input_bams/reads*.bam") - output: tuple val(meta), path("reads.bam") - shell: + input: tuple val(meta), path("input_bams/reads*.bam"), path("input_bams/reads*.bam.bai") + output: tuple val(meta), path("reads.bam"), path("reads.bam.bai") + script: + def merge_threads = Math.max(1, task.cpus - 1) """ - samtools merge -@ ${task.cpus - 1} \ - -b <(find input_bams -name 'reads*.bam') -o reads.bam + samtools merge -@ ${merge_threads} \ + -c -b <(find input_bams -name 'reads*.bam' | sort) --write-index -o reads.bam##idx##reads.bam.bai """ } +// Sort FOFN for samtools cat to ensure samtools sort breaks ties deterministically. process catSortBams { label "ingress" label "wf_common" cpus 4 memory "4 GB" input: tuple val(meta), path("input_bams/reads*.bam") - output: tuple val(meta), path("reads.bam") + output: tuple val(meta), path("reads.bam"), path("reads.bam.bai") script: + def sort_threads = Math.max(1, task.cpus - 2) """ - samtools cat -b <(find input_bams -name 'reads*.bam') \ - | samtools sort - -@ ${task.cpus - 2} -o reads.bam + samtools cat -b <(find input_bams -name 'reads*.bam' | sort) \ + | samtools sort - -@ ${sort_threads} --write-index -o reads.bam##idx##reads.bam.bai """ } @@ -291,10 +689,11 @@ process sortBam { cpus 3 memory "4 GB" input: tuple val(meta), path("reads.bam") - output: tuple val(meta), path("reads.sorted.bam") + output: tuple val(meta), path("reads.sorted.bam"), path("reads.sorted.bam.bai") script: + def sort_threads = Math.max(1, task.cpus - 1) """ - samtools sort -@ ${task.cpus - 1} reads.bam -o reads.sorted.bam + samtools sort --write-index -@ ${sort_threads} reads.bam -o reads.sorted.bam##idx##reads.sorted.bam.bai """ } @@ -305,28 +704,44 @@ process bamstats { cpus 3 memory "4 GB" input: - tuple val(meta), path("reads.bam") + tuple val(meta), path("reads.bam"), path("reads.bam.bai") + val bsargs output: tuple val(meta), path("reads.bam"), + path("reads.bam.bai"), path("bamstats_results") script: def bamstats_threads = Math.max(1, task.cpus - 1) + def per_read_stats_arg = bsargs["per_read_stats"] ? 
"| bgzip > bamstats_results/bamstats.readstats.tsv.gz" : " > /dev/null" """ mkdir bamstats_results bamstats reads.bam -s $meta.alias -u \ -f bamstats_results/bamstats.flagstat.tsv -t $bamstats_threads \ + -i bamstats_results/bamstats.runids.tsv \ + -l bamstats_results/bamstats.basecallers.tsv \ --histograms histograms \ - | bgzip > bamstats_results/bamstats.readstats.tsv.gz + $per_read_stats_arg mv histograms/* bamstats_results/ - # extract the run IDs from the per-read stats - csvtk cut -tf runid bamstats_results/bamstats.readstats.tsv.gz \ - | csvtk del-header | sort | uniq > bamstats_results/run_ids + # get n_seqs from flagstats - need to sum them up + awk 'NR==1{for (i=1; i<=NF; i++) {ix[\$i] = i}} NR>1 {c+=\$ix["total"]} END{print c}' \ + bamstats_results/bamstats.flagstat.tsv > bamstats_results/n_seqs + # get unique run IDs (we add `-F '\\t'` as `awk` uses any stretch of whitespace + # as field delimiter otherwise and thus ignore empty columns) + awk -F '\\t' ' + NR==1 {for (i=1; i<=NF; i++) {ix[\$i] = i}} + # only print run_id if present + NR>1 && \$ix["run_id"] != "" {print \$ix["run_id"]} + ' bamstats_results/bamstats.runids.tsv | sort | uniq > bamstats_results/run_ids + # get unique basecall models + awk -F '\\t' ' + NR==1 {for (i=1; i<=NF; i++) {ix[\$i] = i}} + # only print run_id if present + NR>1 && \$ix["basecaller"] != "" {print \$ix["basecaller"]} + ' bamstats_results/bamstats.basecallers.tsv | sort | uniq > bamstats_results/basecallers """ } - - /** * Run `watchPath` on the input directory and return a channel of shape [metamap, * path-to-target-file]. The meta data is taken from the sample sheet in case one was @@ -446,36 +861,25 @@ process move_or_compress_fq_file { } -process fastcat { +process split_fq_file { label "ingress" label "wf_common" - cpus 3 + cpus 1 memory "2 GB" input: - tuple val(meta), path("input") - val extra_args + // don't stage `input` with a literal because we check the file extension + tuple val(meta), path(input) + val fastq_chunk output: - tuple val(meta), - path("seqs.fastq.gz"), - path("fastcat_stats") + tuple val(meta), path("fastq_chunks/*.fastq.gz") // TODO: change this to use new arity: '1..*' script: - String out = "seqs.fastq.gz" - String fastcat_stats_outdir = "fastcat_stats" + String cat = input.name.endsWith('.gz') ? 
"zcat" : "cat" + Integer lines_per_chunk = fastq_chunk * 4 """ - mkdir $fastcat_stats_outdir - fastcat \ - -s ${meta["alias"]} \ - -r >(bgzip -c > $fastcat_stats_outdir/per-read-stats.tsv.gz) \ - -f $fastcat_stats_outdir/per-file-stats.tsv \ - --histograms histograms \ - $extra_args \ - input \ - | bgzip > $out - - mv histograms/* $fastcat_stats_outdir - # extract the run IDs from the per-read stats - csvtk cut -tf runid $fastcat_stats_outdir/per-read-stats.tsv.gz \ - | csvtk del-header | sort | uniq > $fastcat_stats_outdir/run_ids + mkdir fastq_chunks + $cat "$input" \ + | split -l $lines_per_chunk -d --additional-suffix=.fastq.gz --filter='bgzip \ + > \$FILE' - fastq_chunks/seqs_ """ } @@ -489,7 +893,7 @@ process fastcat { * the argument-parsing to be tailored to a particular ingress function) * @return: map of parsed arguments */ -Map parse_arguments(Map arguments, Map extra_kwargs=[:]) { +Map parse_arguments(String func_name, Map arguments, Map extra_kwargs=[:]) { ArrayList required_args = ["input"] Map default_kwargs = [ "sample": null, @@ -497,12 +901,14 @@ Map parse_arguments(Map arguments, Map extra_kwargs=[:]) { "analyse_unclassified": false, "stats": true, "required_sample_types": [], - "watch_path": false + "watch_path": false, + "per_read_stats": false, + "allow_multiple_basecall_models": false, ] ArgumentParser parser = new ArgumentParser( args: required_args, kwargs: default_kwargs + extra_kwargs, - name: "fastq_ingress") + name: func_name) return parser.parse_args(arguments) } @@ -690,6 +1096,7 @@ Map create_metamap(Map arguments) { "barcode": null, "type": "test_sample", "run_ids": [], + "basecall_models": [], ], name: "create_metamap", ) @@ -726,30 +1133,53 @@ def get_sample_sheet(Path sample_sheet, ArrayList required_sample_types) { // in STDOUT. Thus, we use the somewhat clunky construct with `concat` and `last` // below. This lets the CSV channel only start to emit once the error checking is // done. - ch_err = validate_sample_sheet(sample_sheet, required_sample_types).map { + ch_err = validate_sample_sheet(sample_sheet, required_sample_types).map { stdoutput, sample_sheet_file -> // check if there was an error message - if (it) error "Invalid sample sheet: ${it}." - it + if (stdoutput) error "Invalid sample sheet: ${stdoutput}." 
+ stdoutput } // concat the channel holding the path to the sample sheet to `ch_err` and call // `.last()` to make sure that the error-checking closure above executes before // emitting values from the CSV - return ch_err.concat(Channel.fromPath(sample_sheet)).last().splitCsv( + ch_sample_sheet = ch_err.concat(Channel.fromPath(sample_sheet)).last().splitCsv( header: true, quote: '"' ) + // in case there is an 'analysis_group' column, we need to define a `groupKey` to + // allow for non-blocking calls of `groupTuple` later (on the values in the + // 'analysis_group' column); we first collect the sample sheet in a single list of + // maps and then count the occurrences of each group before using these to create + // the `groupKey` objects; note that the below doesn't do anything if there is no + // 'analysis_group' column + ch_group_counts = ch_sample_sheet + | collect + | map { rows -> rows.collect { it.analysis_group } .countBy { it } } + + // now we `combine` the analysis group counts with the sample sheet channel and add + // the `groupKey` to the entries + ch_sample_sheet = ch_sample_sheet + | combine(ch_group_counts) + | map { row, group_counts -> + if (row.analysis_group) { + int counts = group_counts[row.analysis_group] + row = row + [analysis_group: groupKey(row.analysis_group, counts)] + } + row + } + return ch_sample_sheet } /** * Python script for validating a sample sheet. The script will write messages * to STDOUT if the sample sheet is invalid. In case there are no issues, no - * message is emitted. + * message is emitted. The sample sheet will be published to the output dir. * * @param: path to sample sheet CSV * @param: list of required sample types (optional) * @return: string (optional) */ process validate_sample_sheet { + publishDir params.out_dir, mode: 'copy', overwrite: true cpus 1 label "ingress" label "wf_common" @@ -757,10 +1187,27 @@ process validate_sample_sheet { input: path "sample_sheet.csv" val required_sample_types - output: stdout + output: + tuple stdout, path("sample_sheet.csv") script: String req_types_arg = required_sample_types ? 
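+    // Illustrative rendering (hypothetical sample types): with
+    // required_sample_types = ["test_sample", "positive_control"] the
+    // command below becomes
+    //   workflow-glue check_sample_sheet sample_sheet.csv --required_sample_types test_sample positive_control
+    // and with an empty list simply
+    //   workflow-glue check_sample_sheet sample_sheet.csv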
"--required_sample_types "+required_sample_types.join(" ") : "" """ workflow-glue check_sample_sheet sample_sheet.csv $req_types_arg """ } + +// Generate an index for an input XAM file +process samtools_index { + cpus 4 + label "ingress" + label "wf_common" + memory 4.GB + input: + tuple val(meta), path("reads.bam") + output: + tuple val(meta), path("reads.bam"), path("reads.bam.bai") + script: + """ + samtools index -@ $task.cpus reads.bam + """ +} diff --git a/main.nf b/main.nf index 1f322eb..645bad1 100644 --- a/main.nf +++ b/main.nf @@ -369,7 +369,7 @@ process build_tables { } process makeReport { - label "cas9" + label "wf_common" cpus 2 memory "4 GB" input: @@ -381,6 +381,7 @@ process makeReport { path 'target_summary_table.tsv' path 'coverage_summary.tsv' path off_target_hotspots + val wf_version output: path "wf-cas9-*.html", emit: report @@ -397,6 +398,7 @@ process makeReport { --tile_coverage tile_coverage.tsv \ --coverage_summary coverage_summary.tsv \ --target_summary target_summary_table.tsv \ + --wf_version $wf_version \ $opttcov \ $optbghot """ @@ -535,6 +537,7 @@ workflow pipeline { build_tables.out.target_summary, build_tables.out.read_target_summary, bg_hotspots, + workflow.manifest.version ) pack_files_into_sample_dirs( @@ -581,6 +584,7 @@ workflow { "sample_sheet":params.sample_sheet, "analyse_unclassified":params.analyse_unclassified, "stats": true, + "per_read_stats": true, "fastcat_extra_args": ""]) pipeline(samples, ref_genome, targets) diff --git a/nextflow.config b/nextflow.config index 8117ede..52a3161 100644 --- a/nextflow.config +++ b/nextflow.config @@ -42,7 +42,7 @@ params { ] container_sha = "shaa7b95018145dc9c753d6092309ac6be5166a491a" - common_sha = "sha1c5febff9f75143710826498b093d9769a5edbb9" + common_sha = "shad28e55140f75a68f59bbecc74e880aeab16ab158" } } @@ -53,7 +53,7 @@ manifest { description = 'Summarise the results of Cas9 enrichment sequencing.' mainScript = 'main.nf' nextflowVersion = '>=23.04.2' - version = 'v1.1.1' + version = 'v1.1.2' } @@ -145,3 +145,8 @@ trace { overwrite = true file = "${params.out_dir}/execution/trace.txt" } + +env { + PYTHONNOUSERSITE = 1 + JAVA_TOOL_OPTIONS = "-Xlog:disable -Xlog:all=warning:stderr" +} \ No newline at end of file