Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attempt parallelized demultiplexing #200

Merged
merged 8 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 18 additions & 47 deletions harpy/scripts/demultiplex_gen1.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,6 @@ def read_barcodes(file_path, segment):
continue
return data_dict

def read_schema(file_path):
"""Read and parse schema file of sample<tab>id_segment"""
# one sample can have more than one code
# {segment : sample}
data_dict = {}
# codes can be Axx, Bxx, Cxx, Dxx
code_letters = set()
with open(file_path, 'r') as file:
for line in file:
try:
sample, segment_id = line.rstrip().split()
data_dict[segment_id] = sample
code_letters.add(segment_id[0])
except ValueError:
# skip rows without two columns
continue
id_letter = code_letters.pop()
return id_letter, data_dict

def get_min_dist(needle, code_letter):
minDist = 999
nbFound = 0
Expand Down Expand Up @@ -81,7 +62,9 @@ def get_read_codes(index_read, left_segment, right_segment):
sys.stderr = sys.stdout = f
outdir = snakemake.params.outdir
qxrx = snakemake.params.qxrx
schema = snakemake.input.schema
sample_name = snakemake.params.sample
id_segments = snakemake.params.id_segments
id_letter = id_segments[0][0]
r1 = snakemake.input.R1
r2 = snakemake.input.R2
i1 = snakemake.input.I1
Expand All @@ -97,14 +80,6 @@ def get_read_codes(index_read, left_segment, right_segment):
"D" : read_barcodes(bx_d, "D"),
}

#read schema
id_letter, samples_dict = read_schema(schema)
samples = list(set(samples_dict.values()))
samples.append("unidentified_data")
#create an array of files (one per sample) for writing
R1_output = {sample: open(f"{outdir}/{sample}.R1.fq", 'w') for sample in samples}
R2_output = {sample: open(f"{outdir}/{sample}.R2.fq", 'w') for sample in samples}

segments = {'A':'', 'B':'', 'C':'', 'D':''}
unclear_read_map={}
clear_read_map={}
Expand All @@ -113,49 +88,45 @@ def get_read_codes(index_read, left_segment, right_segment):
pysam.FastxFile(r2) as R2,
pysam.FastxFile(i1, persist = False) as I1,
pysam.FastxFile(i2, persist = False) as I2,
open(snakemake.output.valid, 'w') as clearBC_log,
open(snakemake.output.invalid, 'w') as unclearBC_log
open(f"{outdir}/{sample_name}.R1.fq", 'w') as R1_out,
open(f"{outdir}/{sample_name}.R2.fq", 'w') as R2_out,
open(snakemake.output.bx_info, 'w') as BC_log
):
for r1_rec, r2_rec, i1_rec, i2_rec in zip(R1, R2, I1, I2):
segments['A'], segments['C'], statusR1 = get_read_codes(i1_rec.sequence, "C", "A")
segments['B'], segments['D'], statusR2 = get_read_codes(i2_rec.sequence, "D", "B")
if segments[id_letter] not in id_segments:
continue
statuses = [statusR1, statusR2]
BX_code = segments['A'] + segments['C'] + segments['B']+ segments['D']
bc_tags = f"BX:Z:{BX_code}"
if qxrx:
bc_tags = f"RX:Z:{i1_rec.sequence}+{i2_rec.sequence}\tQX:Z:{i1_rec.quality}+{i2_rec.quality}\t{bc_tags}"
r1_rec.comment += f"\t{bc_tags}"
r2_rec.comment += f"\t{bc_tags}"
# search sample name
sample_name = samples_dict.get(segments[id_letter], "unidentified_data")
R1_output[sample_name].write(f"{r1_rec}\n")
R2_output[sample_name].write(f"{r2_rec}\n")
R1_out.write(f"{r1_rec}\n")
R2_out.write(f"{r2_rec}\n")

