-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmotion_detector_test.py
executable file
·95 lines (75 loc) · 2.83 KB
/
motion_detector_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2021 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author: altheim
# created: 2020-03-31
# modified: 2020-03-31
import time, sys, traceback, itertools
from colorama import init, Fore, Style
init()
from lib.config_loader import ConfigLoader
from lib.event import Event
from lib.logger import Logger, Level
from lib.message_factory import MessageFactory
from lib.devnull import DevNull
from lib.motion_detector import MotionDetector
# ..............................................................................
class MockMessageBus():
'''
This message bus waits for one MOTION_DETECT event.
'''
def __init__(self, level):
super().__init__()
self._log = Logger("mock-queue", Level.INFO)
self._counter = itertools.count()
self._experienced_event = False
# ......................................................
def handle(self, message):
message.number = next(self._counter)
self._log.info('handling message #{}: priority {}: {}'.format(message.number, message.priority, message.description))
event = message.event
if event is Event.MOTION_DETECT:
self._experienced_event = True
else:
self._log.info('other event: {}'.format(event.description))
# ......................................................
def is_complete(self):
return self._experienced_event
# main .........................................................................
def main():
_log = Logger("mot-det test", Level.INFO)
_log.info('starting test...')
_log.info(Fore.YELLOW + Style.BRIGHT + ' INFO : Press Ctrl+C to exit.')
_motion_detector = None
try:
_loader = ConfigLoader(Level.INFO)
filename = 'config.yaml'
_config = _loader.configure(filename)
_message_factory = MessageFactory(Level.INFO)
_message_bus = MockMessageBus(Level.INFO)
_motion_detector = MotionDetector(_config, _message_factory, _message_bus, Level.INFO)
_log.info('starting motion detector...')
_motion_detector.enable()
_counter = itertools.count()
while not _message_bus.is_complete():
if next(_counter) % 5 == 0:
_log.info('waiting for motion... ')
time.sleep(1.0)
time.sleep(5.0)
_motion_detector.close()
_log.info('test complete.')
except KeyboardInterrupt:
if _motion_detector:
_motion_detector.close()
except Exception:
_log.info(traceback.format_exc())
finally:
if _motion_detector:
_motion_detector.close()
if __name__== "__main__":
main()
#EOF