-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathcamera_screen.py
111 lines (91 loc) · 3.65 KB
/
camera_screen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from kivy.app import App
from kivy.lang import Builder
from kivy.uix.screenmanager import Screen
from kivy.clock import Clock, mainthread
from kivy.properties import StringProperty
from kivy.uix.image import Image
from kivy.core.image import Image as CoreImage
from kivy.logger import Logger, LOG_LEVELS
import io
import urllib.request
import threading
import time
# Kv rules for the camera screen. Kv is indentation-sensitive, so the nesting
# below defines the widget tree: a vertical BoxLayout holding the MjpegViewer
# (id 'viewer') and a fixed-height 'Back' button that switches the screen
# manager to 'main'. The screen's on_enter/on_leave events call start()/stop().
Builder.load_string('''
<MjpegViewer>:

<CameraScreen>:
    on_enter: self.start()
    on_leave: self.stop()
    BoxLayout:
        orientation: 'vertical'
        MjpegViewer:
            id: viewer
        Button:
            size_hint_y: None
            height: 40
            text: 'Back'
            on_press: root.manager.current= 'main'
''')
class MjpegViewer(Image):
    """Image widget that displays JPEG frames fetched from the app's camera URL.

    A background thread reads the HTTP response, cuts JPEG frames out of the
    byte stream using the start/end-of-image markers, and passes each frame
    to ``update_image`` which applies it to the widget on the main thread.
    """

    # JPEG start-of-image / end-of-image markers used to delimit frames.
    _SOI = b'\xff\xd8'
    _EOI = b'\xff\xd9'

    def start(self):
        """Load camera settings from the running app and start the reader thread."""
        app = App.get_running_app()
        self.url = app.camera_url
        self.realm = app.config.get('Web', 'camera_realm', fallback=None)
        self.user = app.config.get('Web', 'camera_user', fallback=None)
        self.pw = app.config.get('Web', 'camera_password', fallback=None)
        self.singleshot = app.config.getboolean('Web', 'camera_singleshot', fallback=False)
        self.flipy = app.config.getboolean('Web', 'camera_flip_y', fallback=False)
        self.flipx = app.config.getboolean('Web', 'camera_flip_x', fallback=False)
        self.quit = False
        self.t = threading.Thread(target=self._read_stream)
        self.t.start()

    def stop(self):
        """Ask the reader thread to finish and wait for it to exit.

        Safe to call even if ``start`` was never called.
        """
        self.quit = True
        reader = getattr(self, 't', None)
        if reader is not None:
            # Blocks the caller until the reader loop notices ``quit``.
            reader.join()

    def _read_stream(self):
        """Thread body: read the URL and publish each complete JPEG frame.

        Runs until ``self.quit`` is set, the server closes the stream, or an
        error occurs. Any exception is logged and ends the thread.
        """
        Logger.info("MjpegViewer: started thread")
        stream = None
        try:
            if self.realm and self.user and self.pw:
                auth_handler = urllib.request.HTTPBasicAuthHandler()
                auth_handler.add_password(realm=self.realm, uri=self.url, user=self.user, passwd=self.pw)
                opener = urllib.request.build_opener(auth_handler)
                urllib.request.install_opener(opener)

            stream = urllib.request.urlopen(self.url)
            buf = b''
            while not self.quit:
                # read in stream until we get the entire frame
                chunk = stream.read(1024)
                if not chunk:
                    # An empty read means EOF; without this check the loop
                    # would spin forever on a closed connection.
                    Logger.warning("MjpegViewer: stream ended: {}".format(self.url))
                    break
                buf += chunk

                first = buf.find(self._SOI)
                if first == -1:
                    continue
                # Look for the end marker only after the start marker so a
                # stray end marker earlier in the buffer cannot produce an
                # empty frame.
                last = buf.find(self._EOI, first + 2)
                if last == -1:
                    continue

                jpg = buf[first:last + 2]
                buf = buf[last + 2:]

                im = CoreImage(io.BytesIO(jpg), ext="jpeg", nocache=True)
                self.update_image(im)

                if self.singleshot:
                    # camera only supplies a snapshot not a stream
                    time.sleep(0.2)
                    # Close the finished response before requesting the next
                    # snapshot so connections are not leaked.
                    stream.close()
                    stream = urllib.request.urlopen(self.url)
                    buf = b''

        except Exception as err:
            if hasattr(err, 'code') and err.code == 401:
                Logger.error("MjpegViewer: url: {} - requires authentication: {}".format(self.url, err.headers['www-authenticate']))
            # Make any frames still queued for the main thread get dropped.
            self.quit = True
            Logger.error("MjpegViewer: Failed to open url: {} - error: {}".format(self.url, err))

        finally:
            if stream is not None:
                stream.close()

        Logger.info("MjpegViewer: ending thread")

    @mainthread
    def update_image(self, im):
        """Show ``im`` in the widget, applying the configured flips.

        Decorated with ``mainthread`` so the texture is assigned from the
        main thread rather than the reader thread. Frames arriving after
        ``quit`` has been set are ignored.
        """
        if self.quit:
            return

        if im is not None:
            self.texture = im.texture
            self.texture_size = im.texture.size
            if self.flipy:
                self.texture.flip_vertical()
            if self.flipx:
                self.texture.flip_horizontal()
class CameraScreen(Screen):
    """Screen that hosts the viewer widget registered under the id 'viewer'."""

    def start(self):
        """Begin streaming by delegating to the viewer widget."""
        viewer = self.ids.viewer
        viewer.start()

    def stop(self):
        """End streaming by delegating to the viewer widget."""
        viewer = self.ids.viewer
        viewer.stop()