if (statusR1 == "unclear" or statusR2 == "unclear"):
if "unclear" in statuses:
if BX_code in unclear_read_map:
unclear_read_map[BX_code] += 1
else:
unclear_read_map[BX_code] = 1
else:
if (statusR1 == "corrected" or statusR2 == "corrected"):
if "corrected" in statuses:
if BX_code in clear_read_map:
clear_read_map[BX_code][1] += 1
else:
clear_read_map[BX_code] = [0,1]
else:
if (statusR1 == "found" and statusR2 == "found"):
if all(status == "found" for status in statuses):
if BX_code in clear_read_map:
clear_read_map[BX_code][0] += 1
else:
clear_read_map[BX_code] = [1,0]

for sample_name in samples:
R1_output[sample_name].close()
R2_output[sample_name].close()
clear_read_map[BX_code] = [1,0]

clearBC_log.write("Barcode\tCorrect reads\tCorrected reads\n" )
BC_log.write("Barcode\tTotal Reads\tCorrect Reads\tCorrected Reads\n")
for code in clear_read_map:
clearBC_log.write(code+"\t"+"\t".join(str(x) for x in clear_read_map[code])+"\n")

unclearBC_log.write("Barcode\tReads\n")
BC_log.write(f"{code}\t{sum(clear_read_map[code])}\t{clear_read_map[code][0]}\t{clear_read_map[code][1]}\n")
for code in unclear_read_map:
unclearBC_log.write(code +"\t"+str(unclear_read_map [code])+"\n")
BC_log.write(f"{code}\t{unclear_read_map[code]}\t0\t0\n")
161 changes: 161 additions & 0 deletions harpy/scripts/demultiplex_gen1.py.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#!/usr/bin/env python

import sys
import pysam
from Levenshtein import distance

def read_barcodes(file_path, segment):
    """Read and parse input barcode (segment) file of segment<tab>sequence"""
    # Maps barcode sequence -> segment code, e.g. {"ACGT": "A01"}
    barcode_map = {}
    with open(file_path, 'r') as bc_file:
        for row in bc_file:
            fields = row.rstrip().split()
            if len(fields) != 2:
                # skip rows without two columns
                continue
            code, seq = fields
            # every code in this file must belong to the expected segment letter
            if code[0].upper() != segment:
                sys.stderr.write(f"Segments in {file_path} are expected to begin with {segment}, but begin with {code[0].upper()}\n")
                sys.exit(1)
            barcode_map[seq] = code
    return barcode_map

def read_schema(file_path):
    """Read and parse schema file of sample<tab>id_segment.

    One sample can have more than one code, so the mapping is inverted:
    {segment_id: sample}. Returns (id_letter, data_dict) where id_letter
    is the first letter (A/B/C/D) shared by every segment id.

    Raises:
        ValueError: if no valid two-column rows are found, or if the
            segment ids do not all begin with the same letter.
    """
    # {segment : sample}
    data_dict = {}
    # codes can be Axx, Bxx, Cxx, Dxx
    code_letters = set()
    with open(file_path, 'r') as file:
        for line in file:
            try:
                sample, segment_id = line.rstrip().split()
            except ValueError:
                # skip rows without two columns
                continue
            data_dict[segment_id] = sample
            code_letters.add(segment_id[0])
    if not code_letters:
        # previously crashed with an opaque KeyError from set.pop()
        raise ValueError(f"No valid sample<tab>segment rows found in {file_path}")
    if len(code_letters) > 1:
        # previously popped an arbitrary letter, silently misassigning samples
        raise ValueError(f"Schema {file_path} mixes segment letters: {sorted(code_letters)}")
    return code_letters.pop(), data_dict

def get_min_dist(needle, code_letter):
    """Find the barcode of segment `code_letter` closest to `needle`.

    Returns the matching code, or the sentinel "<letter>00" when the
    minimum edit distance is shared by more than one barcode (ambiguous).
    """
    best_dist = 999
    tie_count = 0
    best_seq = ""
    for candidate in bar_codes[code_letter]:
        dist = distance(needle, candidate)
        if dist < best_dist:
            best_dist = dist
            best_seq = candidate
            tie_count = 1
        elif dist == best_dist:
            tie_count += 1
    # more than one equally-close barcode -> cannot decide
    if tie_count > 1:
        return f"{code_letter}00"
    return bar_codes[code_letter][best_seq]

