-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrefinement_functions.py
203 lines (157 loc) · 8.14 KB
/
refinement_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import re
from models import SyntaxRule
from typing import Union
def replace_substring_by_match_group(input_str: str, match_group: tuple[re.Match, SyntaxRule]) -> list[Union[str, tuple[re.Match, SyntaxRule]]]:
    """
    Split `input_str` around the first occurrence of the text matched by `match_group`.

    The matched text is located with `str.find` instead of the offsets stored in
    the match object, so the match does not need to have been produced from
    `input_str` itself; it only needs to contain the same text.

    E.g.:
        input_str = 'A321B'
        match_group = (<re.Match for '321'>, <SyntaxRule>)
        returns: ['A', (<re.Match for '321'>, <SyntaxRule>), 'B']

    Empty strings are dropped from the returned list. If the matched text does
    not occur in `input_str`, the string is returned untouched as the only
    element of the list (or an empty list if `input_str` is empty).
    """
    matched_text = match_group[0].group()
    start = input_str.find(matched_text)
    if start == -1:
        # str.find reports "not found" as -1; using it as a slice index would
        # silently cut the string at the wrong place.
        return [input_str] if input_str != '' else []
    end = start + len(matched_text)
    pieces = [input_str[:start], match_group, input_str[end:]]
    # Remove empty strings (a match at the very beginning or end of input_str)
    return [piece for piece in pieces if piece != '']
def replace_allele_features_with_syntax_rules(
    syntax_rules: list[SyntaxRule],
    input_list: list[Union[str, tuple[re.Match, SyntaxRule]]],
    match_groups: list[tuple[re.Match, SyntaxRule]],
    gene: dict,
) -> list[Union[str, tuple[re.Match, SyntaxRule]]]:
    r"""
    Replace the parts of the strings in `input_list` that match a syntax rule by
    `(match, syntax_rule)` tuples.

    If `match_groups` is empty, matches are searched in each string with
    `re.finditer` using the `regex` of every rule in `syntax_rules`, keeping only
    those for which `syntax_rule.further_check(match.groups(), gene)` is truthy.
    If `match_groups` is not empty, it uses those matches.

    Then, starting from the longest matched text, each string of `input_list` is
    split into the text before the match, the `(match, syntax_rule)` tuple and the
    text after it. For example, for regex \d+ applied to `input_list` ['V320A'],
    it would return ['V', (<re.Match for '320'>, <SyntaxRule>), 'A'].
    The function is recursive, since the list changes every time that a match is
    substituted. Elements of `input_list` that are not strings are copied to the
    output unchanged.

    Matches whose text is empty are never used for splitting: they would leave
    the string unchanged and make the recursion endless.

    NOTE: `match_groups` is extended and sorted in place, so the list passed by
    the caller is modified.

    Example input (the regexes of the syntax rules being '\d+' and '[a-zA-Z]'):
        input_list = ['A321B**']
        match_groups = []
        returns: [(<re.Match for 'A'>, <SyntaxRule>), (<re.Match for '321'>, <SyntaxRule>),
                  (<re.Match for 'B'>, <SyntaxRule>), '**']
    """
    # The output, that will be identical to input_list if no pattern is found.
    out_list = []

    for allele_substring in input_list:
        # If the element is not a string, we include it as is.
        if not isinstance(allele_substring, str):
            out_list.append(allele_substring)
            continue

        # If matches are not provided, we find them with regex, and we also
        # require the further_check function of the syntax rule to accept them.
        if len(match_groups) == 0:
            for syntax_rule in syntax_rules:
                match_groups += [
                    (match, syntax_rule)
                    for match in re.finditer(syntax_rule.regex, allele_substring)
                    if syntax_rule.further_check(match.groups(), gene)
                ]

        # We sort the matches, to replace the longest matching ones first.
        match_groups.sort(key=lambda match_group: len(match_group[0].group()), reverse=True)

        for match_group in match_groups:
            matched_text = match_group[0].group()
            # An empty match is "in" every string but splits nothing, skip it.
            if matched_text and matched_text in allele_substring:
                this_list = replace_substring_by_match_group(allele_substring, match_group)
                # Recursion: the pieces around the match may contain further matches.
                this_list = replace_allele_features_with_syntax_rules(
                    syntax_rules, this_list, match_groups, gene)
                break
        else:
            # If none of the matches is in the allele_substring, we just return it as is.
            this_list = [allele_substring]

        out_list += this_list

    return out_list
