-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilities.py
76 lines (54 loc) · 2.84 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import argparse
from kortex_api.TCPTransport import TCPTransport
from kortex_api.UDPTransport import UDPTransport
from kortex_api.RouterClient import RouterClient, RouterClientSendOptions
from kortex_api.SessionManager import SessionManager
from kortex_api.autogen.messages import Session_pb2
def parseConnectionArguments(parser=None):
    """Register the device connection options on a parser and parse the command line.

    Adds ``--ip``, ``-u/--username`` and ``-p/--password`` to ``parser`` and
    returns the result of ``parser.parse_args()`` (which reads ``sys.argv``).

    Args:
        parser: An ``argparse.ArgumentParser`` to extend, typically one that
            already carries the caller's own options. When omitted, a new
            parser is created for this call.

    Returns:
        The ``argparse.Namespace`` produced by ``parser.parse_args()``, with
        ``ip``, ``username`` and ``password`` attributes (plus any options the
        caller registered on ``parser`` beforehand).
    """
    # A parser built in the signature would be created once and shared by every
    # call, so a second call would re-register "--ip" on the same object and
    # fail with a conflicting-option error. Build a fresh one per call instead.
    if parser is None:
        parser = argparse.ArgumentParser()

    parser.add_argument("--ip", type=str, help="IP address of destination", default="192.168.0.10")  # CHANGE IP HERE
    parser.add_argument("-u", "--username", type=str, help="username to login", default="admin")
    parser.add_argument("-p", "--password", type=str, help="password to login", default="admin")
    return parser.parse_args()
class DeviceConnection:
    """Context manager that connects a transport to a device and optionally logs in.

    Entering the ``with`` block connects the transport, creates a session when
    a non-empty username was supplied, and yields the ``RouterClient``.
    Leaving the block closes the session (if one was created) and disconnects
    the transport.
    """

    TCP_PORT = 10000
    UDP_PORT = 10001

    @staticmethod
    def createTcpConnection(args):
        """Build a DeviceConnection on the TCP port from parsed arguments.

        ``args`` must expose ``ip``, ``username`` and ``password``. The
        returned object is meant to be used in a ``with`` statement, which
        yields the RouterClient required to create services and send requests
        to the device or its sub-devices.
        """
        return DeviceConnection(
            args.ip,
            port=DeviceConnection.TCP_PORT,
            credentials=(args.username, args.password),
        )

    @staticmethod
    def createUdpConnection(args):
        """Build a DeviceConnection on the UDP port from parsed arguments.

        ``args`` must expose ``ip``, ``username`` and ``password``. The
        returned object is meant to be used in a ``with`` statement, which
        yields the RouterClient used to create services and send requests to
        a device or its sub-devices (per the original note, at 1 kHz).
        """
        return DeviceConnection(
            args.ip,
            port=DeviceConnection.UDP_PORT,
            credentials=(args.username, args.password),
        )

    def __init__(self, ipAddress, port=TCP_PORT, credentials=("", "")):
        """Store the connection settings and create the transport and router.

        Nothing is connected here; the connection is made in ``__enter__``.

        Args:
            ipAddress: Address passed to the transport's ``connect``.
            port: ``TCP_PORT`` selects a ``TCPTransport``; any other value
                selects a ``UDPTransport``.
            credentials: ``(username, password)``. An empty username means no
                session is created on entry.
        """
        self.ipAddress = ipAddress
        self.port = port
        self.credentials = credentials

        # Set only once a session has been created successfully.
        self.sessionManager = None

        # Setup API
        self.transport = TCPTransport() if port == DeviceConnection.TCP_PORT else UDPTransport()
        self.router = RouterClient(self.transport, RouterClient.basicErrorCallback)

    # Called when entering 'with' statement
    def __enter__(self):
        """Connect the transport, create a session if credentials were given, return the router.

        If anything fails after the transport has been connected, the
        transport is disconnected before the exception propagates, because
        ``__exit__`` is not invoked when ``__enter__`` raises.
        """
        self.transport.connect(self.ipAddress, self.port)

        try:
            if self.credentials[0] != "":
                session_info = Session_pb2.CreateSessionInfo()
                session_info.username = self.credentials[0]
                session_info.password = self.credentials[1]
                session_info.session_inactivity_timeout = 10000   # (milliseconds)
                session_info.connection_inactivity_timeout = 2000  # (milliseconds)

                session_manager = SessionManager(self.router)
                print("Logging as", self.credentials[0], "on device", self.ipAddress)
                session_manager.CreateSession(session_info)

                # Publish the manager only after the session exists, so
                # __exit__ never tries to close a session that was not opened.
                self.sessionManager = session_manager
        except BaseException:
            # Do not leave a connected transport behind on a failed login.
            self.transport.disconnect()
            raise

        return self.router

    # Called when exiting 'with' statement
    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session, if any, and always disconnect the transport.

        Returns None, so an exception raised inside the ``with`` block is not
        suppressed.
        """
        try:
            if self.sessionManager is not None:
                router_options = RouterClientSendOptions()
                router_options.timeout_ms = 1000

                self.sessionManager.CloseSession(router_options)
        finally:
            # Disconnect even when closing the session fails.
            self.transport.disconnect()