-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmake_consensus_sites.py
126 lines (102 loc) · 4.54 KB
/
make_consensus_sites.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""
Reads all alignment files in the input directory and creates a consensus
sequence for each alignment file. In addition, it makes a "merged" consensus
sequences using all the sites of a certain type (e.g. all attB sites), excluding
the overlap sequence, common to all att sites of a given number
(e.g. all attX1 sites contain `twtGTACAAAaaa` as the overlap sequence).
"""
import os
import glob
import re
from Bio import AlignIO
from Bio.Align import MultipleSeqAlignment
from Bio.Data.IUPACData import ambiguous_dna_values
from make_combinatorial_att_sites import overlap_dict, compute_regex_site
# We create a dictinary that maps a set of bases to the ambibuous base
# that represents them.
ambiguous_base_dict = {}
for ambiguous, bases in ambiguous_dna_values.items():
ambiguous_base_dict["".join(sorted(bases))] = ambiguous
def validate_consensus_sequence(
consensus: str, alignment: MultipleSeqAlignment, site_name: str
):
pattern = compute_regex_site(consensus)
for seq in alignment:
if seq.id.startswith(site_name) and re.search(pattern, str(seq.seq)) is None:
raise ValueError(
f"{site_name}: Consensus sequence {consensus} does not match sequence from {seq.id}: {seq.seq}"
)
elif site_name.endswith("x") and re.search(pattern, str(seq.seq)) is None:
raise ValueError(
f"{site_name}: Consensus sequence {consensus} does not match sequence from {seq.id}: {seq.seq}"
)
def get_consensus_base(all_letters: str) -> str:
if "-" in all_letters:
return "-"
key = "".join(sorted(set(*[all_letters])))
return ambiguous_base_dict[key]
def get_consensus_sequence(alignment: MultipleSeqAlignment):
consensus = ""
for column in range(alignment.alignment.shape[1]):
all_letters = alignment[:, column]
consensus += get_consensus_base(all_letters)
# Remove flanking - characters
consensus = re.sub(r"^-+", "", consensus)
consensus = re.sub(r"-+$", "", consensus)
# Remove flanking N characters
consensus = re.sub(r"^N+", "", consensus)
consensus = re.sub(r"N+$", "", consensus)
# There should be no - left in the consensus
if "-" in consensus:
raise ValueError(f"Consensus sequence contains '-': {consensus}")
return consensus
def main(input_dir: str, output_file: str):
out_dict = {}
for alignment_file in glob.glob(f"{input_dir}/*.clu"):
site_name = os.path.basename(alignment_file).split(".")[0]
alignment = AlignIO.read(alignment_file, "clustal")
try:
consensus = get_consensus_sequence(alignment)
except ValueError as e:
raise ValueError(f"Error getting consensus for {site_name}: {e}")
validate_consensus_sequence(consensus, alignment, site_name)
out_dict[site_name] = consensus
# These are alignments that contain all sites of a certain type
# (e.g. attB1, attB2, etc.)
# We make a consensus where the only differing sequence
# is the overlap sequence
if site_name.endswith("x"):
# This is the consensus of the overlap sequence present in all sites
# (capital letters in overlap_dict)
# twtGTACAAAaaa
# twtGTACAAGaaa
# twtGTATAATaaa
# twtGTATAGAaaa
# twtGTATACAaaa
# GTAYAVD
core_consensus = "GTAYAVD"
# Should be present only once in the consensus
if len(re.findall(core_consensus, consensus)) != 1:
raise ValueError(f"Expected 1 core sequence for {site_name}")
for num in range(1, 6):
core = overlap_dict[str(num)][3:-3]
merged_consensus = consensus.replace(core_consensus, core)
merged_site = f"merged_{site_name[:-1]}{num}"
validate_consensus_sequence(
merged_consensus, alignment, f"{site_name[:-1]}{num}"
)
out_dict[merged_site] = merged_consensus
# Sort the sites alphabetically
out_dict = dict(sorted(out_dict.items()))
with open(output_file, "w") as f:
for site_name, consensus in out_dict.items():
f.write(f"{site_name}\t{consensus}\n")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--input-dir", type=str, default="results/alignment")
parser.add_argument(
"--output-file", type=str, default="results/consensus_sites.tsv"
)
args = parser.parse_args()
main(args.input_dir, args.output_file)