-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfoo_client.py
125 lines (96 loc) · 3.16 KB
/
foo_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 08 19:44:03 2017
@author: John Smith
"""
import time
import socket
import sys
import numpy as np
import exceptions as exc
import string
import inputs
import foo_common as foo
class AxisInfo:
    """Linear mapping from a raw axis range onto a capped output range.

    A raw reading is shifted by ``min``, scaled by ``multipier`` and offset
    by ``cap_min``; the result is truncated to an ``int``.  The instance also
    remembers the most recently stored output value in ``val_cur``.
    """

    def __init__(self, _min, _max, cap_min, cap_max):
        """Store both ranges and derive the default scale factor.

        Args:
            _min: Lowest raw value; must be strictly less than ``_max``.
            _max: Highest raw value.
            cap_min: Output offset, i.e. the output for a raw value of ``_min``.
            cap_max: Upper end of the output range used to derive the scale.

        Raises:
            ValueError: If ``_min`` is not strictly less than ``_max``.
        """
        if _min >= _max:
            # ValueError is a subclass of Exception, so callers that catch
            # Exception keep working, and the block no longer depends on the
            # Python-2-only ``exceptions`` module.
            raise ValueError("min is not less than max")
        self.min = _min
        self.max = _max
        self.cap_min = cap_min
        self.cap_max = cap_max
        # The attribute keeps its historical spelling ("multipier") because
        # it is publicly readable; float() forces true division.
        self.multipier = (cap_max - cap_min) / float(self.max - self.min)
        self.val_cur = self.transform(self.min)

    def transform(self, val_in):
        """Return ``val_in`` mapped onto the output range, truncated to int."""
        return int((val_in - self.min) * self.multipier + self.cap_min)

    def update(self, val_in):
        """Transform ``val_in`` and remember the result as the current value."""
        self.val_cur = self.transform(val_in)

    def set_val_cur(self, val_cur):
        """Overwrite the current value without transforming it."""
        self.val_cur = val_cur

    # Original (misspelled) name retained so existing callers do not break.
    set_val_cure = set_val_cur

    def get_val_cur(self):
        """Return the current (already transformed) value."""
        return self.val_cur

    def set_multiplier(self, multipier):
        """Replace the scale factor used by ``transform``."""
        self.multipier = multipier

    def get_multiplier(self):
        """Return the scale factor used by ``transform``."""
        return self.multipier
def init_axis_info(range):
    """Build an ``AxisInfo`` for every axis listed in ``foo.ANALOG``.

    Args:
        range: Output span used to derive each axis' ``cap_min``/``cap_max``.
            The name shadows the builtin but is kept so keyword callers
            continue to work.

    Returns:
        dict mapping each axis code in ``foo.ANALOG`` to its ``AxisInfo``.
    """
    axis_info = {}
    for axis in foo.ANALOG:
        raw_min = foo.MINMAX[axis][0]
        raw_max = foo.MINMAX[axis][1]
        # presumably a 0/1 flag per axis -- confirm against foo_common
        symmetric = foo.SYMMETRIC[axis]
        cap_min = -symmetric * range / (1 + symmetric)
        cap_max = range / (1 + symmetric)
        info = AxisInfo(raw_min, raw_max, cap_min, cap_max)
        # The original line was missing a closing parenthesis, called a
        # non-existent ``get_multipier`` and added 1 to the whole
        # ``foo.SYMMETRIC`` mapping instead of this axis' entry.
        # NOTE(review): after this rescale, transform(raw_max) is
        # cap_min + (cap_max - cap_min) * (1 + symmetric), not cap_max --
        # confirm that is the intended output range.
        info.set_multiplier(info.get_multiplier() * (1 + symmetric))
        axis_info[axis] = info
    return axis_info
def read_gamepad(gamepad, axis_info, cur_vals):
    """Read pending gamepad events and update ``cur_vals`` in place.

    Args:
        gamepad: Object whose ``read()`` returns an iterable of events, each
            exposing ``ev_type``, ``code`` and ``state``.
        axis_info: Mapping from axis code to an object with ``transform``.
        cur_vals: Mapping from axis code to its latest transformed value;
            mutated by this function.

    Returns:
        None.
    """
    for event in gamepad.read():
        kind = event.ev_type
        if 'Absolute' in kind:
            # Only axes listed in foo.ANALOG are tracked; others are ignored.
            if event.code in foo.ANALOG:
                cur_vals[event.code] = axis_info[event.code].transform(event.state)
        elif 'Key' in kind or 'Sync' in kind:
            # Button and sync events are deliberately ignored.
            continue
        else:
            # A single pre-formatted argument prints identically under the
            # Python 2 print statement and the Python 3 print function.
            print("----- %s" % (kind,))
def create_msg(cur_vals, deadzone, axes=None):
    """Serialise the current axis values into a ``;``-separated string.

    Args:
        cur_vals: Mapping from axis code to its current value.
        deadzone: Threshold; a value is sent as-is only when it is strictly
            greater than this, otherwise it is replaced by zero.
        axes: Optional iterable of axis codes fixing which values are sent
            and in what order.  Defaults to ``foo.ANALOG``.

    Returns:
        The values joined with ``;`` in ``axes`` order ("" for no axes).
    """
    if axes is None:
        axes = foo.ANALOG
    # NOTE(review): the comparison is one-sided, so every negative value is
    # sent as 0 as well -- confirm whether abs(value) was intended.
    # Multiplying by the boolean keeps the original zeroing semantics.
    fields = [str((deadzone < cur_vals[axis]) * cur_vals[axis]) for axis in axes]
    # str.join replaces string.join, which no longer exists in Python 3.
    return ";".join(fields)
# --- Script: forward gamepad axis values to a TCP server at a fixed rate ---

gamepad = inputs.devices.gamepads[0]
address = '192.168.1.32'
port = 9999
freq_1 = 60  # Hz, rate at which messages are sent
freq_2 = 120  # Hz (currently unused)
period_1 = 1.0 / freq_1
period_2 = 1.0 / freq_2
deadzonePerc = 0.1
span = 100
axis_info = init_axis_info(span)
cur_vals = {
    'ABS_RX': 0,
    'ABS_RY': 0,
    'ABS_RZ': 0,
    'ABS_X': 0,
    'ABS_Y': 0,
    'ABS_Z': 0,
}
deadzone = deadzonePerc * span
server_address = (address, port)

# time.clock() was removed in Python 3.8 and reports CPU time on Unix, which
# does not advance while the process is blocked; use a wall-clock style timer.
_now = getattr(time, 'perf_counter', time.time)


def _send_message(destination, message):
    """Open a TCP connection to *destination*, send *message*, log the reply."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect() is inside the try so the socket is closed even when the
        # connection attempt fails.
        sock.connect(destination)
        sys.stderr.write('sending "%s"\n' % message)
        # sendall() requires bytes on Python 3; the payload is plain ASCII.
        sock.sendall(message.encode('ascii'))
        response = sock.recv(1)
        sys.stderr.write('success: "%s"\n' % response.decode('ascii', 'replace'))
    finally:
        sys.stderr.write('closing socket\n')
        sock.close()


t0 = _now()
t_1 = t0
while True:
    t = _now()
    if t - t_1 > period_1:
        t_1 = t
        _send_message(server_address, create_msg(cur_vals, deadzone))
    # NOTE(review): gamepad.read() presumably blocks until an event arrives,
    # so the effective send rate also depends on input activity -- confirm.
    read_gamepad(gamepad, axis_info, cur_vals)