def get_read_codes(index_read, left_segment, right_segment):
    """Resolve both halves of an index read to their barcode codes.

    Returns (right_code, left_code, status) where status is "found"
    (both exact), "corrected" (at least one fixed by nearest-match), or
    "unclear" (a nearest-match lookup was ambiguous).
    """
    left = index_read[0:6]   # protocol-dependent
    right = index_read[7:]

    status = "found"
    lc = bar_codes[left_segment].get(left)
    if lc is None:
        lc = get_min_dist(left, left_segment)
        status = "corrected"

    rc = bar_codes[right_segment].get(right)
    if rc is None:
        rc = get_min_dist(right, right_segment)
        status = "corrected"

    # get_min_dist signals an ambiguous match with the <letter>00 sentinel
    if lc == f"{left_segment}00" or rc == f"{right_segment}00":
        status = "unclear"
    return rc, lc, status

# Snakemake script entry point: demultiplex paired-end reads into per-sample
# FASTQ files using barcode segments carried in the two index reads.
with open(snakemake.log[0], "w") as f:
    # funnel all prints and tracebacks into the snakemake log file
    sys.stderr = sys.stdout = f
    outdir = snakemake.params.outdir
    qxrx = snakemake.params.qxrx  # truthy: also emit RX/QX tags with raw index seq/qual
    schema = snakemake.input.schema
    r1 = snakemake.input.R1
    r2 = snakemake.input.R2
    i1 = snakemake.input.I1
    i2 = snakemake.input.I2
    bx_a = snakemake.input.segment_a
    bx_b = snakemake.input.segment_b
    bx_c = snakemake.input.segment_c
    bx_d = snakemake.input.segment_d
    # {segment letter: {sequence: code}} lookups used by get_read_codes/get_min_dist
    bar_codes = {
        "A" : read_barcodes(bx_a, "A"),
        "B" : read_barcodes(bx_b, "B"),
        "C" : read_barcodes(bx_c, "C"),
        "D" : read_barcodes(bx_d, "D"),
    }

    #read schema
    id_letter, samples_dict = read_schema(schema)
    samples = list(set(samples_dict.values()))
    # reads whose id segment is not in the schema fall into this catch-all
    samples.append("unidentified_data")
    #create an array of files (one per sample) for writing
    R1_output = {sample: open(f"{outdir}/{sample}.R1.fq", 'w') for sample in samples}
    R2_output = {sample: open(f"{outdir}/{sample}.R2.fq", 'w') for sample in samples}

    segments = {'A':'', 'B':'', 'C':'', 'D':''}
    unclear_read_map={}  # {barcode: read count} for ambiguous barcodes
    clear_read_map={}    # {barcode: [exact-match count, corrected count]}
    with (
        pysam.FastxFile(r1) as R1,
        pysam.FastxFile(r2) as R2,
        pysam.FastxFile(i1, persist = False) as I1,
        pysam.FastxFile(i2, persist = False) as I2,
        open(snakemake.output.valid, 'w') as clearBC_log,
        open(snakemake.output.invalid, 'w') as unclearBC_log
    ):
        for r1_rec, r2_rec, i1_rec, i2_rec in zip(R1, R2, I1, I2):
            # index read 1 carries the C and A segments; index read 2 carries D and B
            segments['A'], segments['C'], statusR1 = get_read_codes(i1_rec.sequence, "C", "A")
            segments['B'], segments['D'], statusR2 = get_read_codes(i2_rec.sequence, "D", "B")
            BX_code = segments['A'] + segments['C'] + segments['B']+ segments['D']
            bc_tags = f"BX:Z:{BX_code}"
            if qxrx:
                # prepend raw index sequences/qualities as RX/QX tags ahead of BX
                bc_tags = f"RX:Z:{i1_rec.sequence}+{i2_rec.sequence}\tQX:Z:{i1_rec.quality}+{i2_rec.quality}\t{bc_tags}"
            r1_rec.comment += f"\t{bc_tags}"
            r2_rec.comment += f"\t{bc_tags}"
            # search sample name
            sample_name = samples_dict.get(segments[id_letter], "unidentified_data")
            R1_output[sample_name].write(f"{r1_rec}\n")
            R2_output[sample_name].write(f"{r2_rec}\n")

            # tally barcode quality: unclear beats corrected beats found
            if (statusR1 == "unclear" or statusR2 == "unclear"):
                if BX_code in unclear_read_map:
                    unclear_read_map[BX_code] += 1
                else:
                    unclear_read_map[BX_code] = 1
            else:
                if (statusR1 == "corrected" or statusR2 == "corrected"):
                    if BX_code in clear_read_map:
                        clear_read_map[BX_code][1] += 1
                    else:
                        clear_read_map[BX_code] = [0,1]
                else:
                    if (statusR1 == "found" and statusR2 == "found"):
                        if BX_code in clear_read_map:
                            clear_read_map[BX_code][0] += 1
                        else:
                            clear_read_map[BX_code] = [1,0]

        # flush and close every per-sample FASTQ pair
        for sample_name in samples:
            R1_output[sample_name].close()
            R2_output[sample_name].close()

        # summary of unambiguous barcodes: exact vs corrected read counts
        clearBC_log.write("Barcode\tCorrect reads\tCorrected reads\n" )
        for code in clear_read_map:
            clearBC_log.write(code+"\t"+"\t".join(str(x) for x in clear_read_map[code])+"\n")

        # summary of ambiguous barcodes: total read counts only
        unclearBC_log.write("Barcode\tReads\n")
        for code in unclear_read_map:
            unclearBC_log.write(code +"\t"+str(unclear_read_map [code])+"\n")
