forked from agners/micropython-scd30
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscd30.py
211 lines (182 loc) · 7.52 KB
/
scd30.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#from machine import I2C
import time
import struct
class SCD30:
    """Driver for the SCD30 CO2 / temperature / humidity sensor over I2C.

    The bus object must provide ``try_lock()``, ``unlock()``, ``scan()``,
    ``writeto(addr, buf)`` and ``readfrom_into(addr, buf)`` (the interface
    exposed by e.g. CircuitPython's ``busio.I2C``).

    Every 16-bit word exchanged with the sensor is followed by a CRC-8
    byte; words read back are verified and ``CRCException`` is raised on
    a mismatch.
    """

    class NotFoundException(Exception):
        """Raised when no device answers at the requested I2C address."""

    class CRCException(Exception):
        """Raised when a word read from the sensor fails its CRC-8 check."""

    # 16-bit command codes, sent big-endian.
    START_CONT_MEASURE = 0x0010
    STOP_CONT_MEASURE = 0x0104
    SET_MEASURE_INTERVAL = 0x4600
    GET_STATUS_READY = 0x0202
    READ_MEASUREMENT = 0x0300
    SET_ASC = 0x5306
    SET_FRC = 0x5204
    SET_TEMP_OFFSET = 0x5403
    SET_ALT_COMP = 0x5102
    GET_FIRMWARE_VER = 0xd100
    SOFT_RESET = 0xd304

    CLOCK_TIME_US = 10

    # CRC-8 lookup table (polynomial 0x31), generated using:
    #
    #   CRC8_POLYNOMIAL = 0x31
    #   crc_table = []
    #   for crc in range(256):
    #       for crc_bit in range(8):
    #           if crc & 0x80:
    #               crc = (crc << 1) ^ CRC8_POLYNOMIAL
    #           else:
    #               crc = crc << 1
    #           crc = crc % 256
    #       crc_table.append(crc)
    CRC_TABLE = [
        0, 49, 98, 83, 196, 245, 166, 151, 185, 136, 219, 234, 125, 76, 31, 46,
        67, 114, 33, 16, 135, 182, 229, 212, 250, 203, 152, 169, 62, 15, 92, 109,
        134, 183, 228, 213, 66, 115, 32, 17, 63, 14, 93, 108, 251, 202, 153, 168,
        197, 244, 167, 150, 1, 48, 99, 82, 124, 77, 30, 47, 184, 137, 218, 235,
        61, 12, 95, 110, 249, 200, 155, 170, 132, 181, 230, 215, 64, 113, 34, 19,
        126, 79, 28, 45, 186, 139, 216, 233, 199, 246, 165, 148, 3, 50, 97, 80,
        187, 138, 217, 232, 127, 78, 29, 44, 2, 51, 96, 81, 198, 247, 164, 149,
        248, 201, 154, 171, 60, 13, 94, 111, 65, 112, 35, 18, 133, 180, 231, 214,
        122, 75, 24, 41, 190, 143, 220, 237, 195, 242, 161, 144, 7, 54, 101, 84,
        57, 8, 91, 106, 253, 204, 159, 174, 128, 177, 226, 211, 68, 117, 38, 23,
        252, 205, 158, 175, 56, 9, 90, 107, 69, 116, 39, 22, 129, 176, 227, 210,
        191, 142, 221, 236, 123, 74, 25, 40, 6, 55, 100, 85, 194, 243, 160, 145,
        71, 118, 37, 20, 131, 178, 225, 208, 254, 207, 156, 173, 58, 11, 88, 105,
        4, 53, 102, 87, 192, 241, 162, 147, 189, 140, 223, 238, 121, 72, 27, 42,
        193, 240, 163, 146, 5, 52, 103, 86, 120, 73, 26, 43, 188, 141, 222, 239,
        130, 179, 224, 209, 70, 119, 36, 21, 59, 10, 89, 104, 255, 206, 157, 172
    ]

    def __init__(self, i2c, addr, pause=0.001):
        """Bind to the sensor at ``addr`` on ``i2c``.

        Args:
            i2c: I2C bus object (see class docstring for required methods).
            addr: 7-bit I2C address of the sensor.
            pause: Seconds to sleep between sending a read command and
                fetching the reply.

        Raises:
            NotFoundException: if ``addr`` is not reported by ``i2c.scan()``.
        """
        self.i2c = i2c
        self.pause = pause
        self.addr = addr
        self.__lock_bus()
        try:
            found = addr in i2c.scan()
        finally:
            # Release the bus even if scan() itself raises.
            i2c.unlock()
        if not found:
            raise self.NotFoundException

    def start_continous_measurement(self, ambient_pressure=0):
        """Start continuous measurement.

        Args:
            ambient_pressure: 16-bit argument sent with the command
                (per the Sensirion interface description: pressure in
                mbar, 0 disables pressure compensation).
        """
        self.__write_word(self.START_CONT_MEASURE, ambient_pressure)

    def stop_continous_measurement(self):
        """Stop continuous measurement."""
        self.__write_command(self.STOP_CONT_MEASURE)

    def soft_reset(self):
        """Send the soft-reset command."""
        self.__write_command(self.SOFT_RESET)

    def get_firmware_version(self):
        """Return the two firmware-version bytes as a tuple.

        Per the Sensirion interface description these are (major, minor).

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        ver = self.__read_bytes(self.GET_FIRMWARE_VER, 3)
        self.__check_crc(ver)
        return struct.unpack('BB', ver[0:2])

    def read_measurement(self):
        """Read one measurement and return ``(co2, temperature, relh)``.

        The reply is 18 bytes: six CRC-protected 16-bit words forming
        three big-endian 32-bit floats (datasheet units: ppm, degrees
        Celsius, percent relative humidity).

        Raises:
            CRCException: if any of the six words fails its CRC check.
        """
        raw = self.__read_bytes(self.READ_MEASUREMENT, 18)
        # Validate every word before decoding anything.
        for start in range(0, len(raw), 3):
            self.__check_crc(raw[start:start + 3])
        values = []
        for base in (0, 6, 12):
            # Each float is two words; skip the CRC byte after the first.
            payload = raw[base:base + 2] + raw[base + 3:base + 5]
            values.append(struct.unpack('>f', payload)[0])
        co2, temperature, relh = values
        return (co2, temperature, relh)

    def get_status_ready(self):
        """Return the data-ready status word (non-zero when data is ready).

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        return self.__read_word(self.GET_STATUS_READY)

    def get_measurement_interval(self):
        """Return the configured measurement interval as an integer.

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        return self.__read_word(self.SET_MEASURE_INTERVAL)

    def set_measurement_interval(self, interval):
        """Set the measurement interval (16-bit unsigned integer)."""
        self.__write_word(self.SET_MEASURE_INTERVAL, interval)

    def get_automatic_recalibration(self):
        """Return True if automatic self-calibration (ASC) is enabled.

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        return self.__read_word(self.SET_ASC) == 1

    def set_automatic_recalibration(self, enable):
        """Enable or disable automatic self-calibration (ASC)."""
        # Must target the ASC register; writing this flag to SET_FRC would
        # instead trigger a forced recalibration with reference 0 or 1.
        self.__write_word(self.SET_ASC, 1 if enable else 0)

    def get_forced_recalibration(self):
        """Return the forced-recalibration reference value.

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        return self.__read_word(self.SET_FRC)

    def set_forced_recalibration(self, co2ppm):
        """Set the forced-recalibration reference (16-bit unsigned)."""
        self.__write_word(self.SET_FRC, co2ppm)

    def get_temperature_offset(self):
        """Return the temperature offset (stored value divided by 100).

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        return self.__read_word(self.SET_TEMP_OFFSET) / 100.0

    def set_temperature_offset(self, offset):
        """Set the temperature offset; stored as ``offset * 100``.

        The scaled value is rounded rather than truncated so that float
        artefacts (e.g. ``0.29 * 100 == 28.999...``) do not lose a count.
        """
        self.__write_word(self.SET_TEMP_OFFSET, int(round(offset * 100)))

    def get_altitude_comp(self):
        """Return the altitude-compensation value as an integer.

        Raises:
            CRCException: if the reply fails its CRC check.
        """
        return self.__read_word(self.SET_ALT_COMP)

    def set_altitude_comp(self, altitude):
        """Set the altitude-compensation value (16-bit unsigned)."""
        self.__write_word(self.SET_ALT_COMP, altitude)

    def __lock_bus(self):
        """Busy-wait until the I2C bus lock has been acquired."""
        while not self.i2c.try_lock():
            pass

    def __write_raw(self, buf):
        """Write ``buf`` to the sensor, always releasing the bus lock."""
        self.__lock_bus()
        try:
            self.i2c.writeto(self.addr, buf)
        finally:
            self.i2c.unlock()

    def __write_command(self, cmd):
        """Send a bare 16-bit command code (big-endian, no argument)."""
        self.__write_raw(struct.pack('>H', cmd))

    def __write_word(self, cmd, value):
        """Send ``cmd`` followed by a 16-bit ``value`` and its CRC byte."""
        word = struct.pack('>H', value)
        checksum = self.__crc(word[0], word[1])
        self.__write_raw(struct.pack('>H', cmd) + word + bytes([checksum]))

    def __read_bytes(self, cmd, count):
        """Send ``cmd``, wait ``self.pause`` seconds, read ``count`` bytes."""
        self.__write_command(cmd)
        time.sleep(self.pause)
        result = bytearray(count)
        self.__lock_bus()
        try:
            self.i2c.readfrom_into(self.addr, result)
        finally:
            self.i2c.unlock()
        return result

    def __read_word(self, cmd):
        """Read one CRC-protected word for ``cmd`` as an unsigned int."""
        raw = self.__read_bytes(cmd, 3)
        self.__check_crc(raw)
        # Drop the trailing CRC byte: '>H' consumes exactly two bytes.
        return struct.unpack('>H', raw[0:2])[0]

    def __check_crc(self, arr):
        """Raise CRCException unless ``arr`` is (msb, lsb, valid crc)."""
        # Internal invariant: callers always pass exactly one word + CRC.
        assert (len(arr) == 3)
        if self.__crc(arr[0], arr[1]) != arr[2]:
            raise self.CRCException

    def __crc(self, msb, lsb):
        """Return the CRC-8 (init 0xff) of ``msb`` and, if given, ``lsb``."""
        crc = 0xff
        crc ^= msb
        crc = self.CRC_TABLE[crc]
        if lsb is not None:
            crc ^= lsb
            crc = self.CRC_TABLE[crc]
        return crc