forked from huehueteotl/raspipool
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathAtlasI2C.py
198 lines (165 loc) · 6 KB
/
AtlasI2C.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/python
# Code from https://github.com/Atlas-Scientific/Raspberry-Pi-sample-code
# TODO: might be better to have a git submodule here.
import io
import sys
import fcntl
import time
import copy
import string
class AtlasI2C:
# the timeout needed to query readings and calibrations
LONG_TIMEOUT = 1.5
# timeout for regular commands
SHORT_TIMEOUT = .3
# the default bus for I2C on the newer Raspberry Pis,
# certain older boards use bus 0
DEFAULT_BUS = 1
# the default address for the sensor
DEFAULT_ADDRESS = 98
LONG_TIMEOUT_COMMANDS = ("R", "CAL")
SLEEP_COMMANDS = ("SLEEP", )
def __init__(self, address=None, moduletype = "", name = "", bus=None):
'''
open two file streams, one for reading and one for writing
the specific I2C channel is selected with bus
it is usually 1, except for older revisions where its 0
wb and rb indicate binary read and write
'''
self._address = address or self.DEFAULT_ADDRESS
self.bus = bus or self.DEFAULT_BUS
self._long_timeout = self.LONG_TIMEOUT
self._short_timeout = self.SHORT_TIMEOUT
self.file_read = io.open(file="/dev/i2c-{}".format(self.bus),
mode="rb",
buffering=0)
self.file_write = io.open(file="/dev/i2c-{}".format(self.bus),
mode="wb",
buffering=0)
self.set_i2c_address(self._address)
self._name = name
self._module = moduletype
@property
def long_timeout(self):
return self._long_timeout
@property
def short_timeout(self):
return self._short_timeout
@property
def name(self):
return self._name
@property
def address(self):
return self._address
@property
def moduletype(self):
return self._module
def set_i2c_address(self, addr):
'''
set the I2C communications to the slave specified by the address
the commands for I2C dev using the ioctl functions are specified in
the i2c-dev.h file from i2c-tools
'''
I2C_SLAVE = 0x703
fcntl.ioctl(self.file_read, I2C_SLAVE, addr)
fcntl.ioctl(self.file_write, I2C_SLAVE, addr)
self._address = addr
def write(self, cmd):
'''
appends the null character and sends the string over I2C
'''
cmd += "\00"
self.file_write.write(cmd.encode('latin-1'))
def handle_raspi_glitch(self, response):
'''
Change MSB to 0 for all received characters except the first
and get a list of characters
NOTE: having to change the MSB to 0 is a glitch in the raspberry pi,
and you shouldn't have to do this!
'''
if self.app_using_python_two():
return list(map(lambda x: chr(ord(x) & ~0x80), list(response)))
else:
return list(map(lambda x: chr(x & ~0x80), list(response)))
def app_using_python_two(self):
return sys.version_info[0] < 3
def get_response(self, raw_data):
if self.app_using_python_two():
response = [i for i in raw_data if i != '\x00']
else:
response = raw_data
return response
def response_valid(self, response):
valid = True
error_code = None
if(len(response) > 0):
if self.app_using_python_two():
error_code = str(ord(response[0]))
else:
error_code = str(response[0])
if error_code != '1': #1:
valid = False
return valid, error_code
def get_device_info(self):
if(self._name == ""):
return self._module + " " + str(self.address)
else:
return self._module + " " + str(self.address) + " " + self._name
def read(self, num_of_bytes=31):
'''
reads a specified number of bytes from I2C, then parses and displays the result
'''
raw_data = self.file_read.read(num_of_bytes)
response = self.get_response(raw_data=raw_data)
#print(response)
is_valid, error_code = self.response_valid(response=response)
if is_valid:
char_list = self.handle_raspi_glitch(response[1:])
# get rid of unwanted null chars
string_response = str(''.join(char_list)).replace('\x00','')
result = "Success " + self.get_device_info() + ": " + string_response
#result = "Success: " + str(''.join(char_list))
else:
result = "Error " + self.get_device_info() + ": " + error_code
return result
def get_command_timeout(self, command):
timeout = None
if command.upper().startswith(self.LONG_TIMEOUT_COMMANDS):
timeout = self._long_timeout
elif not command.upper().startswith(self.SLEEP_COMMANDS):
timeout = self.short_timeout
return timeout
def query(self, command):
'''
write a command to the board, wait the correct timeout,
and read the response
'''
print("QUERRY " + command)
self.write(command)
current_timeout = self.get_command_timeout(command=command)
if not current_timeout:
return "sleep mode"
else:
time.sleep(current_timeout)
response = self.read()
print("RESPONSE " + response)
return response
def close(self):
self.file_read.close()
self.file_write.close()
def list_i2c_devices(self):
'''
save the current address so we can restore it after
'''
prev_addr = copy.deepcopy(self._address)
i2c_devices = []
for i in range(0, 128):
try:
self.set_i2c_address(i)
self.read(1)
i2c_devices.append(i)
except IOError:
pass
# restore the address we were using
self.set_i2c_address(prev_addr)
return i2c_devices