forked from nyucan/can-selfdriving
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcar.py
200 lines (185 loc) · 7.99 KB
/
car.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
""" Project Entrance.
"""
from __future__ import absolute_import, division, print_function
import sys
import io
import struct
import time
import picamera
import threading
import socket
from math import atan, floor
from os.path import join
import numpy as np
import cv2
# from PIL import Image
from control.controller import Controller
from control.processImage import processImage
from util.detect import Detector
from util import img_process
from config import configs
import client
IMG_W = configs['data']['image_width']
IMG_H = configs['data']['image_height']
NUM_OF_POINTS = configs['fitting']['num_of_points']
class Car(object):
    """ Single-threaded car control.

    Two operating modes:
      * offline  -- lane detection runs on-board (``run_offline``);
      * online   -- JPEG frames are streamed to a server which sends
        fitted lane parameters back (``run_online_single``).
    """

    # One packaged server message is 16 float64 values (128 bytes).
    PARAS_PER_MSG = 16

    def __init__(self):
        self.controller = Controller()
        # Backward-compatible alias: earlier revisions exposed the
        # controller under this misspelled attribute name.
        self.contorller = self.controller
        self.detector = Detector()
        # Image sequence ids, used to drop stale/out-of-order server replies.
        self.pre_img_id = -1
        self.cur_img_id = -1

    @staticmethod
    def unpackage_paras(packaged_parameters):
        """ Unpack a 13-element parameter vector produced on-board.

        @Paras:
            packaged_parameters: np array
                Layout: [0:3] left-lane fit, [3:6] right-lane fit,
                [6:9] middle-lane fit, [9] distance to center,
                [10] distance at middle, [11] radian, [12] curvature.
        @Returns:
            (distance_to_center, distance_at_middle, curvature, stop_signal)
            stop_signal is True when both the left and right fits are
            all-zero, i.e. the detector found no lane.
        """
        cur_paras = packaged_parameters[0:13]
        w_left, w_right = cur_paras[0:3], cur_paras[3:6]
        distance_to_center = cur_paras[9]
        distance_at_middle = cur_paras[10]
        curvature = cur_paras[12]
        # Both fits identically zero <=> no lane detected.
        stop_signal = bool(np.all(w_left == 0.0) and np.all(w_right == 0.0))
        return distance_to_center, distance_at_middle, curvature, stop_signal

    @staticmethod
    def unpackage_paras_from_buffer(buffer):
        """ Unpack the first parameter message found in a recv buffer.

        @Paras:
            buffer: bytes
                Raw recv buffer. Each message is 16 float64 values
                (128 bytes): [0] image id, [1:4] left fit, [4:7] right
                fit, [14] distance to tangent, [15] angle of tangent.
        @Returns:
            (image_id, distance_to_tangent, angle_of_tangent, stop_signal),
            or (-1, 0, 0, False) when the buffer holds less than one
            complete message (e.g. a short recv).
        """
        # Drop any partial trailing float64 so np.frombuffer cannot
        # raise on a buffer whose size is not a multiple of 8.
        usable = len(buffer) - (len(buffer) % 8)
        parameters = np.frombuffer(buffer[:usable], dtype=np.float64)
        if parameters.size < Car.PARAS_PER_MSG:
            return -1, 0, 0, False
        cur_paras = parameters[0:Car.PARAS_PER_MSG]
        image_id = int(cur_paras[0])
        w_left, w_right = cur_paras[1:4], cur_paras[4:7]
        distance_to_tangent = cur_paras[14]
        angle_of_tangent = cur_paras[15]
        # Both fits identically zero <=> no lane detected.
        stop_signal = bool(np.all(w_left == 0.0) and np.all(w_right == 0.0))
        return image_id, distance_to_tangent, angle_of_tangent, stop_signal

    def send_images(self, connection, stream):
        """ Send one JPEG frame over the file-like connection.
        Single thread, will block. Helper function for online mode.

        Protocol: 4-byte little-endian length prefix, then the JPEG
        payload. The stream is rewound and truncated so the camera can
        reuse it for the next frame.
        """
        connection.write(struct.pack('<L', stream.tell()))
        connection.flush()
        stream.seek(0)
        connection.write(stream.read())
        stream.seek(0)
        stream.truncate()

    def recv_parameters(self, client_socket):
        """ Receive one parameter packet and act on it.
        Single thread, will block. Helper function for online mode.

        Packets whose image id is not newer than the last applied one
        are ignored (out-of-order / duplicate replies).
        """
        buffer = client_socket.recv(1024)
        if buffer is None:
            return
        img_id, d2t, aot, stop_signal = Car.unpackage_paras_from_buffer(buffer)
        if img_id <= self.pre_img_id:
            return
        self.cur_img_id = img_id
        if stop_signal:
            self.controller.finish_control()
        else:
            # Policy 1: ADP on distance/angle to the tangent line.
            self.controller.make_decision_with_policy(1, d2t, aot)
        self.pre_img_id = img_id

    def run_offline(self, debug=True):
        """ Run the full on-board pipeline in one loop:
        capture -> preprocess -> lane detection -> control.

        @Paras:
            debug: bool
                Kept for interface compatibility; on-screen display is
                actually governed by configs['debug'].
        """
        first_start = True
        waiting_for_obstacle = configs['avoid_collision']
        obstacle_seen = False
        with picamera.PiCamera(resolution='VGA') as camera:
            with io.BytesIO() as stream:
                for _ in camera.capture_continuous(stream, format='jpeg', use_video_port=True):
                    stream.seek(0)
                    ori_image = img_process.img_load_from_stream(stream)
                    # ------------- preprocessing -------------
                    debug_img = img_process.crop_image(ori_image, 0.45, 0.85)
                    debug_img = img_process.down_sample(debug_img, (160, 48))
                    image = img_process.binarize(debug_img)
                    # -----------------------------------------
                    paras = self.detector.get_wrapped_all_parameters(image)
                    # dc/dm/cur/ss are currently unused by the active policy
                    # but kept for the alternative policies noted below.
                    dc, dm, cur, ss = Car.unpackage_paras(paras)
                    dis_2_tan, pt = Detector.get_distance_2_tan(paras[6:9])
                    radian_at_tan = atan(paras[14])
                    if waiting_for_obstacle:
                        obstacle_seen = img_process.detect_obstacle(ori_image)
                    # display the fitting result in real time
                    if configs['debug']:
                        debug_img = img_process.compute_debug_image(debug_img, IMG_W, IMG_H, NUM_OF_POINTS, pt, paras)
                        img_process.show_img(debug_img)
                    if first_start:
                        self.controller.start()
                        first_start = False
                    # Control the car according to the parameters
                    if waiting_for_obstacle and obstacle_seen:
                        # One-shot avoidance: stop watching for obstacles
                        # after the first detection.
                        obstacle_seen = False
                        print("attempting to avoid ...")
                        waiting_for_obstacle = False
                    else:
                        # Alternative policies kept for reference:
                        #   1. ADP:           make_decision_with_policy(1, dis_2_tan, radian_at_tan)
                        #   2. pure pursuit:  l_d, sin_alpha = Detector.get_distance_angle_pp(paras[6:9])
                        #   3. ADP following: make_decision_with_policy(3, dis_2_tan, radian_at_tan, d_arc)
                        #   4. straight-lane following: make_decision_with_policy(4, d_arc)
                        d_arc = img_process.detect_distance(ori_image)
                        print(d_arc)
                        # 5. Coupled controller - car following (active).
                        self.controller.make_decision_with_policy(5, d_arc, dis_2_tan, radian_at_tan)
                    stream.seek(0)
                    stream.truncate()

    def run_online(self, ip, port):
        """ Multi-threaded online mode. Not implemented. """
        pass

    def run_online_single(self, ip, port):
        """ Online mode, single thread: stream JPEG frames to the
        server at (ip, port) and apply the control parameters it
        sends back.
        """
        client_socket = socket.socket()
        client_socket.connect((ip, port))
        connection = client_socket.makefile('wb')
        first_start = True
        try:
            with picamera.PiCamera() as camera:
                camera.resolution = (640, 480)
                camera.framerate = 30
                time.sleep(1)  # let the camera warm up
                stream = io.BytesIO()
                for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
                    self.send_images(connection, stream)
                    self.recv_parameters(client_socket)
                    if first_start:
                        self.controller.start()
                        first_start = False
            # Zero-length prefix tells the server we are done.
            connection.write(struct.pack('<L', 0))
        finally:
            # Always release socket resources, even on error/interrupt.
            connection.close()
            client_socket.close()

    def run_as_fast_as_you_can(self):
        """ Start the controller with no perception loop. """
        self.controller.start()

    def stop(self):
        """ Stop the motors and release controller resources. """
        self.controller.finish_control()
        self.controller.cleanup()
if __name__ == '__main__':
    # Pre-bind car so the interrupt handler below cannot hit a
    # NameError if Ctrl-C arrives while Car() is still constructing.
    car = None
    try:
        car = Car()
        time.sleep(1)
        car.run_as_fast_as_you_can()
    except KeyboardInterrupt:
        if car is not None:
            car.stop()