-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
399 lines (332 loc) · 12.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
#!/usr/bin/env python3
# vim: set fileencoding=utf-8 :
# TODO: Finish implementing the Counter class in the rest of the code
# TODO: Implement multitasking so that multiple bikes can be unlocked at once
import machine
import time
import hmac
from hashlib import _sha256 as sha256
from contextlib import contextmanager
from lib import Counter
# The hmac library's sha256 is broken, so use our version,
# downloaded from the micropython repo
class Keypad():
"""
An interface between the code and the matrix membrane keypad attached to
the arduino. Able to read input from the keypad.
"""
# The order of the keys as they show up on the keypad, used for determining
# the key value of a specific row/column location.
buttons = (('1', '2', '3'),
('4', '5', '6'),
('7', '8', '9'),
('*', '0', '#'))
def __init__(self, col_pins, row_pins):
"""
Accepts the esp pins for columns and rows, increasing along with the
buttons tuples. Set column pins to output and row pins to input.
"""
self.cols = [machine.Pin(i, machine.Pin.OUT) for i in col_pins]
self.rows = [machine.Pin(i, machine.Pin.IN) for i in row_pins]
@property
def pressed(self):
"""
Returns true if any of the keys on the keypad is pressed. For each
column pin, output a signal. If a row pin is receiving a signal, the
key at the row/column intersection is being pressed, and the
corresponding string in 'buttons' is returned. If no key is pressed,
return None.
"""
for i, col in enumerate(self.cols):
col.value(1)
for j, row in enumerate(self.rows):
if row.value() == 1:
col.value(0)
return self.buttons[j][i]
col.value(0)
return None
def get_next_pressed(self):
"""
Returns the most recently pressed key on the keypad. The first while
loop waits until a key is being pressed, and then the second while loop
waits until the key is released to return.
"""
# Make sure val is the most recently pressed key that is not None
val = self.pressed
while not val:
val = self.pressed
val2 = val
# Sets val2 to the most recently pressed key until no key is being
# pressed (val == None)
while val:
val = self.pressed # get current pressed key
if val:
val2 = val # sets val2 to the most recent key pressed
return val2 # return most recently pressed key
def get_input_message(self):
"""
Stores a list of the keys pressed as a string. Keeps recording key
presses until it finds the escape character, '#', and then return
the message.
"""
message = ""
button = self.get_next_pressed()
while not button == '#':
message += button
button = self.get_next_pressed()
return message
class Key():
"""
Represents a hashed code able to unlock a bike. 12 subcodes are stored,
each one linked to a bike number, any of which can be used to unlock
a specific bike.
"""
def __init__(self, key, n):
"""
Initializes 'keys' a list of 12 codes, one for each bike. With 'key'
being the initial hash code, each code is 'key' + hash('key' + 'bike
number'). 'invaltime' is the timestamp that the key became invalidated
(when the key is skipped, see invalidate_codes() in Pool), or None if
the key is valid. 'n' is the value of the counter that this key was
generated with. This is used to compare keys, to see which one came
first. This is useful when invaliding and removing unused keys.
"""
self.n = n
self.keys = [None] * 12
for i in range(len(self.keys)):
self.keys[i] = key + str(hotp.at(key + str(i)))
self.invaltime = None
print("key done")
def __getitem__(self, i):
return self.keys[i]
def __contains__(self, code):
return code in self.keys
def __str__(self):
s = '['
for i in range(len(self.keys)):
if self.invaltime:
s += 'I'
s += self.keys[i]
if i+1 < len(self.keys):
s += ', '
s += ']'
return s
class HOTP():
"""
An HOTP encryption class, follows the standard guidelines. Use at() to get
an encrypted message.
"""
# The number of digits the returned code will be
codelen = 3
def __init__(self, secret, hash_func=sha256.sha256):
"""
Pass this a secret and a hash function. The function should be sha256
"""
self.secret = secret
self.hash_func = hash_func
def at(self, counter):
"""
Pass a message ('counter') and it returns the HOTP encrypted form, with
the hex form truncated as according to truncate()
"""
return self.truncate(
hmac.new(
self.secret.encode(),
str(counter).encode(),
self.hash_func
).hexdigest())
def truncate(self, hashstr):
"""
Takes a hexdigest from at(), turns it into an integer, and returns the
first number of digits specified by 'self.codelen'
"""
return format(int(hashstr, 16) % (10 ** self.codelen),
'0{}d'.format(self.codelen))
def format(i, args):
"""
Helper method to replace String's format() because it's not supported in
micropython. Accepts an object 'i' and formats according to arguments
'args' as normal format() would do.
"""
argstr = '{:' + args + '}'
return argstr.format(i)
class Pool():
"""
A pool of key objects. Used as backup in case someone requests a code but
never uses it. Should someone request a code, it would be used up on the
server side. If the code is never entered into the bike rack, it would be
forever looking for that code, even when other codes are being distributed
by the server. A pool of available codes is the solution.
"""
counter = Counter("counter")
def __init__(self, size, encryption, inval_time_limit):
"""
Accepts three arguments:
-the size of the pool
-the class used to encrypt the counter, should be HOTP()
-the length of time after which an invalid code will be removed
Initially fills up the pool
"""
self.pool = [None] * size
self.encryption = encryption
self.inval_time_limit = inval_time_limit
self.repopulate()
def repopulate(self):
"""
For each None value in the pool, generate a new key according to the
value of 'counter' and increment the counter. Return the counter so
that the counter object can be updated.
"""
for i in range(len(self.pool)):
if not self.pool[i]:
key = self.encryption.at(self.counter.__get__(self))
self.pool[i] = Key(key, self.counter.__get__(self))
self.counter.__set__(self, self.counter.__get__(self) + 1)
print("Done repopulating pool")
return self.counter.__get__(self)
def remove_key(self, key):
"""
Removes the given key from the pool if discovered, and shifts all items
after it to the left. Successfully removing a key will leave a None
value at the end of the list, so repopulate() will be needed.
"""
found = False
for i in range(len(self.pool)):
if self.pool[i] and self.pool[i].keys is key.keys:
found = True
if found:
if i + 1 >= len(self.pool):
self.pool[i] = None
else:
self.pool[i] = self.pool[i + 1]
def invalidate_codes(self, n):
"""
Invalidates any key whose n value is less than 'n', that is, any key
that was created before the counter reached 'n'. This should be called
whenever a key is accepted (because any keys before the accepted key
will never be used). Invalidates the keys by setting their 'invaltime'
to the current time.
"""
for i in range(len(self.pool)):
key = self.pool[i]
if key and key.invaltime is None and key.n < n:
key.invaltime = time.time()
def accept_code(self, code):
"""
Checks to see if 'code' is in the pool. Iterates through the subkeys
in each key and returns the subkey index (the bike number) if found.
Currently when a key is found, this function removes it from the pool,
removes invalidated codes, invalidates any codes before the accepted
key, and then reopulates the pool, all before returning the bike index.
This means there's a significant time gap as the microcontroller
generates a new key, and should be fixed.
"""
for i in range(len(self.pool)):
key = self.pool[i]
if key:
for j in range(len(key.keys)):
if code == key.keys[j]:
self.remove_key(key)
self.remove_inval_codes()
self.invalidate_codes(key.n)
# update counter here
return j
return None
def remove_inval_codes(self):
"""
Removes any key that has been invalidated for greater than the time
limit, as designated by 'self.inval_time_limit'
"""
i = 0
while i < len(pool):
found = False
key = self.pool[i]
if key and key.invaltime:
diff = time.time() - key.invaltime
if diff >= self.inval_time_limit:
self.remove_key(key)
found = True
if not found:
i += 1
def __len__(self):
return len(self.pool)
def __getitem__(self, index):
return self.pool[index]
def __str__(self):
s = "["
for i in range(len(self.pool)):
s += str(self.pool[i])
if not i+1 == len(self.pool):
s += ", "
s += "]"
return s
class Relay():
"""
An interface for the relay, which physically unlocks the bikes on the rack.
As of currently, no new code can be accepted while a bike is unlocked,
meaning only one bike can be unlocked at the same time. This needs to
change.
"""
def __init__(self, pins):
"""
Sets up the pins connected to the relay as output, as passed by 'pins'.
HIGH is closed, so output a signal for all pins.
"""
self.pins = [machine.Pin(i, machine.Pin.OUT) for i in pins]
for i in range(len(self.pins)):
self.close_motor(i)
def open_motor(self, index):
self.pins[index].value(0)
def close_motor(self, index):
self.pins[index].value(1)
def unlock_bike(self, bike_num, unlocktime=5):
"""
Unlocks the bike of 'bike_num', which corresponds to 'self.pins' index
"""
with self(unlocktime):
time.sleep(unlocktime)
@contextmanager
def __call__(self, bike_num):
self.open_motor(bike_num)
yield
self.close_motor(bike_num)
kp = Keypad((21, 22, 23), (16, 17, 18, 19))
relay = Relay((4, 0, 15, 10, 9, 13, 14, 27, 26, 25, 33, 32))
pin2 = machine.Pin(2, machine.Pin.OUT)
DEBUG = False
if DEBUG:
for bike in range(12):
relay.unlock_bike(bike)
if DEBUG:
# Debug mode
while True:
try:
z = kp.get_input_message()
relay.unlock_bike(int(z) - 1, 1)
except (ValueError, IndexError) as e:
print(e)
print(z)
pin2.value(1)
time.sleep(1)
pin2.value(0)
# Turn ESP32 blue light on for dev testing
pin2.value(1)
hotp = HOTP("ITSAKEY", sha256.sha256)
# set invalid time limit to an hour (3600 seconds)
pool = Pool(10, hotp, 3600)
print(pool)
# Turn ESP32 blue light off to signify setup completion
pin2.value(0)
# Main loop, continually tries to accept input from the keypad, and unlocks the
# bike if the input is accepted as a code.
while True:
bike_index = pool.accept_code(kp.get_input_message())
print(bike_index)
if bike_index is not None:
pin2.value(1)
print(bike_index)
with relay(bike_index):
# TODO / BUG: code freezes during repopulation, impliment async
pool.counter.__set__(pool, pool.repopulate())
print(pool)
pin2.value(0)