add nanopore commands and fix equidistant bug
David Feldman committed Feb 9, 2024
1 parent ddb5e6f commit b068da3
Showing 6 changed files with 325 additions and 11 deletions.
33 changes: 25 additions & 8 deletions src/ngs_analysis/app.py
@@ -25,8 +25,9 @@
write_fake_fastq, write_fasta)
from .timer import Timer
from .types import *
-from .utils import assert_unique, nglob, sample_at_coverage
+from .utils import nglob
from .load import *
+from .nanopore import setup_from_nanopore_fastqs, demux_reads, write_flye_commands, collect_assemblies


### MAIN STEPS
@@ -326,8 +327,10 @@ def parse_sequences(config, sequences):
    re_insert = re.compile('{left_adapter}(.*){right_adapter}'.format(**config))
    capture = config['protein'].get('capture', {})
    optional = config['protein'].get('optional', [])
-    re_proteins = [re.compile(format_string_to_capture_regex(
-        x, optional, **capture)) for x in config['protein']['patterns']]
+    if 'protein' in config:
+
+        re_proteins = [re.compile(format_string_to_capture_regex(
+            x, optional, **capture)) for x in config['protein']['patterns']]

    arr = []
    for i, dna in enumerate(sequences):
@@ -344,7 +347,8 @@
            m = re.findall(pat, insert)
            if m:
                entry[name] = m[0]

+        if 'protein' not in config:
+            continue
        protein_start = re.search(config['protein']['start_at'], insert)
        if protein_start is None:
            continue
@@ -376,13 +380,18 @@ def match_mapped(df_mapped: Candidates, field):
    distance = f'{field}_distance'
    equidistant = f'{field}_match_equidistant'

+    df_mapped = df_mapped.copy()
+    # the reference is deduplicated in mmseqs_make_design_db, so
+    # we have to track counts separately
+    f = f'3_mapped/map/{field}.counts.csv'
+    counts = pd.read_csv(f).rename(columns={'name': 'reference_name'})

+    df_mapped = df_mapped.merge(counts)
    df_mapped[distance] = [Levenshtein.distance(a,b) for a,b in df_mapped[['query', 'reference']].values]
    # keep only the minimum values
    min_distance = df_mapped.groupby('read_index')[distance].transform('min')
    df_mapped = df_mapped.loc[lambda x: x[distance] == min_distance]

-    A = df_mapped.groupby('read_index').size().rename(equidistant).reset_index()
+    A = df_mapped.groupby('read_index')['count'].sum().rename(equidistant).reset_index()
    B = (df_mapped.sort_values(distance).drop_duplicates('read_index')
         .rename(columns={'reference_name': match})[['read_index', match, distance]])
    return A.merge(B)[['read_index', match, distance, equidistant]]
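This hunk is the equidistant bug from the commit title. The reference is deduplicated before mapping (see mmseqs_make_design_db below), so a single mapped row can stand for several identical designs, and counting rows with groupby(...).size() understates the number of tied matches. A minimal sketch of the difference, on invented toy data:

    import pandas as pd

    # one read, minimum distance tied between two references;
    # reference 'a' represents two designs collapsed by deduplication
    df_mapped = pd.DataFrame({'read_index': [0, 0],
                              'reference_name': ['a', 'b'],
                              'count': [2, 1]})
    print(df_mapped.groupby('read_index').size().loc[0])          # 2 (old: misses the duplicate)
    print(df_mapped.groupby('read_index')['count'].sum().loc[0])  # 3 (new: counts every design)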
@@ -473,10 +482,16 @@ def mmseqs_make_design_db():
    for field in fields:
        aa = field.endswith('_aa')
        f = f'3_mapped/map/{field}.fa' # should live in get_filenames
-        seqs_to_index = df_designs[['name', field]]
-        write_fasta(f, seqs_to_index.drop_duplicates(field))
+        seqs_to_index = (df_designs[['name', field]]
+         .assign(count=lambda x: x.groupby(field).transform('size'))
+         .drop_duplicates(field)
+         )
+        write_fasta(f, seqs_to_index[['name', field]])
        make_mmseqs_db(f, aa, is_query=False)

+        f = f'3_mapped/map/{field}.counts.csv'
+        seqs_to_index[['name', 'count']].to_csv(f, index=None)


def make_mmseqs_db(fasta_file, dbtype_is_amino_acid, is_query):
"""
@@ -646,6 +661,8 @@ def main():
        'merge_read_pairs',
        'parse_reads',
        'map_parsed_reads',
+        'setup_from_nanopore_fastqs',
+        'demux_reads',
    ]
    # if the command name is different from the function name
    named = {
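Taken together, the new commands slot into the pipeline roughly like this (a sketch: the directory path is hypothetical, and the steps in between are the pre-existing parse/map commands):

    from ngs_analysis.nanopore import (setup_from_nanopore_fastqs,
        demux_reads, collect_assemblies)

    # 1. combine dorado output into 1_reads/{barcode}.fastq and write samples.csv
    setup_from_nanopore_fastqs('/path/to/basecalling/fastq_pass/')
    # 2. after parse_reads and map_parsed_reads, split reads per design into
    #    4_demux/{sample}/{name}.fastq and write 4_demux/flye.sh
    demux_reads()
    # 3. once flye has run, gather assemblies into 1_reads/assembled.fastq
    collect_assemblies()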
17 changes: 16 additions & 1 deletion src/ngs_analysis/load.py
@@ -3,7 +3,7 @@
import yaml

from .constants import *
-from .sequence import quick_translate
+from .sequence import quick_translate, read_fastq
from .types import *
from .utils import *

@@ -204,3 +204,18 @@ def load_data_for_plotting(samples, simulate=False):
def reference_contains_source():
    return 'source' in pd.read_csv(reference_dna_table, nrows=1)


def load_fastq(samples):
    # load fastq data
    arr = []
    for sample in samples:
        reads, quality, names = read_fastq(
            f'1_reads/{sample}.fastq', include_quality=True,
            include_name=True, full_name=True)
        (pd.DataFrame({'read': reads, 'name': names, 'quality': quality})
         .assign(read_index=lambda x: np.arange(len(x)))
         .assign(sample=sample)
         .pipe(arr.append)
         )
    return pd.concat(arr)

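Note that read_index restarts at zero within each sample, which is why downstream code merges on the compound key ['sample', 'read_index'] (see demux_reads below). A sketch with hypothetical sample names:

    df = load_fastq(['barcode01', 'barcode02'])
    # columns: read, name, quality, read_index, sample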
183 changes: 183 additions & 0 deletions src/ngs_analysis/nanopore.py
@@ -0,0 +1,183 @@
import gzip
import pandas as pd
import os
from .load import load_config, load_fastq, load_samples
from .utils import nglob, csv_frame, assign_format
from .timer import Timer
from .types import Samples
from .sequence import reverse_complement, write_fastq, read_fasta
import re
import shutil


def setup_from_nanopore_fastqs(path_to_fastq_pass, min_reads=10,
                               exclude='unclassified', write_sample_table=True):
    """Set up analysis from fastq.gz files produced by dorado
    basecalling and barcode demultiplexing.
    Example:
        remote = '/path/to/nanopore/experiment/basecalling/fastq_pass/'
        setup_from_nanopore_fastqs(remote)
    :param path_to_fastq_pass: path to the directory (usually named
        "fastq_pass") with folders containing .fastq.gz files for each
        barcode
    :param min_reads: skip barcodes with fewer reads
    :param exclude: skip barcodes (folders) matching this regex
    :param write_sample_table: write "samples.csv"
    """
    search = os.path.join(path_to_fastq_pass, '*')
    folders = [x for x in nglob(search)
               if os.path.isdir(x) and not re.findall(exclude, x)]
    samples = []
    # combine nanopore output fastq files
    for folder in folders:
        name = os.path.basename(folder)
        files = nglob(f'{folder}/*.fastq.gz')

        num_reads = 0
        arr = []
        for f in files:
            with gzip.open(f, 'rt') as fh:
                try:
                    arr += [fh.read()]
                    num_reads += len(arr[-1].split('\n')) // 4
                except:
                    continue
        if num_reads < min_reads:
            print(f'Skipping {name}, only detected {num_reads} reads')
            continue

        print(f'Wrote 1_reads/{name}.fastq with {num_reads} reads '
              f'from {len(files)} .fastq.gz files')
        with open(f'1_reads/{name}.fastq', 'w') as fh:
            fh.write(''.join(arr))

        samples += [name]
    if write_sample_table:
        (pd.DataFrame({'fastq_name': samples, 'sample': samples})
         .pipe(Samples)
         .to_csv('samples.csv', index=None))

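# For reference, the dorado output consumed above looks roughly like this
# (barcode names are hypothetical):
#
#   fastq_pass/
#       barcode01/*.fastq.gz
#       barcode02/*.fastq.gz
#       unclassified/*.fastq.gz   <- skipped by the default exclude regex
#
# yielding 1_reads/barcode01.fastq, 1_reads/barcode02.fastq, and a
# samples.csv mapping fastq_name to sample.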

def demux_reads():
    shutil.rmtree('4_demux', ignore_errors=True)
    os.makedirs('4_demux')
    print('Cleared files from 4_demux/')

    c = load_config()['demux']
    max_reads_per_assembly = c.get('max_reads_per_assembly', 500)

    # filter and sort reads based on this table
    with Timer(verbose='Loading mapped reads...'):
        df_parsed = csv_frame('2_parsed/{sample}.parsed.pq',
                              columns=['read_index', 'read_length', 'insert_length'])
        df_mapped = csv_frame('3_mapped/{sample}.mapped.csv')
        df_fastq = load_fastq(load_samples()['sample'])

    key = ['sample', 'read_index']
    df = (df_parsed
          .merge(df_mapped, on=key)
          .merge(df_fastq, on=key)
          .query(c['gate'])
          .pipe(assign_format, fastq_text='{name}\n{read}\n+\n{quality}')
          .groupby(['sample', c['field']])
          .head(max_reads_per_assembly)
          )

    num_files = 0
    for (sample, name), df_ in df.groupby(['sample', c['field']]):
        f = f'4_demux/{sample}/{name}.fastq'
        os.makedirs(os.path.dirname(f), exist_ok=True)
        with open(f, 'w') as fh:
            fh.write('\n'.join(df_['fastq_text']))
        num_files += 1

    print(f'Wrote {num_files} files to 4_demux/{{sample}}/{{name}}.fastq')

    write_flye_commands()

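# demux_reads is driven by the 'demux' section of the config loaded above.
# The key names are real; the values here are invented examples:
#
#   demux:
#     gate: 'insert_length > 1000'    # pandas query over the merged table
#     field: insert_match             # column that names each assembly
#     max_reads_per_assembly: 500     # optional; this is the default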

def write_flye_commands(search='4_demux/*/*.fastq', flags='--nano-hq'):
    """Write commands that can be run with bash, parallel, job submission etc.
    """
    f_out = '4_demux/flye.sh'
    if not shutil.which('flye'):
        print('Warning! flye executable not found')
    arr = []
    for fastq in nglob(search):
        out_dir = fastq.replace(".fastq", "")
        arr += [f'flye {flags} {fastq} --out-dir {out_dir}']
    with open(f_out, 'w') as fh:
        fh.write('\n'.join(arr))
    print(f'Wrote {len(arr)} commands to {f_out}')
    print('These can be run in parallel with a command like:')
    print(' cat 4_demux/flye.sh | parallel -j $(nproc)')

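# Each line of the generated 4_demux/flye.sh looks like this
# (sample and design names are hypothetical):
#
#   flye --nano-hq 4_demux/barcode01/design_A.fastq --out-dir 4_demux/barcode01/design_A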

def collect_assemblies(backbone_start=None):
    """Collect flye assemblies into a new sample, optionally
    reverse-complementing/re-indexing.
    :param backbone_start: unique sequence at the start of the plasmid,
        typically ~15 nt
    """
    search = '4_demux/*/*/assembly.fasta'
    files = nglob(search)
    if len(files) == 0:
        print(f'No files matched {search}, aborting')

    seqs = [read_fasta(f)[0][1] for f in files]

    df_assembled = (pd.DataFrame({'file': files, 'seq': seqs})
                    .assign(sample=lambda x: x['file'].str.split('/').str[1])
                    .assign(name=lambda x: x['file'].str.split('/').str[2])
                    .assign(length=lambda x: x['seq'].str.len())
                    )

    if backbone_start == 'pT02':
        backbone_start = 'ATTCTCCTTGGAATT'

    if backbone_start is not None:
        reindex = lambda x: reindex_plasmid_sequence(x, [backbone_start])
    else:
        # in one test, flye made an antisense assembly out of sense reads
        reindex = reverse_complement

    df_assembled['read_name'] = df_assembled['sample'] + '_' + df_assembled['name']
    df_assembled['sense'] = df_assembled['seq'].apply(reindex)

    f = '1_reads/assembled.fastq'
    write_fastq('1_reads/assembled.fastq', df_assembled['read_name'], df_assembled['sense'])
    print(f'Wrote {len(df_assembled)} assembled sequences to {f}')

    df_samples = load_samples()
    if 'assembled' not in df_samples['sample'].values:
        new_sample = pd.DataFrame({'sample': ['assembled'], 'fastq_name': ['assembled']})
        (pd.concat([df_samples, new_sample])
         .reset_index(drop=True).pipe(Samples)
         .to_csv('samples.csv', index=None)
         )
        print('Added sample "assembled" to samples.csv')


def reindex_plasmid_sequence(seq, starting_sequence_candidates, missing='ignore'):
    """Reindex genbank sequence and features.
    """
    seq = seq.upper()
    seq_rc = reverse_complement(seq)
    for start in starting_sequence_candidates:
        start = start.upper()
        if start in seq:
            break
        elif start in seq_rc:
            seq = seq_rc
            break
    else:
        if missing == 'ignore':
            return seq
        else:
            raise ValueError('Plasmid not recognized')

    offset = seq.index(start.upper())
    return seq[offset:] + seq[:offset]
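The rotation at the end of reindex_plasmid_sequence is easiest to see on toy sequences (invented for illustration): the first matching candidate, on either strand, becomes the new origin, and with missing='ignore' an unrecognized plasmid comes back unchanged.

    print(reindex_plasmid_sequence('GGATTCAA', ['TTC']))  # 'TTCAAGGA'
    print(reindex_plasmid_sequence('TTGAATCC', ['TTC']))  # 'TTCAAGGA' (found on the reverse strand)
    print(reindex_plasmid_sequence('AAAA', ['GGG']))      # 'AAAA' (no match, returned as-is)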
4 changes: 3 additions & 1 deletion src/ngs_analysis/sequence.py
@@ -98,7 +98,9 @@ def format_fake_fastq(list_or_records):
    return '\n'.join(lines)


-def write_fastq(filename, names, sequences, quality_scores):
+def write_fastq(filename, names, sequences, quality_scores=None):
+    if quality_scores is None:
+        quality_scores = ['G' * len(x) for x in sequences]
    with open(filename, 'w') as fh:
        fh.write(format_fastq(names, sequences, quality_scores))

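The new default lets callers without per-base qualities, such as collect_assemblies above, still emit valid fastq; 'G' encodes Phred quality 38 under the standard offset-33 scheme. A quick sketch (filename hypothetical):

    write_fastq('toy.fastq', ['read1'], ['ACGT'])  # quality defaults to 'GGGG'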
2 changes: 1 addition & 1 deletion src/ngs_analysis/types.py
@@ -15,7 +15,7 @@ class ReferenceDna(pa.DataFrameModel):
# could also be used to generate comparison of expected vs actual results
class SamplePlan(pa.DataFrameModel):
    sample: Series[str] = pa.Field(nullable=False, coerce=True)
-    source: Optional[Series[str]] = pa.Field(nullable=False, coerce=True)
+    source: Series[str] = pa.Field(nullable=False, coerce=True)
    coverage: Optional[Series[float]] = pa.Field(nullable=False, coerce=True)


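With Optional dropped, a sample plan must now always carry a source column, while coverage remains optional. A minimal frame that validates (values hypothetical):

    import pandas as pd

    df_plan = pd.DataFrame({'sample': ['assembled'], 'source': ['barcode01']})
    SamplePlan.validate(df_plan)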