def build_regex2syntax_rule(syntax_rules: list[SyntaxRule]) -> dict[str, SyntaxRule]:
    """
    Index the syntax rules passed as arguments by their regex pattern.

    Returns a dictionary in which each key is the `regex` attribute of a rule and
    the value is the rule itself. If several rules share the same regex, the
    last one in `syntax_rules` is the one kept.
    """
    return {rule.regex: rule for rule in syntax_rules}
def get_allele_parts_from_result(result):
    """
    The result parts as text, excluding non-digit non-letter characters.

    `result` is a list whose elements are either strings or sequences whose
    first element is a match object (e.g. `(match, syntax_rule)` tuples).
    Strings made up exclusively of characters that are neither ASCII letters
    nor digits (separators and the like) are skipped; any other string is kept
    as is. For non-string elements, the text matched by the first element is used.
    """
    allele_parts = []
    for part in result:
        if isinstance(part, str):
            # Keep the string unless it contains only non-alphanumeric characters.
            if not re.match(r'^[^a-zA-Z\d]+$', part):
                allele_parts.append(part)
        else:
            allele_parts.append(part[0].group())
    return allele_parts
def check_allele_description(allele_description, syntax_rules: list[SyntaxRule], allele_type, allowed_types, gene):
    """
    Use replace_allele_features_with_syntax_rules to identify patterns based on
    syntax rules, then validate the content of those patterns based on the grammar
    rules, and return output. See the example from
    test_data/allele_expected_results.tsv

    Returns a dictionary with the keys:
        allele_parts: the parts of the description, joined by '|'.
        needs_fixing: True if any of the error / change fields below is not empty.
        change_description_to: the description built by joining with ',' the
            result of `apply_syntax` for each match, if it differs from
            `allele_description` and none of its parts is empty.
        rules_applied: '|'-joined 'type:rule_name' of the rule of each match.
        pattern_error: the parts of the description that no rule matched, joined
            by ',' (or the whole description if nothing was matched at all).
        invalid_error: always '' in this function.
        sequence_error: '|'-joined results of `check_sequence` for each match.
        change_type_to: the value of `allowed_types` for the set of rule types
            found, if it differs from `allele_type`.

    When there is a pattern error the function returns early, with
    `needs_fixing` set to True and the remaining fields empty.

    Raises KeyError if the frozenset of rule types found is not a key of
    `allowed_types`.
    """
    result = replace_allele_features_with_syntax_rules(syntax_rules, [allele_description], [], gene)
    allele_parts = get_allele_parts_from_result(result)

    # Extract the matched and unmatched elements
    match_groups: list[tuple[re.Match, SyntaxRule]] = [x for x in result if not isinstance(x, str)]
    # We admit any space, comma or semicolon as a valid separator
    unmatched = [x for x in result if isinstance(x, str) and not re.match(r'^[;,\s]+$', x)]

    output_dict = {
        'allele_parts': '',
        'needs_fixing': True,
        'change_description_to': '',
        'rules_applied': '',
        'pattern_error': '',
        'invalid_error': '',
        'sequence_error': '',
        'change_type_to': ''
    }

    if unmatched:
        output_dict['pattern_error'] = ','.join(unmatched)
        return output_dict

    # Very special case, in which nothing in the allele description was matched
    # and nothing was left unmatched either (e.g. it only contains separators),
    # and therefore both match_groups and unmatched are empty.
    if not match_groups:
        output_dict['pattern_error'] = allele_description
        return output_dict

    allele_part_types = []
    correct_name_list = []
    sequence_error_list = []
    rules_applied = []

    for match, syntax_rule in match_groups:
        groups_from_match = match.groups()
        allele_part_types.append(syntax_rule.type)
        rules_applied.append(f'{syntax_rule.type}:{syntax_rule.rule_name}')
        sequence_error_list.append(syntax_rule.check_sequence(groups_from_match, gene))
        correct_name_list.append(syntax_rule.apply_syntax(groups_from_match))

    # The allele type is determined by the combination of rule types found.
    encountered_types = frozenset(allele_part_types)
    correct_type = allowed_types[encountered_types]
    if correct_type != allele_type:
        output_dict['change_type_to'] = correct_type

    # Only propose a new description if every part could be rewritten.
    correct_name = ','.join(correct_name_list)
    if correct_name != allele_description and all(correct_name_list):
        output_dict['change_description_to'] = correct_name

    output_dict['rules_applied'] = '|'.join(rules_applied) if any(rules_applied) else ''
    output_dict['sequence_error'] = '|'.join(sequence_error_list) if any(sequence_error_list) else ''
    output_dict['allele_parts'] = '|'.join(allele_parts) if any(allele_parts) else ''

    must_be_empty = ['pattern_error', 'invalid_error', 'sequence_error', 'change_description_to', 'change_type_to']
    output_dict['needs_fixing'] = any(output_dict[key] for key in must_be_empty)

    return output_dict
