
implement process/normalize_reads()
Gordon J. Köhn committed Jan 16, 2025
1 parent 31edc97 commit 5fc82ec
Showing 6 changed files with 15,630 additions and 28 deletions.
107 changes: 107 additions & 0 deletions src/sr2silo/process/convert.py
@@ -2,6 +2,8 @@

from __future__ import annotations

import logging
import re
import tempfile
from pathlib import Path

@@ -27,3 +29,108 @@ def bam_to_sam(bam_file: Path) -> str:
        temp_sam.seek(0)
        temp_sam_content = temp_sam.read().decode()
    return temp_sam_content


def parse_cigar(cigar: str) -> list[tuple[str, int]]:
    """
    Parse a cigar string into a list of tuples.

    Args:
        cigar: A string representing the cigar string.

    Returns:
        A list of tuples where the first element is the operation
        type and the second is the length of the operation.

    Credits: adapted from David Gicev @davidgicev
    """
    pattern = re.compile(r"(\d+)([MIDNSHP=X])")

    parsed_cigar = pattern.findall(cigar)

    return [(op, int(length)) for length, op in parsed_cigar]
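# Illustrative example (not part of the committed file):
# parse_cigar("10M2I5D") returns [("M", 10), ("I", 2), ("D", 5)].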


def normalize_reads(sam_data: str, output_fasta: Path, output_insertions: Path) -> None:
    """
    Normalize all reads in a SAM file to clear-text sequences using their
    CIGAR strings, and write a FASTA file and an insertions file.

    Note that the input SAM data must be read in its entirety before calling
    this function, whilst the output files can be written incrementally.

    Args:
        sam_data: A string containing SAM-formatted data.
        output_fasta: Path of the FASTA file to write the normalized reads to;
            each read becomes one record with the header ">{qname}|{pos}".
        output_insertions: Path of the tab-separated file to write the
            insertions to; each line holds the read name and its list of
            (position, sequence) insertions.

    Returns:
        None. All output is written to the two files above.

    Credits: adapted from David Gicev @davidgicev
    """

    logging.warning(
        "normalize_reads: nucleotide insertions are not yet implemented, "
        f"{output_insertions} will be empty."
    )

    unpaired = dict()

    with output_fasta.open("w") as fasta_file, output_insertions.open(
        "w"
    ) as insertions_file:
        for line in sam_data.splitlines():
            if line.startswith("@"):  # skip SAM header lines
                continue

            fields = line.strip().split("\t")

            qname = fields[0]  # Query template NAME
            pos = int(fields[3])  # 1-based leftmost mapping position
            cigar = parse_cigar(fields[5])  # cigar string
            seq = fields[9]  # segment sequence
            qual = fields[10]  # ASCII of Phred-scaled base quality + 33

            result_sequence = ""
            result_qual = ""
            index = 0  # current offset into seq / qual
            inserts = []

            for operation in cigar:
                ops_type, count = operation
                if ops_type == "S":
                    # soft clip: skip the clipped bases in the read
                    index += count
                    continue
                if ops_type == "M":
                    # (mis)match: copy bases and qualities verbatim
                    result_sequence += seq[index : index + count]
                    result_qual += qual[index : index + count]
                    index += count
                    continue
                if ops_type == "D":
                    # deletion: pad with gaps and placeholder quality "!"
                    result_sequence += "-" * count
                    result_qual += "!" * count
                    continue
                if ops_type == "I":
                    # insertion: record the inserted bases and skip them
                    inserts.append((index + pos, seq[index : index + count]))
                    index += count
                    continue
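            # Worked example (illustrative, not part of the committed file):
            # with pos=100, CIGAR "2S3M1I2M" and seq "TTACGGAT", the loop
            # above yields result_sequence == "ACGAT" and
            # inserts == [(105, "G")]: the two soft-clipped bases are skipped
            # and the inserted "G" is recorded at index + pos == 105.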

            read = {
                "pos": pos,
                "cigar": cigar,
                "RESULT_SEQUENCE": result_sequence,
                "RESULT_QUAL": result_qual,
                "insertions": inserts,
            }

            fasta_file.write(f">{qname}|{read['pos']}\n{read['RESULT_SEQUENCE']}\n")

            insertions = read["insertions"].copy()
            # insertion_index = read["pos"] + len(read["RESULT_SEQUENCE"])

            insertions_file.write(f"{qname}\t{insertions}\n")

        # unpaired is never populated in this variant, so this loop is a no-op
        for read_id, unpaired_read in unpaired.items():
            fasta_file.write(
                f">{read_id}|{unpaired_read['pos']}\n{unpaired_read['RESULT_SEQUENCE']}\n"
            )
            insertions_file.write(f"{read_id}\t{unpaired_read['insertions']}\n")
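For orientation, the two helpers in convert.py can be chained roughly as follows (an illustrative sketch, not part of the commit; the file paths are placeholders):

from pathlib import Path

from sr2silo.process.convert import bam_to_sam, normalize_reads

sam_text = bam_to_sam(Path("REF_aln_trim_subsample.bam"))  # placeholder input
normalize_reads(
    sam_text,
    output_fasta=Path("reads.fasta"),
    output_insertions=Path("insertions.tsv"),
)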
20 changes: 1 addition & 19 deletions src/sr2silo/process/merge.py
@@ -3,27 +3,9 @@
from __future__ import annotations

import logging
import re
from pathlib import Path


def parse_cigar(cigar: str) -> list[tuple[str, int]]:
    """
    Parse a cigar string into a list of tuples.

    Args:
        cigar: A string representing the cigar string.

    Returns:
        A list of tuples where the first element is the operation
        type and the second is the length of the operation.

    Credits: adapted from David Gicev @davidgicev
    """
    pattern = re.compile(r"(\d+)([MIDNSHP=X])")

    parsed_cigar = pattern.findall(cigar)

    return [(op, int(length)) for length, op in parsed_cigar]
from sr2silo.process.convert import parse_cigar


def pair_normalize_reads(
46 changes: 46 additions & 0 deletions tests/conftest.py
@@ -34,7 +34,36 @@ def unit_test_mocks(monkeypatch: None):

# Define test data paths
TEST_DATA_DIR = Path(__file__).parent / "data"

INPUT_BAM_PATH = TEST_DATA_DIR / "REF_aln_trim_subsample.bam"
EXPECTED_SAM_PATH = TEST_DATA_DIR / "REF_aln_trim_subsample_expected.sam"


LARGE_TEST_DATA_DIR = (
TEST_DATA_DIR
/ "samples_large"
/ "A1_05_2024_10_08/20241024_2411515907"
/ "alignments"
)
INPUT_BAM_INSERTIONS_PATH = LARGE_TEST_DATA_DIR / "REF_aln_trim.bam"
EXPECTED_BAM_INSERTIONS_PATH_inserts = (
LARGE_TEST_DATA_DIR / "REF_aln_trim_subsample_insertions.fasta"
)
EXPECTED_BAM_INSERTIONS_PATH_cleartext = (
LARGE_TEST_DATA_DIR / "REF_aln_trim_subsample.fasta"
)


@pytest.fixture
def bam_data() -> dict:
    """Return a sample BAM data path and its corresponding SAM data as a string."""
    dict_data = dict()
    # read in these files as test data
    dict_data["bam_path"] = INPUT_BAM_PATH
    with open(EXPECTED_SAM_PATH) as f:
        dict_data["sam_data"] = f.read()

    return dict_data
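# Illustrative sketch (not part of this commit): a test could use this fixture
# to round-trip the conversion, e.g.
#
# def test_bam_to_sam(bam_data):
#     from sr2silo.process.convert import bam_to_sam
#     assert bam_to_sam(bam_data["bam_path"]) == bam_data["sam_data"]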


@pytest.fixture
@@ -43,6 +72,23 @@ def sam_data():
    return bam_to_sam(INPUT_BAM_PATH)


@pytest.fixture
def sam_with_insert_data():
    """Return sample SAM data (as a string) containing insertions, together
    with the expected insertions and cleartext file contents."""

    data_expected = dict()
    data_expected["sam_data"] = bam_to_sam(INPUT_BAM_INSERTIONS_PATH)
    # Read in the expected insertions file
    insertions_content = EXPECTED_BAM_INSERTIONS_PATH_inserts.read_text()
    data_expected["insertions"] = insertions_content

    # Read in the expected cleartext file
    cleartext_content = EXPECTED_BAM_INSERTIONS_PATH_cleartext.read_text()
    data_expected["cleartext"] = cleartext_content

    return data_expected
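# Illustrative sketch (not part of this commit): a test could feed this fixture
# into normalize_reads, using pytest's built-in tmp_path fixture, e.g.
#
# def test_normalize_reads_with_insertions(sam_with_insert_data, tmp_path):
#     from sr2silo.process.convert import normalize_reads
#     out_fasta = tmp_path / "out.fasta"
#     out_insertions = tmp_path / "out_insertions.tsv"
#     normalize_reads(sam_with_insert_data["sam_data"], out_fasta, out_insertions)
#     assert out_fasta.exists() and out_insertions.exists()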


@pytest.fixture
def temp_dir():
    """Return a temporary directory as a Path object."""
