-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmultiThreaded_nested_loop_join.py
49 lines (46 loc) · 1.79 KB
/
multiThreaded_nested_loop_join.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from fhipe import ipe
from timeit import default_timer as timer
import multiprocessing,resource
import sys
# Each worker process scans all rows of the B table for a row that meets the join predicate
# and has the matching join attribute. indexesMatch (below) checks the join predicate for one row.
def indexesMatch(target, indicies, row):
    """Report whether ``row`` holds the expected value in every selected column.

    Each entry of ``indicies`` is reduced by one and used as a position in
    ``row``; the value found there is compared with the entry of ``target``
    at the same position as the index entry.

    Returns True when no compared pair differs (this includes an empty
    ``indicies``), and False as soon as one pair differs.
    """
    mismatch_found = any(
        target[slot] != row[column - 1]
        for slot, column in enumerate(indicies)
    )
    return not mismatch_found
#Writes to a queue the index of the table with the matching join attribute or -1 otherwise
def join_b_thread(pp, tag2, ct2, b_pt_attributes, b_enc_attributes, q, count, target, indicies):
    """Worker body: look for a B row that joins with one A row and report it.

    Args:
        pp: first argument forwarded to every ``ipe.decrypt`` call.
        tag2, ct2: the encrypted pair of the A row being joined.
        b_pt_attributes: plaintext rows of table B, tested with
            ``indexesMatch(target, indicies, row)``.
        b_enc_attributes: ``(tag, ct)`` pairs of table B, indexed in step
            with ``b_pt_attributes``.
        q: any object with a ``put`` method; receives the single result.
        count: value reported on success (the caller passes the A-row index).
        target, indicies: forwarded unchanged to ``indexesMatch``.

    Puts exactly one item on ``q``: ``count`` if some B row passes the
    plaintext predicate and decrypts to the same value as the A row,
    otherwise ``-1``.
    """
    a_value = None
    a_value_ready = False
    for y, b_row in enumerate(b_pt_attributes):
        if not indexesMatch(target, indicies, b_row):
            continue
        tag1, ct1 = b_enc_attributes[y]
        if not a_value_ready:
            # The A-side value does not depend on the B row, so decrypt it
            # once, and only when a candidate row exists.
            # NOTE(review): assumes ipe.decrypt is deterministic for fixed
            # arguments -- confirm against the fhipe implementation.
            a_value = ipe.decrypt(pp, tag2, ct2)
            a_value_ready = True
        if a_value == ipe.decrypt(pp, tag1, ct1):
            q.put(count)
            # Stop at the first match: one answer per worker is expected, and
            # -1 must only be sent when nothing matched.
            return
    q.put(-1)
# Function that handles the inner join, takes in a vector of encrypted attributes and plaintext
# attributes
def inner_join(a_enc_attributes,a_pt_attributes,b_enc_attributes,b_pt_attributes,pp, target, indicies):
    """Run the nested-loop join with one worker process per row of table A.

    For every index of ``a_pt_attributes`` a ``multiprocessing.Process``
    running ``join_b_thread`` is created together with its own
    ``multiprocessing.Queue``. All workers are started, one result is read
    from each queue in A-row order, and the workers are then joined.

    Prints, in order: a size figure for the four input containers, a peak
    memory figure, the elapsed wall-clock time followed by a blank line, and
    the list of per-row results. Returns None.
    """
    workers = []
    channels = []
    for row_no in range(len(a_pt_attributes)):
        tag2, ct2 = a_enc_attributes[row_no]
        channel = multiprocessing.Queue()
        worker = multiprocessing.Process(
            target=join_b_thread,
            args=(pp, tag2, ct2, b_pt_attributes, b_enc_attributes, channel, row_no, target, indicies),
        )
        workers.append(worker)
        channels.append(channel)

    # Only the start / collect / join phase is timed, not process creation.
    started_at = timer()
    for worker in workers:
        worker.start()
    # Results are read from the queues before the workers are joined.
    matches = [channel.get() for channel in channels]
    for worker in workers:
        worker.join()
    elapsed = timer() - started_at

    # sys.getsizeof is shallow: it sizes the four containers themselves and
    # does not follow references into their elements.
    container_bytes = sum(
        map(sys.getsizeof, (a_enc_attributes, b_enc_attributes, a_pt_attributes, b_pt_attributes))
    )
    print("Total memory used ", container_bytes, "bytes", sep="")

    # NOTE(review): ru_maxrss units are platform-dependent and RUSAGE_SELF
    # covers only this process -- confirm that dividing by the page size
    # yields the intended figure.
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / resource.getpagesize()
    print("Peak memory usage ", peak, sep="")

    print(elapsed, end="\n\n")
    print(matches)