This repository has been archived by the owner on Jul 14, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhdgim.py
277 lines (212 loc) · 11.8 KB
/
hdgim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import torch
import dna
import math
import random
from torch.utils.data import DataLoader
import torch.nn.functional as F
import scipy.stats as stats
import numpy as np
class HDGIM:
    """Hyperdimensional DNA sequence matcher with quantized, noisy storage.

    A reference DNA sequence is cut into subsequences; each subsequence is
    encoded into a hypervector by multiplying position-rolled base
    hypervectors, and all subsequence hypervectors are summed ("bundled")
    into one reference hypervector. The reference is then quantized to
    ``2 ** bit_precision`` levels and perturbed with random +/-1 level noise.
    ``train`` compares encoded queries against the reference and nudges the
    reference whenever a query is misclassified.
    """

    def __init__(self, hypervector_dimension: int, dna_sequence_length: int,
                 dna_subsequence_length: int, bit_precision: int,
                 noise_probability: float):
        """Store the hyperparameters and reset all derived state to ``None``.

        Args:
            hypervector_dimension: Length of every hypervector.
            dna_sequence_length: Length of the random reference sequence.
            dna_subsequence_length: Length passed to ``get_subsequences``.
            bit_precision: Number of bits per quantized element; quantized
                levels are ``0 .. 2 ** bit_precision - 1``.
            noise_probability: Per-element probability that a quantized
                level is moved by one step in ``noise``.
        """
        # [begin] hyperparameters
        self.hypervector_dimension = hypervector_dimension
        self.dna_sequence_length = dna_sequence_length
        self.dna_subsequence_length = dna_subsequence_length
        self.bit_precision = bit_precision
        self.noise_probability = noise_probability
        # [end] hyperparameters

        self.max_value = 2 ** self.bit_precision - 1  # max value of quantized hypervector

        self.voltage_matrix = None  # (max_value + 1) x (max_value + 1) tensor, denoted M^c in paper
        self.dna_sequence = None  # 1-dim DNA tensor
        self.dna_subsequences = None  # 2-dim DNA tensor
        self.base_hypervectors = None  # dictionary { DNA: tensor }
        self.encoded_hypervector = None  # 1-dim tensor (bundled reference)
        self.encoded_hypervector_library = None  # 2-dim tensor (one row per subsequence)
        self.quantized_hypervector = None  # 1-dim tensor of quantized levels
        self.quantized_hypervector_library = None  # 2-dim tensor of quantized levels
        self.noised_quantized_hypervector = None  # 1-dim tensor of noised quantized levels
        self.dna_dataset = None  # DNADataset

    def create_voltage_matrix(self):
        """Build the level-vs-level cost matrix: 0 on the diagonal, 1 elsewhere."""
        levels = self.max_value + 1
        self.voltage_matrix = torch.ones(levels, levels)
        self.voltage_matrix.fill_diagonal_(0)

    def create_base_hypervectors(self):
        """Draw one random hypervector per DNA base, uniform in [-pi, pi)."""
        pi = math.pi
        # Order A, C, G, T is kept so the random stream is consumed identically.
        self.base_hypervectors = {
            base: 2 * pi * torch.rand(self.hypervector_dimension) - pi
            for base in (dna.DNA.A, dna.DNA.C, dna.DNA.G, dna.DNA.T)
        }

    def create_dna_sequence(self):
        """Create and randomize the reference DNA sequence."""
        self.dna_sequence = dna.DNASequence(self.dna_sequence_length)
        self.dna_sequence.randomize()

    def create_dna_subsequences(self):
        """Split the reference sequence into subsequences of the configured length."""
        self.dna_subsequences = self.dna_sequence.get_subsequences(self.dna_subsequence_length)

    def bind(self):
        """Encode every reference subsequence and bundle them into one hypervector.

        Each subsequence is encoded with ``bind_dna_sequence`` so that a query
        identical to a reference subsequence encodes to exactly that
        subsequence's library row. (Previously every base of a subsequence was
        rolled by the subsequence's index rather than by the base's position,
        which made reference and query encodings incompatible.)
        """
        chunk_hypervectors = [
            self.bind_dna_sequence(dna_subsequence)
            for dna_subsequence in self.dna_subsequences
        ]
        self.encoded_hypervector_library = torch.stack(chunk_hypervectors)
        # Bundling: element-wise sum over all subsequence hypervectors.
        self.encoded_hypervector = torch.sum(self.encoded_hypervector_library, dim=0)

    def bind_dna_sequence(self, dna_sequence):
        """Encode one DNA (sub)sequence into a single hypervector.

        The base hypervector of the element at position ``p`` is rolled by
        ``p`` and all rolled hypervectors are multiplied element-wise.

        Args:
            dna_sequence: Iterable of scalar tensors holding DNA base values.

        Returns:
            1-dim tensor of length ``hypervector_dimension``.
        """
        chunk_hypervector = torch.ones(self.hypervector_dimension)
        for position, base in enumerate(dna_sequence):
            base_hypervector = self.base_hypervectors[dna.DNA(base.item())]
            rolled = torch.roll(base_hypervector, shifts=position, dims=0)
            chunk_hypervector = chunk_hypervector * rolled
        return chunk_hypervector

    def _quantize_min_max_tensor(self, tensor, min_value, max_value):
        """Map ``tensor`` linearly from [min_value, max_value] onto levels 0..self.max_value."""
        value_range = max_value - min_value
        if float(value_range) == 0.0:
            # All values identical: there is no spread to quantize.
            return torch.zeros_like(tensor)
        bin_width = value_range / (self.max_value + 1)
        levels = torch.floor((tensor - min_value) / bin_width)
        # The maximum lands exactly on max_value + 1, and library rows may lie
        # outside the bundled range, so clamp into the valid level range.
        return torch.clamp(levels, 0, self.max_value)

    def quantize_min_max(self):
        """Quantize the reference (and its library, if present) by min-max binning.

        The bin edges are derived from the min and max of
        ``encoded_hypervector``; the same edges are applied to the library.
        """
        min_value = torch.min(self.encoded_hypervector)
        max_value = torch.max(self.encoded_hypervector)
        if self.encoded_hypervector_library is not None:
            self.quantized_hypervector_library = self._quantize_min_max_tensor(
                self.encoded_hypervector_library, min_value, max_value)
        self.quantized_hypervector = self._quantize_min_max_tensor(
            self.encoded_hypervector, min_value, max_value)

    def _quantize_cdf_tensor(self, tensor):
        """Quantize ``tensor`` through the standard-normal CDF of its z-scores.

        The CDF is applied element-wise, so no sorting is needed. Returns a
        float64 tensor of levels in ``0..self.max_value``.
        """
        values = tensor.detach().cpu().numpy().astype(np.float64)
        spread = values.std()
        if spread > 0:
            z_scores = (values - values.mean()) / spread
        else:
            # Constant input: treat every element as sitting on the mean.
            z_scores = np.zeros_like(values)
        cdf = stats.norm(0, 1).cdf(z_scores)  # assume standard normal distribution
        levels = np.floor(cdf * (self.max_value + 1))
        # cdf can round to exactly 1.0 for extreme outliers; keep levels in range.
        return torch.from_numpy(np.clip(levels, 0, self.max_value))

    def quantize_cdf(self):
        """Quantize ``encoded_hypervector`` by normal-CDF binning.

        Only ``quantized_hypervector`` is updated; the quantized library is
        left untouched.
        """
        self.quantized_hypervector = self._quantize_cdf_tensor(self.encoded_hypervector)

    def quantize_dna_sequence_min_max(self, dna_sequence):
        """Min-max quantize an encoded hypervector using its own min and max.

        Args:
            dna_sequence: Encoded hypervector (tensor) to quantize.

        Returns:
            Tensor of levels in ``0..self.max_value`` with the input's shape.
        """
        return self._quantize_min_max_tensor(
            dna_sequence, torch.min(dna_sequence), torch.max(dna_sequence))

    def quantize_dna_sequence_cdf(self, dna_sequence):
        """Normal-CDF quantize an encoded hypervector.

        Args:
            dna_sequence: Encoded hypervector (tensor) to quantize.

        Returns:
            float64 tensor of levels in ``0..self.max_value``.
        """
        return self._quantize_cdf_tensor(dna_sequence)

    def _noised_level(self, level):
        """Return ``level`` moved by one step with probability ``noise_probability``.

        At the boundaries (0 and ``max_value``) the step always points inward;
        otherwise left and right are equally likely.
        """
        if self.noise_probability <= random.random():
            return level
        if level == 0:
            step = 1
        elif level == self.max_value:
            step = -1
        else:
            step = 1 if random.randint(0, 1) else -1
        return level + step

    # Assume that left probability is same as right probability
    def noise(self):
        """Store a noised copy of ``quantized_hypervector``.

        The result is written to ``noised_quantized_hypervector``; the clean
        ``quantized_hypervector`` is not modified (it is cloned first).
        """
        noised = self.quantized_hypervector.clone()
        for index, level in enumerate(self.quantized_hypervector.tolist()):
            noised[index] = self._noised_level(level)
        self.noised_quantized_hypervector = noised

    def noise_dna_sequence(self, dna_sequence):
        """Apply level noise to the given quantized hypervector in place.

        The levels are read from the ``dna_sequence`` argument itself (not
        from ``self.dna_sequence``).

        Args:
            dna_sequence: 1-dim tensor of quantized levels; modified in place.

        Returns:
            The same tensor object, after noising.
        """
        for index, level in enumerate(dna_sequence.tolist()):
            dna_sequence[index] = self._noised_level(level)
        return dna_sequence

    def set_dataset(self, dna_dataset):
        """Attach the dataset that ``train`` iterates over."""
        self.dna_dataset = dna_dataset

    def get_similarity_by_voltage_matrix(self, hypervector1, hypervector2):
        """Return the negated sum of voltage-matrix costs between two level vectors.

        Only the first ``hypervector_dimension`` elements are compared; each
        pair of levels indexes ``voltage_matrix[level1][level2]``.
        """
        rows = hypervector1[:self.hypervector_dimension].long()
        cols = hypervector2[:self.hypervector_dimension].long()
        return -self.voltage_matrix[rows, cols].sum()

    def get_similarity_by_hamming_distance(self, hypervector1, hypervector2):
        """Return the negated sum of absolute element differences.

        Despite the name this is an L1 distance on the level values, not a
        bit-wise Hamming distance.
        """
        return -torch.sum(torch.abs(hypervector1 - hypervector2))

    def get_similarity_by_euclidean_distance(self, hypervector1, hypervector2):
        """Return the negated Euclidean (L2) distance between the two vectors."""
        return -torch.dist(hypervector1, hypervector2, 2)

    def get_similarity_by_cosine_similarity(self, hypervector1, hypervector2):  # -1 ~ 1
        """Return the cosine similarity between the two vectors."""
        return F.cosine_similarity(hypervector1, hypervector2, dim=0)

    def train(self, epoch, learning_rate, threshold, f='voltage', full_precision=False, return_data=False, print_info=False):
        """Iteratively adjust the reference hypervector using labelled queries.

        For every sample the query is encoded, compared with the reference and
        classified as "contained" when the (dimension-normalized) similarity
        is at least ``threshold``. On a false positive the encoded query is
        subtracted from the reference, on a false negative it is added (both
        scaled by ``learning_rate``); afterwards the reference is
        re-quantized and re-noised.

        Args:
            epoch: Number of passes over the dataset.
            learning_rate: Scale of each corrective update.
            threshold: Decision threshold on the similarity.
            f: Similarity function: 'voltage', 'hamming', 'euclidean' or 'cosine'.
            full_precision: Compare un-quantized hypervectors when True;
                otherwise compare the noised quantized reference with the
                CDF-quantized query and divide by ``hypervector_dimension``.
            return_data: Return the collected statistics when True.
            print_info: Print dataset size and per-epoch statistics when True.

        Returns:
            ``(accuracies, true_similarities, false_similarities)`` when
            ``return_data`` is True, otherwise ``None``.

        Raises:
            ValueError: If ``f`` is not one of the supported names.
        """
        similarity_functions = {
            'voltage': self.get_similarity_by_voltage_matrix,
            'hamming': self.get_similarity_by_hamming_distance,
            'euclidean': self.get_similarity_by_euclidean_distance,
            'cosine': self.get_similarity_by_cosine_similarity,
        }
        if f not in similarity_functions:
            raise ValueError(
                "Unknown similarity function {!r}; expected one of {}".format(
                    f, sorted(similarity_functions)))
        similarity_function = similarity_functions[f]

        def average(values):
            # Avoid ZeroDivisionError when one class has no samples.
            return sum(values) / len(values) if values else float('nan')

        train_dataset = self.dna_dataset
        train_data_loader = DataLoader(train_dataset, batch_size=1, shuffle=False)
        sample_count = len(train_data_loader)

        accuracies = []
        true_similarities = []
        false_similarities = []

        if print_info:
            print("Train size: {}".format(len(train_dataset)))

        # train
        for _epoch in range(epoch):
            success_cnt = 0
            true_negative_cnt = 0
            true_positive_cnt = 0
            false_negative_cnt = 0
            false_positive_cnt = 0
            epoch_true_similarities = []
            epoch_false_similarities = []
            true_similarities.append(epoch_true_similarities)
            false_similarities.append(epoch_false_similarities)

            for data in train_data_loader:
                query = torch.squeeze(data['subsequence'])
                encoded_query = self.bind_dna_sequence(query)

                if full_precision:
                    divided_similarity = similarity_function(self.encoded_hypervector, encoded_query)
                else:
                    quantized_query = self.quantize_dna_sequence_cdf(encoded_query)
                    similarity = similarity_function(self.noised_quantized_hypervector, quantized_query)
                    divided_similarity = similarity / self.hypervector_dimension

                # bool() so integer labels (0/1) behave like True/False.
                is_contained = bool(data['isContained'].item())
                predicted_contained = bool(divided_similarity >= threshold)

                if predicted_contained == is_contained:
                    success_cnt += 1
                    if is_contained:
                        true_positive_cnt += 1
                    else:
                        true_negative_cnt += 1
                elif predicted_contained:
                    # False positive: push the reference away from the query.
                    self.encoded_hypervector -= learning_rate * encoded_query
                    self.quantize_cdf()
                    self.noise()
                    false_positive_cnt += 1
                else:
                    # False negative: pull the reference towards the query.
                    self.encoded_hypervector += learning_rate * encoded_query
                    self.quantize_cdf()
                    self.noise()
                    false_negative_cnt += 1

                if is_contained:
                    epoch_true_similarities.append(divided_similarity)
                else:
                    epoch_false_similarities.append(divided_similarity)

            accuracy = round((success_cnt / sample_count) * 100, 2) if sample_count else 0.0
            accuracies.append(accuracy)

            if print_info:
                print("Epoch {}: Accuracy {}%".format(_epoch, accuracy))
                print("Average true similarity: {}, Average false similarity: {}".format(average(epoch_true_similarities), average(epoch_false_similarities)))
                print("True negative: {}, True positive: {}, False negative: {}, False positive: {}".format(true_negative_cnt, true_positive_cnt, false_negative_cnt, false_positive_cnt))

        if return_data:
            return accuracies, true_similarities, false_similarities