def seq_error_change_description_to(allele_name, sequence_error):
    """
    Apply the proposed coordinate change in sequence_error:

    allele_name: 'K35A,P90T', sequence_error: 'K35A>K36A|' ==> returns K36A,P90T

    `allele_name` is split on ',' and `sequence_error` on '|', and the pieces are
    paired by position (pairing stops at the shorter of the two lists). For
    each pair:
        - no `>` in the error message: the allele part is kept unchanged.
        - exactly one `>` ('old>new'): 'old' is replaced by 'new' in the allele
          part; if that replacement changes nothing ==> returns ''
        - more than one `>` ==> returns ''
    """
    name_parts = allele_name.split(',')
    messages = sequence_error.split('|')

    fixed_parts = []
    for part, message in zip(name_parts, messages):
        arrow_count = message.count('>')

        if arrow_count == 0:
            # Nothing proposed for this part
            fixed_parts.append(part)
            continue

        if arrow_count != 1:
            # Ambiguous proposal, give up
            return ''

        wrong_value, right_value = message.split('>')
        fixed_part = part.replace(wrong_value, right_value)
        if fixed_part == part:
            # The proposal does not apply to this part, give up
            return ''
        fixed_parts.append(fixed_part)

    return ','.join(fixed_parts)
def split_multiple_aa(value, regex):
    """
    Split into single variants: VLP123AAA => ['V123A', 'L124A', 'P125A']

    `regex` is matched against the beginning of `value`; its first group must
    capture the original residues, the second the position of the first
    residue and the third the new residues.
    """
    groups = re.match(regex, value).groups()

    single_variants = []
    for offset, residue_pair in enumerate(zip(groups[0], groups[2])):
        old_residue, new_residue = residue_pair
        position = int(groups[1]) + offset
        single_variants.append(f'{old_residue}{position}{new_residue}')
    return single_variants
def join_multiple_aa(values):
    """
    Opposite of split_multiple_aa: ['V123A', 'L124A', 'P125A'] => VLP123AAA

    The values are ordered by the first number they contain. The result is the
    first character of each value, then the first value without its first and
    last characters, then the last character of each value.
    """
    # Special case, so that it works when passing an empty dataset to .agg
    if len(values) == 0:
        return ''

    def first_number(variant):
        return int(re.search(r'\d+', variant).group())

    ordered = sorted(values, key=first_number)
    old_residues = ''.join(variant[0] for variant in ordered)
    first_position = ordered[0][1:-1]
    new_residues = ''.join(variant[-1] for variant in ordered)
    return old_residues + first_position + new_residues