23 changes: 12 additions & 11 deletions harpy/snakefiles/demultiplex_gen1.smk
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ def barcodedict(smpl):
for i in f.readlines():
# a casual way to ignore empty lines or lines with !=2 fields
try:
smpl, bc = i.split()
d[smpl] = bc
sample, bc = i.split()
if sample not in d:
d[sample] = [bc]
else:
d[sample].append(bc)
except ValueError:
continue
return d

samples = barcodedict(samplefile)
samplenames = [i for i in samples.keys()]
samplenames = [i for i in samples]

rule barcode_segments:
output:
Expand All @@ -49,21 +52,21 @@ rule demultiplex:
R2 = config["inputs"]["R2"],
I1 = config["inputs"]["I1"],
I2 = config["inputs"]["I2"],
schema = config["inputs"]["demultiplex_schema"],
segment_a = f"{outdir}/workflow/segment_A.bc",
segment_b = f"{outdir}/workflow/segment_B.bc",
segment_c = f"{outdir}/workflow/segment_C.bc",
segment_d = f"{outdir}/workflow/segment_D.bc"
output:
fw = temp(collect(outdir + "/{sample}.R1.fq", sample = samplenames)),
rv = temp(collect(outdir + "/{sample}.R2.fq", sample = samplenames)),
valid = f"{outdir}/logs/valid_barcodes.log",
invalid = f"{outdir}/logs/invalid_barcodes.log"
fw = pipe(outdir + "/{sample}.R1.fq"),
rv = pipe(outdir + "/{sample}.R2.fq"),
bx_info = outdir + "/logs/{sample}.barcodes"
log:
f"{outdir}/logs/demultiplex.log"
params:
outdir = outdir,
qxrx = config["include_qx_rx_tags"],
outdir = outdir
sample = lambda wc: wc.get("sample"),
id_segments = lambda wc: samples[wc.sample]
conda:
f"{envdir}/demultiplex.yaml"
script:
Expand All @@ -74,8 +77,6 @@ rule compress_fastq:
outdir + "/{sample}.R{FR}.fq"
output:
outdir + "/{sample}.R{FR}.fq.gz"
params:
barcode = lambda wc: samples[wc.get("sample")]
container:
None
shell:
Expand Down
Loading