Skip to content

Commit

Permalink
modify fq reader
Browse files Browse the repository at this point in the history
  • Loading branch information
kaizhang committed Dec 25, 2024
1 parent f1e7d71 commit e74ad35
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 75 deletions.
2 changes: 1 addition & 1 deletion precellar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ anndata-hdf5 = { git = "https://github.com/kaizhang/anndata-rs.git", rev = "0d27
bed-utils = "0.6"
bwa-mem2 = { git = "https://github.com/regulatory-genomics/bwa-mem2-rust.git", rev = "8de06bcc0a2145fd819232ffb2bf100fb795db30" }
star-aligner = { git = "https://github.com/regulatory-genomics/star-aligner", rev = "4672820b6a2c49ef514f9160e08188928d45a874" }
smallvec = "1.13"
bstr = "1.0"
itertools = "0.13"
indexmap = "2.5"
Expand All @@ -20,7 +21,6 @@ noodles = { version = "0.85", features = ["core", "gtf", "fastq", "bam", "sam",
kdam = "0.5.2"
polars = "0.45"
rayon = "1.10"
smallvec = "1.13"
serde = "1.0"
seqspec = { version = "0.1", workspace = true }
regex = "1.6"
152 changes: 79 additions & 73 deletions precellar/src/align/fastq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use indexmap::IndexMap;
use kdam::{tqdm, BarExt};
use log::info;
use noodles::{bam, fastq};
use rayon::iter::ParallelIterator;
use rayon::slice::ParallelSlice;
use seqspec::{Assay, FastqReader, Modality, Read, RegionId, SegmentInfo, SegmentInfoElem};
use smallvec::SmallVec;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -87,7 +89,7 @@ impl FastqProcessor {
num_threads: u16,
chunk_size: usize,
) -> impl Iterator<Item = Vec<(MultiMapR, Option<MultiMapR>)>> + 'a {
let fq_reader = self.gen_barcoded_fastq(true);
let fq_reader = self.gen_barcoded_fastq(true).with_chunk_size(chunk_size);
let total_reads = fq_reader.total_reads.unwrap_or(0);

let modality = self.modality();
Expand All @@ -103,7 +105,6 @@ impl FastqProcessor {
self.align_qc.insert(modality, qc);

let mut progress_bar = tqdm!(total = total_reads);
let fq_reader = VectorChunk::new(fq_reader, chunk_size);
fq_reader.map(move |data| {
let align_qc = self.align_qc.get_mut(&modality).unwrap();
let results: Vec<_> = aligner.align_reads(num_threads, data);
Expand Down Expand Up @@ -234,19 +235,27 @@ pub struct AnnotatedFastqReader {
buffer: fastq::Record,
total_reads: Option<usize>,
trim_poly_a: bool,
inner: Vec<(FastqAnnotator, FastqReader)>,
annotators: Vec<FastqAnnotator>,
readers: Vec<FastqReader>,
chunk_size: usize,
chunk: Vec<SmallVec<[fastq::Record; 4]>>,
}

impl AnnotatedFastqReader {
pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = chunk_size;
self
}

pub fn with_polya_trimmed(mut self) -> Self {
self.trim_poly_a = true;
self
}

pub fn get_all_barcodes(&self) -> Vec<(&str, usize)> {
self.inner
self.annotators
.iter()
.flat_map(|(annotator, _)| {
.flat_map(|annotator| {
annotator
.subregions
.iter()
Expand All @@ -257,9 +266,9 @@ impl AnnotatedFastqReader {
}

pub fn get_all_umi(&self) -> Vec<(&str, usize)> {
self.inner
self.annotators
.iter()
.flat_map(|(annotator, _)| {
.flat_map(|annotator| {
annotator
.subregions
.iter()
Expand All @@ -272,10 +281,10 @@ impl AnnotatedFastqReader {
pub fn is_paired_end(&self) -> bool {
let mut has_read1 = false;
let mut has_read2 = false;
self.inner.iter().for_each(|x| {
x.0.subregions.iter().for_each(|info| {
self.annotators.iter().for_each(|x| {
x.subregions.iter().for_each(|info| {
if info.region_type.is_target() {
if x.0.is_reverse {
if x.is_reverse {
has_read1 = true;
} else {
has_read2 = true;
Expand All @@ -285,53 +294,84 @@ impl AnnotatedFastqReader {
});
has_read1 && has_read2
}

fn read_chunk(&mut self) -> usize {
self.chunk.clear();

let mut accumulated_length = 0;

while accumulated_length < self.chunk_size {
let mut max_read = 0;
let mut min_read = usize::MAX;
let records = self.readers.iter_mut().flat_map(|reader| {
let n = reader
.read_record(&mut self.buffer)
.expect("error reading fastq record");
min_read = min_read.min(n);
max_read = max_read.max(n);
if n > 0 {
accumulated_length += self.buffer.sequence().len();
Some(self.buffer.clone())
} else {
None
}
}).collect();
if max_read == 0 {
break;
} else if min_read == 0 {
panic!("Unequal number of reads in the chunk");
} else {
self.chunk.push(records);
}
}

self.chunk.len()
}
}

impl FromIterator<(FastqAnnotator, FastqReader)> for AnnotatedFastqReader {
fn from_iter<T: IntoIterator<Item = (FastqAnnotator, FastqReader)>>(iter: T) -> Self {
let (annotators, readers): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
let chunk = Vec::new();
Self {
buffer: fastq::Record::default(),
total_reads: None,
inner: iter.into_iter().collect(),
annotators,
readers,
trim_poly_a: false,
chunk_size: 10000000,
chunk,
}
}
}

impl Iterator for AnnotatedFastqReader {
type Item = AnnotatedFastq;
type Item = Vec<AnnotatedFastq>;

fn next(&mut self) -> Option<Self::Item> {
let mut missing = None;
let records: SmallVec<[_; 4]> = self
.inner
.iter_mut()
.flat_map(|(annotator, reader)| {
if reader
.read_record(&mut self.buffer)
.expect("error reading fastq record")
== 0
{
missing = Some(annotator.id.as_str());
None
} else {
Some(annotator.annotate(&self.buffer).unwrap())
}
})
.collect();

if records.is_empty() {
let n = self.read_chunk();
if n == 0 {
None
} else if missing.is_some() {
panic!("Missing records in this file: {}", missing.unwrap());
} else {
let result = records
.into_iter()
.reduce(|mut this, other| {
this.join(other);
this
let n = (n / 256).max(256);
let annotators = &self.annotators;
let result = self
.chunk
.par_chunks(n)
.flat_map_iter(|chunk| {
chunk.into_iter().map(move |records| {
records
.iter()
.enumerate()
.map(|(i, record)| annotators[i].annotate(record).unwrap())
.reduce(|mut this, other| {
this.join(other);
this
})
.unwrap()
})
})
.unwrap();
.collect();
Some(result)
}
}
Expand Down Expand Up @@ -532,40 +572,6 @@ impl AnnotatedFastq {
}
}

pub struct VectorChunk<I> {
inner: I,
chunk_size: usize, // The maximum number of bases in a chunk
}

impl<I> VectorChunk<I> {
pub fn new(inner: I, chunk_size: usize) -> Self {
Self { inner, chunk_size }
}
}

impl<I: Iterator<Item = AnnotatedFastq>> Iterator for VectorChunk<I> {
type Item = Vec<I::Item>;

fn next(&mut self) -> Option<Self::Item> {
let mut chunk = Vec::new();
let mut accumulated_length = 0;

for record in self.inner.by_ref() {
accumulated_length += record.len();
chunk.push(record);
if accumulated_length >= self.chunk_size {
break;
}
}

if chunk.is_empty() {
None
} else {
Some(chunk)
}
}
}

fn slice_fastq_record(record: &fastq::Record, start: usize, end: usize) -> fastq::Record {
fastq::Record::new(
record.definition().clone(),
Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ fn make_fastq(
None
};

for (i, record) in fq_reader.enumerate() {
for (i, record) in fq_reader.flatten().enumerate() {
if i % 1000000 == 0 {
py.check_signals().unwrap();
}
Expand Down

0 comments on commit e74ad35

Please sign in to comment.