From d129baf1a00e5ad28f8f3bb6210712349a6474f7 Mon Sep 17 00:00:00 2001 From: pdimens Date: Wed, 5 Feb 2025 16:44:50 -0500 Subject: [PATCH 1/8] start work on per-sample parallel demuxing --- harpy/scripts/demultiplex_gen1.py | 65 +++-------- harpy/scripts/demultiplex_gen1.py.bak | 161 ++++++++++++++++++++++++++ harpy/snakefiles/demultiplex_gen1.smk | 24 ++-- 3 files changed, 192 insertions(+), 58 deletions(-) create mode 100755 harpy/scripts/demultiplex_gen1.py.bak diff --git a/harpy/scripts/demultiplex_gen1.py b/harpy/scripts/demultiplex_gen1.py index 33bb25f8..b2082ca6 100755 --- a/harpy/scripts/demultiplex_gen1.py +++ b/harpy/scripts/demultiplex_gen1.py @@ -20,25 +20,6 @@ def read_barcodes(file_path, segment): continue return data_dict -def read_schema(file_path): - """Read and parse schema file of sampleid_segment""" - # one sample can have more than one code - # {segment : sample} - data_dict = {} - # codes can be Axx, Bxx, Cxx, Dxx - code_letters = set() - with open(file_path, 'r') as file: - for line in file: - try: - sample, segment_id = line.rstrip().split() - data_dict[segment_id] = sample - code_letters.add(segment_id[0]) - except ValueError: - # skip rows without two columns - continue - id_letter = code_letters.pop() - return id_letter, data_dict - def get_min_dist(needle, code_letter): minDist = 999 nbFound = 0 @@ -81,7 +62,9 @@ def get_read_codes(index_read, left_segment, right_segment): sys.stderr = sys.stdout = f outdir = snakemake.params.outdir qxrx = snakemake.params.qxrx - schema = snakemake.input.schema + sample_name = snakemake.params.sample + id_segments = snakemake.params.id_segments + id_letter = id_segments[0][0] r1 = snakemake.input.R1 r2 = snakemake.input.R2 i1 = snakemake.input.I1 @@ -97,14 +80,6 @@ def get_read_codes(index_read, left_segment, right_segment): "D" : read_barcodes(bx_d, "D"), } - #read schema - id_letter, samples_dict = read_schema(schema) - samples = list(set(samples_dict.values())) - samples.append("unidentified_data") - #create an array of files (one per sample) for writing - R1_output = {sample: open(f"{outdir}/{sample}.R1.fq", 'w') for sample in samples} - R2_output = {sample: open(f"{outdir}/{sample}.R2.fq", 'w') for sample in samples} - segments = {'A':'', 'B':'', 'C':'', 'D':''} unclear_read_map={} clear_read_map={} @@ -113,49 +88,45 @@ def get_read_codes(index_read, left_segment, right_segment): pysam.FastxFile(r2) as R2, pysam.FastxFile(i1, persist = False) as I1, pysam.FastxFile(i2, persist = False) as I2, - open(snakemake.output.valid, 'w') as clearBC_log, - open(snakemake.output.invalid, 'w') as unclearBC_log + open(f"{outdir}/{sample_name}.R1.fq", 'w') as R1_out, + open(f"{outdir}/{sample_name}.R2.fq", 'w') as R2_out, + open(snakemake.output.bx_info, 'w') as BC_log ): for r1_rec, r2_rec, i1_rec, i2_rec in zip(R1, R2, I1, I2): segments['A'], segments['C'], statusR1 = get_read_codes(i1_rec.sequence, "C", "A") segments['B'], segments['D'], statusR2 = get_read_codes(i2_rec.sequence, "D", "B") + if segments[id_letter] not in id_segments: + continue + statuses = [status_R1, statusR2] BX_code = segments['A'] + segments['C'] + segments['B']+ segments['D'] bc_tags = f"BX:Z:{BX_code}" if qxrx: bc_tags = f"RX:Z:{i1_rec.sequence}+{i2_rec.sequence}\tQX:Z:{i1_rec.quality}+{i2_rec.quality}\t{bc_tags}" r1_rec.comment += f"\t{bc_tags}" r2_rec.comment += f"\t{bc_tags}" - # search sample name - sample_name = samples_dict.get(segments[id_letter], "unidentified_data") - R1_output[sample_name].write(f"{r1_rec}\n") - R2_output[sample_name].write(f"{r2_rec}\n") + R1_out.write(f"{r1_rec}\n") + R2_out.write(f"{r2_rec}\n") - if (statusR1 == "unclear" or statusR2 == "unclear"): + if "unclear" in statuses: if BX_code in unclear_read_map: unclear_read_map[BX_code] += 1 else: unclear_read_map[BX_code] = 1 else: - if (statusR1 == "corrected" or statusR2 == "corrected"): + if "corrected" in statuses: if BX_code in clear_read_map: clear_read_map[BX_code][1] += 1 else: clear_read_map[BX_code] = [0,1] else: - if (statusR1 == "found" and statusR2 == "found"): + if all(status == "found" for status in statuses): if BX_code in clear_read_map: clear_read_map[BX_code][0] += 1 else: - clear_read_map[BX_code] = [1,0] - - for sample_name in samples: - R1_output[sample_name].close() - R2_output[sample_name].close() + clear_read_map[BX_code] = [1,0] - clearBC_log.write("Barcode\tCorrect reads\tCorrected reads\n" ) + BC_log.write("Barcode\tTotal Reads\tCorrect Reads\tCorrected Reads\n") for code in clear_read_map: - clearBC_log.write(code+"\t"+"\t".join(str(x) for x in clear_read_map[code])+"\n") - - unclearBC_log.write("Barcode\tReads\n") + BC_log.write(f"{code}\t{sum(clear_read_map[code])}\t{clear_read_map[code][0]}\t{clear_read_map[code][1]}\n") for code in unclear_read_map: - unclearBC_log.write(code +"\t"+str(unclear_read_map [code])+"\n") + BC_log.write(f"{code}\t{unclear_read_map[code]}\t0\t0\n") diff --git a/harpy/scripts/demultiplex_gen1.py.bak b/harpy/scripts/demultiplex_gen1.py.bak new file mode 100755 index 00000000..33bb25f8 --- /dev/null +++ b/harpy/scripts/demultiplex_gen1.py.bak @@ -0,0 +1,161 @@ +#!/usr/bin/env python + +import sys +import pysam +from Levenshtein import distance + +def read_barcodes(file_path, segment): + """Read and parse input barcode (segment) file of segmentsequence""" + data_dict = {} + with open(file_path, 'r') as file: + for line in file: + try: + code, seq = line.rstrip().split() + if code[0].upper() != segment: + sys.stderr.write(f"Segments in {file_path} are expected to begin with {segment}, but begin with {code[0].upper()}\n") + sys.exit(1) + data_dict[seq] = code + except ValueError: + # skip rows without two columns + continue + return data_dict + +def read_schema(file_path): + """Read and parse schema file of sampleid_segment""" + # one sample can have more than one code + # {segment : sample} + data_dict = {} + # codes can be Axx, Bxx, Cxx, Dxx + code_letters = set() + with open(file_path, 'r') as file: + for line in file: + try: + sample, segment_id = line.rstrip().split() + data_dict[segment_id] = sample + code_letters.add(segment_id[0]) + except ValueError: + # skip rows without two columns + continue + id_letter = code_letters.pop() + return id_letter, data_dict + +def get_min_dist(needle, code_letter): + minDist = 999 + nbFound = 0 + minSeq ="" + for seq in bar_codes[code_letter]: + d = distance(needle, seq) + if (d < minDist): + minDist = d + nbFound = 1 + minSeq= seq + elif (d == minDist): + nbFound += 1 + if (nbFound>1): + code_min_dist = f"{code_letter}00" + else: + code_min_dist = bar_codes[code_letter][minSeq] + return code_min_dist + +def get_read_codes(index_read, left_segment, right_segment): + left = index_read[0:6] # protocol-dependent + right = index_read[7:] + status = "found" + if left in bar_codes[left_segment]: + lc = bar_codes[left_segment][left] + else: + lc = get_min_dist(left, left_segment) + status = "corrected" + + if right in bar_codes[right_segment]: + rc = bar_codes[right_segment][right] + else: + rc = get_min_dist(right, right_segment) + status = "corrected" + + if (lc == f"{left_segment}00" or rc == f"{right_segment}00"): + status = "unclear" + return rc, lc, status + +with open(snakemake.log[0], "w") as f: + sys.stderr = sys.stdout = f + outdir = snakemake.params.outdir + qxrx = snakemake.params.qxrx + schema = snakemake.input.schema + r1 = snakemake.input.R1 + r2 = snakemake.input.R2 + i1 = snakemake.input.I1 + i2 = snakemake.input.I2 + bx_a = snakemake.input.segment_a + bx_b = snakemake.input.segment_b + bx_c = snakemake.input.segment_c + bx_d = snakemake.input.segment_d + bar_codes = { + "A" : read_barcodes(bx_a, "A"), + "B" : read_barcodes(bx_b, "B"), + "C" : read_barcodes(bx_c, "C"), + "D" : read_barcodes(bx_d, "D"), + } + + #read schema + id_letter, samples_dict = read_schema(schema) + samples = list(set(samples_dict.values())) + samples.append("unidentified_data") + #create an array of files (one per sample) for writing + R1_output = {sample: open(f"{outdir}/{sample}.R1.fq", 'w') for sample in samples} + R2_output = {sample: open(f"{outdir}/{sample}.R2.fq", 'w') for sample in samples} + + segments = {'A':'', 'B':'', 'C':'', 'D':''} + unclear_read_map={} + clear_read_map={} + with ( + pysam.FastxFile(r1) as R1, + pysam.FastxFile(r2) as R2, + pysam.FastxFile(i1, persist = False) as I1, + pysam.FastxFile(i2, persist = False) as I2, + open(snakemake.output.valid, 'w') as clearBC_log, + open(snakemake.output.invalid, 'w') as unclearBC_log + ): + for r1_rec, r2_rec, i1_rec, i2_rec in zip(R1, R2, I1, I2): + segments['A'], segments['C'], statusR1 = get_read_codes(i1_rec.sequence, "C", "A") + segments['B'], segments['D'], statusR2 = get_read_codes(i2_rec.sequence, "D", "B") + BX_code = segments['A'] + segments['C'] + segments['B']+ segments['D'] + bc_tags = f"BX:Z:{BX_code}" + if qxrx: + bc_tags = f"RX:Z:{i1_rec.sequence}+{i2_rec.sequence}\tQX:Z:{i1_rec.quality}+{i2_rec.quality}\t{bc_tags}" + r1_rec.comment += f"\t{bc_tags}" + r2_rec.comment += f"\t{bc_tags}" + # search sample name + sample_name = samples_dict.get(segments[id_letter], "unidentified_data") + R1_output[sample_name].write(f"{r1_rec}\n") + R2_output[sample_name].write(f"{r2_rec}\n") + + if (statusR1 == "unclear" or statusR2 == "unclear"): + if BX_code in unclear_read_map: + unclear_read_map[BX_code] += 1 + else: + unclear_read_map[BX_code] = 1 + else: + if (statusR1 == "corrected" or statusR2 == "corrected"): + if BX_code in clear_read_map: + clear_read_map[BX_code][1] += 1 + else: + clear_read_map[BX_code] = [0,1] + else: + if (statusR1 == "found" and statusR2 == "found"): + if BX_code in clear_read_map: + clear_read_map[BX_code][0] += 1 + else: + clear_read_map[BX_code] = [1,0] + + for sample_name in samples: + R1_output[sample_name].close() + R2_output[sample_name].close() + + clearBC_log.write("Barcode\tCorrect reads\tCorrected reads\n" ) + for code in clear_read_map: + clearBC_log.write(code+"\t"+"\t".join(str(x) for x in clear_read_map[code])+"\n") + + unclearBC_log.write("Barcode\tReads\n") + for code in unclear_read_map: + unclearBC_log.write(code +"\t"+str(unclear_read_map [code])+"\n") diff --git a/harpy/snakefiles/demultiplex_gen1.smk b/harpy/snakefiles/demultiplex_gen1.smk index dca08e0c..d191feca 100644 --- a/harpy/snakefiles/demultiplex_gen1.smk +++ b/harpy/snakefiles/demultiplex_gen1.smk @@ -24,14 +24,17 @@ def barcodedict(smpl): for i in f.readlines(): # a casual way to ignore empty lines or lines with !=2 fields try: - smpl, bc = i.split() - d[smpl] = bc + sample, bc = i.split() + if sample not in d: + d[sample] = [bc] + else: + d[sample].append(bc) except ValueError: continue return d samples = barcodedict(samplefile) -samplenames = [i for i in samples.keys()] +samplenames = [i for i in samples] rule barcode_segments: output: @@ -49,21 +52,22 @@ rule demultiplex: R2 = config["inputs"]["R2"], I1 = config["inputs"]["I1"], I2 = config["inputs"]["I2"], - schema = config["inputs"]["demultiplex_schema"], segment_a = f"{outdir}/workflow/segment_A.bc", segment_b = f"{outdir}/workflow/segment_B.bc", segment_c = f"{outdir}/workflow/segment_C.bc", segment_d = f"{outdir}/workflow/segment_D.bc" output: - fw = temp(collect(outdir + "/{sample}.R1.fq", sample = samplenames)), - rv = temp(collect(outdir + "/{sample}.R2.fq", sample = samplenames)), - valid = f"{outdir}/logs/valid_barcodes.log", - invalid = f"{outdir}/logs/invalid_barcodes.log" + fw = pipe(outdir + "/{sample}.R1.fq"), + rv = pipe(outdir + "/{sample}.R2.fq"), + bx_info = outdir + "/logs/{sample}.barcodes" log: f"{outdir}/logs/demultiplex.log" params: qxrx = config["include_qx_rx_tags"], - outdir = outdir + outdir = outdir, + #TODO this lambda function is wrong, it needs to return the sample, not the bc + sample = lambda wc: samples[wc.sample], + id_segments = lambda wc: samples[wc.get("sample")] conda: f"{envdir}/demultiplex.yaml" script: @@ -74,8 +78,6 @@ rule compress_fastq: outdir + "/{sample}.R{FR}.fq" output: outdir + "/{sample}.R{FR}.fq.gz" - params: - barcode = lambda wc: samples[wc.get("sample")] container: None shell: From 2d2be422f8897dc8d5bdfdf0899931c6d34aefc6 Mon Sep 17 00:00:00 2001 From: pdimens Date: Wed, 5 Feb 2025 16:46:59 -0500 Subject: [PATCH 2/8] fix lambda --- harpy/snakefiles/demultiplex_gen1.smk | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/harpy/snakefiles/demultiplex_gen1.smk b/harpy/snakefiles/demultiplex_gen1.smk index d191feca..d5c24824 100644 --- a/harpy/snakefiles/demultiplex_gen1.smk +++ b/harpy/snakefiles/demultiplex_gen1.smk @@ -63,11 +63,10 @@ rule demultiplex: log: f"{outdir}/logs/demultiplex.log" params: - qxrx = config["include_qx_rx_tags"], outdir = outdir, - #TODO this lambda function is wrong, it needs to return the sample, not the bc - sample = lambda wc: samples[wc.sample], - id_segments = lambda wc: samples[wc.get("sample")] + qxrx = config["include_qx_rx_tags"], + sample = lambda wc: wc.get("sample"), + id_segments = lambda wc: samples[wc.sample] conda: f"{envdir}/demultiplex.yaml" script: From af447b34c8daafbe7f61c8ea35332b7769fd1afb Mon Sep 17 00:00:00 2001 From: pdimens Date: Wed, 5 Feb 2025 16:54:54 -0500 Subject: [PATCH 3/8] fix status variable names --- harpy/demultiplex.py | 2 +- harpy/scripts/demultiplex_gen1.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/harpy/demultiplex.py b/harpy/demultiplex.py index 28f5d09e..efc35e86 100644 --- a/harpy/demultiplex.py +++ b/harpy/demultiplex.py @@ -39,7 +39,7 @@ def demultiplex(): @click.command(no_args_is_help = True, context_settings=dict(allow_interspersed_args=False), epilog = "Documentation: https://pdimens.github.io/harpy/workflows/demultiplex/") @click.option('-s', '--schema', required = True, type=click.Path(exists=True, dir_okay=False, readable=True), help = 'File of `sample`\\`barcode`') -@click.option('-t', '--threads', default = 4, show_default = True, type = click.IntRange(min = 1, max_open = True), help = 'Number of threads to use') +@click.option('-t', '--threads', default = 4, show_default = True, type = click.IntRange(min = 2, max_open = True), help = 'Number of threads to use') @click.option('-o', '--output-dir', type = click.Path(exists = False), default = "Demultiplex", show_default=True, help = 'Output directory name') @click.option('-q', '--qx-rx', is_flag = True, default = False, help = 'Include the `QX:Z` and `RX:Z` tags in the read header') @click.option('--container', is_flag = True, default = False, help = 'Use a container instead of conda') diff --git a/harpy/scripts/demultiplex_gen1.py b/harpy/scripts/demultiplex_gen1.py index b2082ca6..ccaa4d45 100755 --- a/harpy/scripts/demultiplex_gen1.py +++ b/harpy/scripts/demultiplex_gen1.py @@ -93,11 +93,11 @@ def get_read_codes(index_read, left_segment, right_segment): open(snakemake.output.bx_info, 'w') as BC_log ): for r1_rec, r2_rec, i1_rec, i2_rec in zip(R1, R2, I1, I2): - segments['A'], segments['C'], statusR1 = get_read_codes(i1_rec.sequence, "C", "A") - segments['B'], segments['D'], statusR2 = get_read_codes(i2_rec.sequence, "D", "B") + segments['A'], segments['C'], R1_status = get_read_codes(i1_rec.sequence, "C", "A") + segments['B'], segments['D'], R2_status = get_read_codes(i2_rec.sequence, "D", "B") if segments[id_letter] not in id_segments: continue - statuses = [status_R1, statusR2] + statuses = [R1_status, R2_status] BX_code = segments['A'] + segments['C'] + segments['B']+ segments['D'] bc_tags = f"BX:Z:{BX_code}" if qxrx: @@ -107,6 +107,7 @@ def get_read_codes(index_read, left_segment, right_segment): R1_out.write(f"{r1_rec}\n") R2_out.write(f"{r2_rec}\n") + # logging barcode identification if "unclear" in statuses: if BX_code in unclear_read_map: unclear_read_map[BX_code] += 1 From ece78505e87b39a407a61adafd9aff580d175675 Mon Sep 17 00:00:00 2001 From: pdimens Date: Thu, 6 Feb 2025 10:24:27 -0500 Subject: [PATCH 4/8] rm dep code, swap fetch() to non-indexed version --- harpy/bin/bx_stats.py | 2 +- harpy/bin/check_bam.py | 8 +++---- harpy/bin/concatenate_bam.py | 35 +++++++++++++++--------------- harpy/bin/deconvolve_alignments.py | 14 ++++++------ harpy/bin/leviathan_bx_shim.py | 6 ++--- harpy/snakefiles/sv_leviathan.smk | 17 --------------- 6 files changed, 32 insertions(+), 50 deletions(-) diff --git a/harpy/bin/bx_stats.py b/harpy/bin/bx_stats.py index 14a422ad..98296466 100755 --- a/harpy/bin/bx_stats.py +++ b/harpy/bin/bx_stats.py @@ -57,7 +57,7 @@ def writestats(x, writechrom, destination): all_bx = set() LAST_CONTIG = None - for read in alnfile.fetch(): + for read in alnfile.fetch(until_eof=True): chrom = read.reference_name # check if the current chromosome is different from the previous one # if so, print the dict to file and empty it (a consideration for RAM usage) diff --git a/harpy/bin/check_bam.py b/harpy/bin/check_bam.py index ecfd341d..41868d61 100755 --- a/harpy/bin/check_bam.py +++ b/harpy/bin/check_bam.py @@ -30,9 +30,9 @@ parser.error(f"{args.input} was not found") bam_in = args.input -if bam_in.lower().endswith(".bam"): - if not os.path.exists(bam_in + ".bai"): - pysam.index(bam_in) +#if bam_in.lower().endswith(".bam"): +# if not os.path.exists(bam_in + ".bai"): +# pysam.index(bam_in) # regex for EXACTLY AXXCXXBXXDXX haplotag = re.compile('^A[0-9][0-9]C[0-9][0-9]B[0-9][0-9]D[0-9][0-9]') @@ -52,7 +52,7 @@ BX_NOT_LAST = 0 NO_MI = 0 -for record in alnfile.fetch(): +for record in alnfile.fetch(until_eof=True): N_READS += 1 tags = [i[0] for i in record.get_tags()] # is there a bx tag? diff --git a/harpy/bin/concatenate_bam.py b/harpy/bin/concatenate_bam.py index 558f080b..de82380d 100755 --- a/harpy/bin/concatenate_bam.py +++ b/harpy/bin/concatenate_bam.py @@ -59,13 +59,13 @@ haplotag_limit = 96**4 # instantiate output file -if aln_list[0].lower().endswith(".bam"): - if not os.path.exists(f"{aln_list[0]}.bai"): - pysam.index(aln_list[0]) - # for housekeeping - DELETE_FIRST_INDEX = True - else: - DELETE_FIRST_INDEX = False +#if aln_list[0].lower().endswith(".bam"): +# if not os.path.exists(f"{aln_list[0]}.bai"): +# pysam.index(aln_list[0]) +# # for housekeeping +# DELETE_FIRST_INDEX = True +# else: +# DELETE_FIRST_INDEX = False with pysam.AlignmentFile(aln_list[0]) as xam_in: header = xam_in.header.to_dict() # Remove all @PG lines @@ -99,14 +99,14 @@ # create MI dict for this sample MI_LOCAL = {} # create index if it doesn't exist - if xam.lower().endswith(".bam"): - if not os.path.exists(f"{xam}.bai"): - pysam.index(xam) - DELETE_INDEX = True - else: - DELETE_INDEX = False + #if xam.lower().endswith(".bam"): + # if not os.path.exists(f"{xam}.bai"): + # pysam.index(xam) + # DELETE_INDEX = True + # else: + # DELETE_INDEX = False with pysam.AlignmentFile(xam) as xamfile: - for record in xamfile.fetch(): + for record in xamfile.fetch(until_eof=True): try: mi = record.get_tag("MI") # if previously converted for this sample, use that @@ -136,8 +136,7 @@ except KeyError: pass bam_out.write(record) - if DELETE_INDEX: - Path.unlink(f"{xam}.bai") + # just for consistent housekeeping -if DELETE_FIRST_INDEX: - Path.unlink(f"{aln_list[0]}.bai") +#if DELETE_FIRST_INDEX: +# Path.unlink(f"{aln_list[0]}.bai") diff --git a/harpy/bin/deconvolve_alignments.py b/harpy/bin/deconvolve_alignments.py index 12ea9017..53fe9f1a 100755 --- a/harpy/bin/deconvolve_alignments.py +++ b/harpy/bin/deconvolve_alignments.py @@ -95,18 +95,18 @@ def write_missingbx(bam, alnrecord): # MI is the name of the current molecule, starting a 1 (0+1) MI = 0 -if bam_input.lower().endswith(".bam") and not os.path.exists(bam_input + ".bai"): - try: - pysam.index(bam_input) - except (OSError, pysam.SamtoolsError) as e: - sys.stderr.write(f"Error indexing BAM file: {e}\n") - sys.exit(1) +#if bam_input.lower().endswith(".bam") and not os.path.exists(bam_input + ".bai"): +# try: +# pysam.index(bam_input) +# except (OSError, pysam.SamtoolsError) as e: +# sys.stderr.write(f"Error indexing BAM file: {e}\n") +# sys.exit(1) with ( pysam.AlignmentFile(bam_input) as alnfile, pysam.AlignmentFile(sys.stdout.buffer, "wb", template = alnfile) as outfile ): - for record in alnfile.fetch(): + for record in alnfile.fetch(until_eof=True): chrm = record.reference_name bp = record.query_alignment_length # check if the current chromosome is different from the previous one diff --git a/harpy/bin/leviathan_bx_shim.py b/harpy/bin/leviathan_bx_shim.py index 6d3b9a6c..dbc7e3e1 100755 --- a/harpy/bin/leviathan_bx_shim.py +++ b/harpy/bin/leviathan_bx_shim.py @@ -27,8 +27,8 @@ args = parser.parse_args() if not os.path.exists(args.input): parser.error(f"{args.input} was not found") -if not os.path.exists(args.input + ".bai"): - parser.error(f"{args.input}.bai was not found") +#if not os.path.exists(args.input + ".bai"): +# parser.error(f"{args.input}.bai was not found") # set up a generator for the BX tags bc_range = [f"{i}".zfill(2) for i in range(1,97)] @@ -37,7 +37,7 @@ with pysam.AlignmentFile(args.input) as bam_in, pysam.AlignmentFile(sys.stdout.buffer, 'wb', header=bam_in.header) as bam_out: # iterate through the bam file - for record in bam_in.fetch(): + for record in bam_in.fetch(until_eof=True): try: mi = record.get_tag("MI") if mi not in MI_BX: diff --git a/harpy/snakefiles/sv_leviathan.smk b/harpy/snakefiles/sv_leviathan.smk index f61e11b2..872e0640 100644 --- a/harpy/snakefiles/sv_leviathan.smk +++ b/harpy/snakefiles/sv_leviathan.smk @@ -41,23 +41,6 @@ def get_alignments(wildcards): aln = list(filter(r.match, bamlist)) return aln[0] -#rule process_alignments: -# input: -# get_alignments -# output: -# bam = temp(outdir + "/workflow/input/{sample}.bam"), -# bai = temp(outdir + "/workflow/input/{sample}.bam.bai") -# log: -# outdir + "/logs/process_alignments/{sample}.log" -# container: -# None -# shell: -# """ -# samtools index {input} 2> {log} -# leviathan_bx_shim.py {input} > {output.bam} 2>> {log} -# samtools index {output.bam} 2>> {log} -# """ - rule index_barcodes: input: get_alignments From 0a7cb3c69691db2842f256af3bd5bef00b3e75e4 Mon Sep 17 00:00:00 2001 From: pdimens Date: Thu, 6 Feb 2025 10:25:46 -0500 Subject: [PATCH 5/8] parallelized over samples mode --- harpy/demultiplex.py | 10 ++++++---- harpy/scripts/demultiplex_gen1.py | 2 +- harpy/snakefiles/demultiplex_gen1.smk | 18 +++++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/harpy/demultiplex.py b/harpy/demultiplex.py index efc35e86..1a3147cc 100644 --- a/harpy/demultiplex.py +++ b/harpy/demultiplex.py @@ -28,7 +28,7 @@ def demultiplex(): "harpy demultiplex gen1": [ { "name": "Parameters", - "options": ["--qx-rx","--schema"], + "options": ["--keep-unknown", "--qx-rx","--schema"], }, { "name": "Workflow Controls", @@ -38,10 +38,11 @@ def demultiplex(): } @click.command(no_args_is_help = True, context_settings=dict(allow_interspersed_args=False), epilog = "Documentation: https://pdimens.github.io/harpy/workflows/demultiplex/") +@click.option('-u', '--keep-unknown', is_flag = True, default = False, help = 'Keep reads that could not be demultiplexed') +@click.option('-q', '--qx-rx', is_flag = True, default = False, help = 'Include the `QX:Z` and `RX:Z` tags in the read header') @click.option('-s', '--schema', required = True, type=click.Path(exists=True, dir_okay=False, readable=True), help = 'File of `sample`\\`barcode`') @click.option('-t', '--threads', default = 4, show_default = True, type = click.IntRange(min = 2, max_open = True), help = 'Number of threads to use') @click.option('-o', '--output-dir', type = click.Path(exists = False), default = "Demultiplex", show_default=True, help = 'Output directory name') -@click.option('-q', '--qx-rx', is_flag = True, default = False, help = 'Include the `QX:Z` and `RX:Z` tags in the read header') @click.option('--container', is_flag = True, default = False, help = 'Use a container instead of conda') @click.option('--setup-only', is_flag = True, hidden = True, default = False, help = 'Setup the workflow and exit') @click.option('--hpc', type = HPCProfile(), help = 'Directory with HPC submission `config.yaml` file') @@ -52,7 +53,7 @@ def demultiplex(): @click.argument('R2_FQ', required=True, type=click.Path(exists=True, dir_okay=False, readable=True)) @click.argument('I1_FQ', required=True, type=click.Path(exists=True, dir_okay=False, readable=True)) @click.argument('I2_FQ', required=True, type=click.Path(exists=True, dir_okay=False, readable=True)) -def gen1(r1_fq, r2_fq, i1_fq, i2_fq, output_dir, schema, qx_rx, threads, snakemake, skip_reports, quiet, hpc, container, setup_only): +def gen1(r1_fq, r2_fq, i1_fq, i2_fq, output_dir, keep_unknown, schema, qx_rx, threads, snakemake, skip_reports, quiet, hpc, container, setup_only): """ Demultiplex Generation I haplotagged FASTQ files @@ -84,12 +85,13 @@ def gen1(r1_fq, r2_fq, i1_fq, i2_fq, output_dir, schema, qx_rx, threads, snakema "workflow" : "demultiplex gen1", "snakemake_log" : sm_log, "output_directory" : output_dir, + "include_qx_rx_tags" : qx_rx, + "keep_unknown" : keep_unknown, "workflow_call" : command.rstrip(), "conda_environments" : conda_envs, "reports" : { "skip": skip_reports }, - "include_qx_rx_tags" : qx_rx, "inputs" : { "demultiplex_schema" : Path(schema).resolve().as_posix(), "R1": Path(r1_fq).resolve().as_posix(), diff --git a/harpy/scripts/demultiplex_gen1.py b/harpy/scripts/demultiplex_gen1.py index ccaa4d45..6a5f7b55 100755 --- a/harpy/scripts/demultiplex_gen1.py +++ b/harpy/scripts/demultiplex_gen1.py @@ -126,7 +126,7 @@ def get_read_codes(index_read, left_segment, right_segment): else: clear_read_map[BX_code] = [1,0] - BC_log.write("Barcode\tTotal Reads\tCorrect Reads\tCorrected Reads\n") + BC_log.write("Barcode\tTotal_Reads\tCorrect_Reads\tCorrected_Reads\n") for code in clear_read_map: BC_log.write(f"{code}\t{sum(clear_read_map[code])}\t{clear_read_map[code][0]}\t{clear_read_map[code][1]}\n") for code in unclear_read_map: diff --git a/harpy/snakefiles/demultiplex_gen1.smk b/harpy/snakefiles/demultiplex_gen1.smk index d5c24824..c9992c05 100644 --- a/harpy/snakefiles/demultiplex_gen1.smk +++ b/harpy/snakefiles/demultiplex_gen1.smk @@ -7,6 +7,7 @@ outdir = config["output_directory"] envdir = os.path.join(os.getcwd(), outdir, "workflow", "envs") samplefile = config["inputs"]["demultiplex_schema"] skip_reports = config["reports"]["skip"] +keep_unknown = config["keep_unknown"] onstart: logger.logger.addHandler(logging.FileHandler(config["snakemake_log"])) @@ -18,22 +19,25 @@ onerror: os.remove(logger.logfile) ## the barcode log file ## -def barcodedict(smpl): +def parse_schema(smpl, keep_unknown): d = {} with open(smpl, "r") as f: for i in f.readlines(): # a casual way to ignore empty lines or lines with !=2 fields try: sample, bc = i.split() + id_segment = bc[0] if sample not in d: d[sample] = [bc] else: d[sample].append(bc) except ValueError: continue + if keep_unknown: + d["_unknown_sample"] = f"{id_segment}00" return d -samples = barcodedict(samplefile) +samples = parse_schema(samplefile, keep_unknown) samplenames = [i for i in samples] rule barcode_segments: @@ -57,11 +61,11 @@ rule demultiplex: segment_c = f"{outdir}/workflow/segment_C.bc", segment_d = f"{outdir}/workflow/segment_D.bc" output: - fw = pipe(outdir + "/{sample}.R1.fq"), - rv = pipe(outdir + "/{sample}.R2.fq"), - bx_info = outdir + "/logs/{sample}.barcodes" + fw = temp(f"{outdir}/{{sample}}.R1.fq"), + rv = temp(f"{outdir}/{{sample}}.R2.fq"), + bx_info = f"{outdir}/logs/sample_barcodes/{{sample}}.barcodes" log: - f"{outdir}/logs/demultiplex.log" + f"{outdir}/logs/{{sample}}.demultiplex.log" params: outdir = outdir, qxrx = config["include_qx_rx_tags"], @@ -136,7 +140,7 @@ rule report_config: with open(output[0], "w", encoding="utf-8") as yml: yaml.dump(configs, yml, default_flow_style= False, sort_keys=False, width=float('inf')) -rule qa_report: +rule quality_report: input: fqc = collect(outdir + "/reports/data/{sample}.R{FR}.fastqc", sample = samplenames, FR = [1,2]), mqc_yaml = outdir + "/workflow/multiqc.yaml" From 7f271ee96ea76a745ae3b54eb354367c105ddb8a Mon Sep 17 00:00:00 2001 From: pdimens Date: Thu, 6 Feb 2025 10:25:52 -0500 Subject: [PATCH 6/8] rm need to index --- harpy/snakefiles/preflight_bam.smk | 33 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/harpy/snakefiles/preflight_bam.smk b/harpy/snakefiles/preflight_bam.smk index c12e4253..7796b2af 100644 --- a/harpy/snakefiles/preflight_bam.smk +++ b/harpy/snakefiles/preflight_bam.smk @@ -26,32 +26,31 @@ def get_alignments(wildcards): aln = list(filter(r.match, bamlist)) return aln[0] -def get_align_index(wildcards): - """returns a list with the bai index file for the sample based on wildcards.sample""" - r = re.compile(fr"(.*/{wildcards.sample})\.(bam|sam)$", flags = re.IGNORECASE) - aln = list(filter(r.match, bamlist)) - return aln[0] + ".bai" +#def get_align_index(wildcards): +# """returns a list with the bai index file for the sample based on wildcards.sample""" +# r = re.compile(fr"(.*/{wildcards.sample})\.(bam|sam)$", flags = re.IGNORECASE) +# aln = list(filter(r.match, bamlist)) +# return aln[0] + ".bai" -rule index_alignments: - input: - lambda wc: bamdict[wc.bam] - output: - "{bam}.bai" - container: - None - shell: - "samtools index {input}" +#rule index_alignments: +# input: +# lambda wc: bamdict[wc.bam] +# output: +# "{bam}.bai" +# container: +# None +# shell: +# "samtools index {input}" rule check_bam: input: - bam = get_alignments, - bai = get_align_index + get_alignments output: temp(outdir + "/{sample}.log") container: None shell: - "check_bam.py {input.bam} > {output}" + "check_bam.py {input} > {output}" rule concat_results: input: From c125b90df2918dca3fdf236f211fed214e7017ef Mon Sep 17 00:00:00 2001 From: pdimens Date: Thu, 6 Feb 2025 11:23:26 -0500 Subject: [PATCH 7/8] add duplicate for testing purposes --- test/demux/samples.schema | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/demux/samples.schema b/test/demux/samples.schema index 3c33c123..54a0ffaf 100644 --- a/test/demux/samples.schema +++ b/test/demux/samples.schema @@ -10,4 +10,5 @@ Sample_25 C25 Sample_26 C26 Sample_27 C27 Sample_28 C28 -Sample_52 C52 \ No newline at end of file +Sample_52 C52 +Sample_52 C11 \ No newline at end of file From a6a24e3e64ce26d13fe5fe80b19d6f8edd21b1cd Mon Sep 17 00:00:00 2001 From: pdimens Date: Thu, 6 Feb 2025 11:23:34 -0500 Subject: [PATCH 8/8] better/nicer errors --- harpy/_validations.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/harpy/_validations.py b/harpy/_validations.py index 60fe55a7..c53ffe40 100644 --- a/harpy/_validations.py +++ b/harpy/_validations.py @@ -265,7 +265,7 @@ def validate_demuxschema(infile): try: sample, segment_id = line.rstrip().split() if not segment_pattern.match(segment_id): - print_error("invalid segment format", f"Segment ID '{segment_id}' does not follow the expected format.") + print_error("invalid segment format", f"Segment ID [green]{segment_id}[/green] does not follow the expected format.") print_solution("This haplotagging design expects segments to follow the format of letter [green bold]A-D[/green bold] followed by [bold]two digits[/bold], e.g. [green bold]C51[/green bold]). Check that your ID segments or formatted correctly and that you are attempting to demultiplex with a workflow appropriate for your data design.") sys.exit(1) code_letters.add(segment_id[0]) @@ -283,10 +283,15 @@ def validate_demuxschema(infile): # skip rows without two columns continue if not code_letters: - print_error("incorrect schema format", f"Schema file {os.path.basename(infile)} has no valid rows. Rows should be samplesegment, e.g. sample_01C75") + print_error("incorrect schema format", f"Schema file [blue]{os.path.basename(infile)}[/blue] has no valid rows. Rows should be samplesegment, e.g. sample_01C75") sys.exit(1) if len(code_letters) > 1: - print("invalid schema", f"Schema file {os.path.basename(infile)} has sample IDs expected to be identified across multiple barcode segments. All sample IDs for this technology should be in a single segment, such as [bold green]C[/bold green] or [bold green]D[/bold green].") + print_error("invalid schema", f"Schema file [blue]{os.path.basename(infile)}[/blue] has sample IDs occurring in different barcode segments.") + print_solution_with_culprits( + "All sample IDs for this barcode design should be in a single segment, such as [bold green]C[/bold green] or [bold green]D[/bold green]. Make sure the schema contains only one segment.", + "The segments identified in the schema:" + ) + click.echo(", ".join(code_letters)) sys.exit(1) def validate_regions(regioninput, genome):