-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathanalysis_procon.py
141 lines (124 loc) · 4.37 KB
/
analysis_procon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding:utf-8 -*-
import os
import json
import logging
import logconfig
import pandas as pd
from Bio import SeqIO
import itertools
import numpy as np
logconfig.setup_logging()
log = logging.getLogger("cov2")
def load_procon_res(res_path):
d1, d2, d3 = [pd.read_csv(i) for i in res_path]
# type1
r1 = {}
for i, row in d1.iterrows():
k = int(row.position[:-1])
r1[k] = row.to_dict()
yield r1
# type2
r2 = {}
for i, row in d2.iterrows():
k = tuple(sorted([int(row.site1[:-1]), int(row.site2[:-1])]))
r2[k] = row.to_dict()
yield r2
# type3
r3 = {}
for i, row in d3.iterrows():
k = tuple(sorted([int(row.site1[:-1]), int(row.site2[:-1]), int(row.site3[:-1])]))
r3[k] = row.to_dict()
yield r3
def check_aa(seq, aa):
position = aa[1:-1]
if position.isdigit():
position = int(position)
else:
print("error", aa)
return False
origin = aa[0].upper()
if seq[position - 1].upper() == origin:
return True
else:
print("error", aa)
return False
if __name__ == '__main__':
# load
variation_file = "./data/总结 - 20220709 - filter.xlsx"
log.info("load")
variation = pd.read_excel(variation_file, sheet_name=1, index_col=0)
variation["Year and month first detected"] = pd.to_datetime(variation["Year and month first detected"])
variation["Year and month first detected"] = variation["Year and month first detected"].dt.strftime('%B %d, %Y')
log.debug("columns: %s", variation.columns)
log.debug(variation)
# remove no clear: Impact on transmissibility Impact on immunity Impact on severity
evidence_columns = ["Impact on transmissibility", "Impact on immunity", "Impact on severity"]
# print(np.unique(variation[evidence_columns].values.reshape((1, -1))))
is_vital = variation[evidence_columns].applymap(lambda x: "increase" in x.lower()).any(axis=1)
variation = variation[is_vital]
log.debug(f"count {variation.shape}")
# check
fasta_file = "./data/YP_009724390.1.txt"
fasta = next(SeqIO.parse(fasta_file, "fasta")).seq
t_fasta = {}
for i, row in variation.iterrows():
aas = row.loc["Spike mutations of interest"].split(" ")
aas = map(lambda x: x.strip(), aas)
# filter
aas = filter(lambda x: check_aa(fasta, x), aas)
aas = list(aas)
t_fasta[i] = row.to_dict()
t_fasta[i]["aas"] = aas
fasta = t_fasta
# load procon result
result_dir = "./data/procon"
result_files = [os.path.join(result_dir, "type{}_parse.csv".format(i)) for i in range(1, 4)]
log.debug("parse to dict")
pr = load_procon_res(result_files)
log.info("parse type1")
pr1 = next(pr)
log.debug("type1: %s", pr1)
for i, row in fasta.items():
aas = row["aas"]
fasta[i]["type1"] = []
fasta[i]["t1"] = {}
for aa in aas:
pst = int(aa[1:-1])
res = pr1.get(pst, None)
if res is None:
res = {"rank": "", "position": aa[1:-1] + aa[0], "information": "", "rate": ""}
res["aa"] = aa
fasta[i].get("type1", ).append(res)
# fasta[i]["t1"][aa] = res
log.debug("type1 result: %s", fasta)
log.info("parse type2")
pr2 = next(pr)
log.debug("type2: %s", pr2)
for i, row in fasta.items():
aas = row["aas"]
fasta[i]["type2"] = []
# fasta[i]["t2"] = {}
for aa2 in itertools.combinations(aas, 2):
pst2 = tuple(sorted([int(a[1:-1]) for a in aa2]))
res = pr2.get(pst2, )
if res is not None:
fasta[i].get("type2").append(res)
# fasta[i]["t2"][tuple(aa2)] = res
log.debug("type2 result: %s", fasta)
log.info("parse type3")
pr3 = next(pr)
for i, row in fasta.items():
aas = row["aas"]
fasta[i]["type3"] = []
# fasta[i]["t3"] = {}
for aa3 in itertools.combinations(aas, 3):
pst3 = tuple(sorted([int(a[1:-1]) for a in aa3]))
res = pr3.get(pst3, None)
if res is not None:
fasta[i].get("type3").append(res)
# fasta[i]["t3"][pst3] = res
log.debug("type3 result: %s", fasta)
log.info("save")
analysis_res_path = "./data/procon/analysis.json"
with open(analysis_res_path, "w") as f:
f.write(json.dumps(fasta, indent="\t"))