-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplica_client.py
151 lines (121 loc) · 4.32 KB
/
replica_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import logging
import socket
import uuid
import pickle
import datetime
import os
import time
import psutil
import sys
import grpc
from proto import helloworld_pb2
from proto import helloworld_pb2_grpc
# IP address of this replica. Filled in by the __main__ block from the local
# socket address; sent as `replicaId` in every request and used by run() to
# skip probing ourselves.
selfId = ""
# Sliding window of the most recent probes across all replicas. sayHello()
# appends {"requestId": ..., "rtt": ...} dicts and trims it to 10 entries; the
# whole list is pickled into every outgoing request.
pastWindowData = []
# Per-replica probe history: replica address -> list of
# {"requestId": ..., "rtt": ...} dicts, each list trimmed to 10 entries by
# sayHello(). Also pickled into every outgoing request.
history = {}
def _format_timestamp(moment):
    """Render *moment* as ``YYYY-MM-DD HH:MM:SS.mmm`` (millisecond precision)."""
    # %f produces six microsecond digits; dropping the last three leaves ms.
    return moment.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]


def sayHello(stub, seq, replica):
    """Send one ``SayHelloReplica`` probe to *replica* and record its RTT.

    The request carries this replica's id (module-level ``selfId``), a fresh
    request id, the send timestamp, and pickled copies of the module-level
    ``pastWindowData`` and ``history`` structures.

    When the response echoes the same request id, the measured round-trip
    time is appended to ``pastWindowData`` and to ``history[replica]`` (both
    trimmed to their 10 most recent entries) and a ``[ResponseLog]`` line is
    written via ``logging``.

    Args:
        stub: Object exposing ``SayHelloReplica(request)``; ``run()`` passes a
            ``helloworld_pb2_grpc.GreeterStub``.
        seq: Sequence number supplied by the caller. Currently unused.
        replica: Address of the replica being probed; used as the ``history``
            key and in the log line.

    Returns:
        A confirmation string when the response matches the request id,
        otherwise ``None``.

    Raises:
        Whatever ``stub.SayHelloReplica`` raises is propagated unchanged
        (presumably ``grpc.RpcError`` for transport failures).
    """
    requestId = str(uuid.uuid1())

    # Sample system conditions and serialise the payloads *before* starting
    # the clock so that this local work does not inflate the measured RTT.
    cpu_percentage = psutil.cpu_percent()
    memory_conditions = dict(psutil.virtual_memory()._asdict())
    # Protocol 0 is pickle's text-based format; the data here is plain ASCII
    # (UUIDs, addresses, timedelta strings), so decoding to str is safe.
    # NOTE(review): the receiver presumably unpickles these fields; pickle
    # must only be exchanged between trusted peers.
    window_payload = pickle.dumps(pastWindowData, 0).decode()
    history_payload = pickle.dumps(history, 0).decode()

    timeAtSender = datetime.datetime.now()
    sent_at = _format_timestamp(timeAtSender)
    response = stub.SayHelloReplica(
        helloworld_pb2.HelloRequestReplica(
            replicaId=selfId,
            requestId=requestId,
            timeAtSender=sent_at,
            pastWindowData=window_payload,
            history=history_payload,
        )
    )
    # Stop the clock immediately, before any bookkeeping.
    responseReceivedTime = datetime.datetime.now()

    if requestId != response.requestId:
        # A response for a different request is not a valid sample; make the
        # drop visible instead of ignoring it silently.
        logging.warning(
            "[ %s ][ RequestIdMismatch ]%s", requestId, response.requestId
        )
        return None

    rtt = str(responseReceivedTime - timeAtSender)
    sample = {"requestId": requestId, "rtt": rtt}

    pastWindowData.append(sample)

    # Separate dict instance per container, as two independent records.
    replica_history = history.setdefault(replica, [])
    replica_history.append(dict(sample))
    if len(replica_history) > 10:
        replica_history.pop(0)

    # Field labels (including the "ReceieverReplica" spelling) are part of the
    # log format and are kept verbatim for downstream parsers.
    fields = (
        ("RequestId", requestId),
        ("ReceieverReplica", replica),
        ("ServerTime", response.timeAtReceiver),
        ("RequestSentAt", sent_at),
        ("ResponseReceivedAt", _format_timestamp(responseReceivedTime)),
        ("RTT", rtt),
        ("CPUUtil", cpu_percentage),
        ("Memory", memory_conditions),
    )
    logging.info(
        "[ResponseLog]%s",
        "".join(f"({label}, {value}); " for label, value in fields),
    )

    if len(pastWindowData) > 10:
        logging.info(
            "[ " + requestId + " ]" +
            "[ DeletingPastEntry ]" +
            str(pastWindowData.pop(0))
        )

    return "RTT completed successfully for request id: " + requestId
# Replica addresses probed when run() is called without an explicit list.
_DEFAULT_REPLICAS = (
    "128.110.219.70",
    "128.105.144.137",
    "130.127.134.5",
    "155.98.38.21",
)


def run(port, seq, replicas=_DEFAULT_REPLICAS):
    """Probe every replica except this one with a single ``sayHello`` call.

    A fresh insecure gRPC channel is opened for each replica and closed again
    by the ``with`` block once the probe returns.

    Args:
        port: Port the replicas listen on (string or int).
        seq: Sequence number forwarded to ``sayHello``.
        replicas: Iterable of replica addresses to probe. Defaults to the
            built-in replica set; for local development pass
            ``replicas=["127.0.0.1"]``.

    An RPC failure for one replica is logged and does not prevent the
    remaining replicas from being probed.
    """
    # NOTE(review): a new channel is opened per request, so connection setup
    # is presumably part of every measured RTT - confirm this is intended.
    for replica in replicas:
        if replica == selfId:
            # Never probe ourselves.
            continue
        print("Seding request to " + replica)
        try:
            with grpc.insecure_channel(f"{replica}:{port}") as channel:
                stub = helloworld_pb2_grpc.GreeterStub(channel)
                sayHello(stub, seq, replica)
        except grpc.RpcError as err:
            # One unreachable replica must not abort the whole measurement
            # run; record the failure and move on to the next replica.
            logging.warning("[ %s ][ RpcFailed ]%s", replica, err)
if __name__ == '__main__':
    # Usage: <script> <log-file-name>
    # Probes all replicas in a loop for 20 minutes, logging to logs/<name>.
    if len(sys.argv) < 2:
        sys.exit("usage: " + sys.argv[0] + " <log-file-name>")

    path = "logs/"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(path, exist_ok=True)

    port = "50060"
    logfilename = sys.argv[1]
    logfilepath = path + logfilename
    # NOTE: %(msecs)d is not zero-padded, so 5 ms is rendered as ",5".
    # The format is left unchanged to keep existing log parsers working.
    logging.basicConfig(
        filename=logfilepath,
        filemode='a',
        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
        datefmt='%H:%M:%S',
        level=logging.DEBUG
    )

    # Discover this host's outbound IP address: connecting a UDP socket only
    # selects the local interface/route, after which getsockname() reports the
    # local address. The with-block closes the socket even if connect() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        selfId = s.getsockname()[0]

    # Run for 20 minutes. A monotonic clock keeps the duration correct even
    # if the wall clock is adjusted while the probe is running.
    t_end = time.monotonic() + 60 * 20
    seq = 0
    while time.monotonic() < t_end:
        run(port, seq)
        seq += 1