-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathumls_tag.py
executable file
·166 lines (138 loc) · 4.69 KB
/
umls_tag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import sys
import json
import time
import argparse
import fileinput
from trie import *
#key that marks the end of a term inside the character trie
_end = "_end_"
#CUI codes that are noisy and must never be reported, one code per line
#NOTE: the path is relative, so it is resolved against the current working directory
#the context manager closes the file as soon as it has been read
with open("data/stopcuis.txt") as _stopcuis_file:
    stopcuis = set(_stopcuis_file.read().splitlines())
"""
Create dictionary entry for cui match and preferred term
INPUTS:
cui (str): UMLS CUI code
term (str): preferred UMLS English name
OUTPUTS:
(dict): tagged CUI in dict format {"count": (int), "cui": (str), "pref_name": (str)}
"""
def add_match(cui, term):
    """
    Build the record stored for the first occurrence of a CUI
    INPUTS:
    cui (str): value stored under the "cui" key
    term (str): value stored under the "pref_name" key
    OUTPUTS:
    (dict): new dict {"cui": cui, "pref_name": term, "count": 1}
    """
    #keys are listed in the order cui, pref_name, count so the serialized key order stays stable
    return {
        "cui": cui,
        "pref_name": term,
        "count": 1,
    }
"""
Scans a string and checks if a medical term matches occur
INPUTS:
trie (dict of dict): character trie where "_end_" demarcates a branch
word (str): one preprocessed, lowercased document
OUTPUTS:
(lst of dict): tagged CUIs in dict format {"count": (int), "cui": (str), "pref_name": (str)}
"""
def in_trie(trie, word):
    """
    Scan a text character by character and count the CUI of every trie term found

    A match may only begin at the first character of the text or directly
    after a space, and it is only recorded when the character following it is
    not alphabetic (or the text ends), so terms embedded in longer words are
    ignored. When a partial match fails, scanning resumes once from the
    character after the most recent space.
    INPUTS:
    trie (dict of dict): character trie; a node that ends a term stores its CUI under the _end key
    word (str): text to scan
    OUTPUTS:
    (lst of dict): one entry per distinct CUI found, in the format
    {"cui": (str), "pref_name": (str), "count": (int)}; "pref_name" holds the
    text that was matched the first time the CUI was seen
    """
    cuis = dict()
    current_dict = trie
    #characters of the term currently being matched
    c = ""
    #space the scan was last rewound to; it is deliberately never reset,
    #because rewinding to the same space twice would loop forever
    same_space = -2
    #index of the most recent space seen (-1 means none yet)
    last_space = -1
    i = 0
    while i < len(word):
        letter = word[i]
        if letter == " ":
            last_space = i
        if letter in current_dict:
            #advance when at the very first character, when already inside a
            #match, or when a new match starts right after a space
            if i == 0 or c or word[i - 1] == " ":
                current_dict = current_dict[letter]
                c += letter
            #otherwise this would be a midword match, which we don't want
            else:
                current_dict = trie
                c = ""
        else:
            #mismatch: go back to the trie root
            current_dict = trie
            if last_space != same_space:
                #rewind so that the i += 1 below resumes right after the last space
                i = last_space
                same_space = last_space
            c = ""
        #end of a trie branch is reached aka match
        if _end in current_dict:
            cui = current_dict[_end]
            #not at end of text and not followed by a letter (midword match)
            if i < len(word) - 1 and not word[i + 1].isalpha():
                _record_match(cuis, cui, c)
                c = ""
        i += 1
    #edge case for the very last character fulfilling a match
    if _end in current_dict:
        _record_match(cuis, current_dict[_end], c)
    return list(cuis.values())


def _record_match(cuis, cui, matched_text):
    """Count one occurrence of cui in cuis unless the filters reject it."""
    #NOTE(review): the original comment says this drops 2 letter matches, but
    #the length test is applied to the value stored under the end marker, not
    #to the matched text - confirm which one was intended
    if len(cui) > 2 and cui not in stopcuis:
        if cui in cuis:
            cuis[cui]["count"] += 1
        else:
            cuis[cui] = add_match(cui, matched_text)
"""
Dumps the list of list of dictionaries to json format
INPUTS:
save_path (str): path to save .json
lst (lst): lst dump of tagged CUIs
"""
def save2json(save_path, lst):
    """
    Serialize lst to save_path as JSON indented by four spaces
    INPUTS:
    save_path (str): path of the .json file to create or overwrite
    lst (lst): JSON-serializable data, e.g. the per-document lists of tagged CUIs
    """
    #the context manager closes the file even if json.dump raises
    with open(save_path, "w") as f:
        json.dump(lst, f, indent=4)
def main(arguments):
    """
    Tag every line of the input file and dump the results to a .json file

    Each input line is treated as one document; the output is a list with one
    list of tagged CUIs per line, in input order.
    INPUTS:
    arguments (lst of str): command-line arguments without the program name;
    both -input and -output are required
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    #both paths are required so a missing one is reported before the trie is built
    parser.add_argument('-input', help="input path", type=str, required=True)
    parser.add_argument('-output', help="output path", type=str, required=True)
    args = parser.parse_args(arguments)

    input_path = args.input
    save_path = args.output

    start_time = time.time()
    cui_trie = create_medical_trie()
    print(str(round(time.time() - start_time, 3)), "sec")

    print("Tagging terms...")
    lst_dump = []
    #the context manager closes the input stream once every line has been read
    with fileinput.input([input_path]) as lines:
        for i, line in enumerate(lines):
            if i % 10000 == 0:
                print("DOCUMENT:", i)
            line = line.strip()
            cuis = in_trie(cui_trie, line)
            lst_dump.append(cuis)
    save2json(save_path, lst_dump)
    print("Done!")
    print(str(round(time.time() - start_time, 3)), "sec")
#script entry point: pass the command-line arguments (without the program
#name) to main() and use its return value as the process exit status
if __name__ == "__main__":
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)