-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathdrive.py
75 lines (59 loc) · 2.15 KB
/
drive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import argparse
import base64
import numpy as np
import socketio
import eventlet.wsgi
from PIL import Image
from flask import Flask
from io import BytesIO
import os
from keras.models import model_from_json
import warnings
# Fix error with Keras and TensorFlow
import tensorflow as tf
tf.python.control_flow_ops = tf
from data import preprocess
# Loaded Keras model; stays None until the __main__ block assigns it.
model = None
# Slot for a previous camera frame; nothing in the visible code reads it.
prev_image_array = None

# Flask application that the __main__ block wraps with the Socket.IO middleware.
app = Flask(__name__)
# Socket.IO server on which the event handlers below are registered.
sio = socketio.Server()
@sio.on('telemetry')
def telemetry(sid, data):
    """Handle one telemetry frame and reply with steering/throttle commands.

    Decodes the base64-encoded camera frame in ``data["image"]``, passes it
    through ``preprocess`` and the loaded ``model`` to obtain a steering
    angle, chooses a throttle from the reported speed, prints both values and
    forwards them with ``send_control``.

    Any steering angle or throttle reported in ``data`` is ignored: both
    values are recomputed here.

    Args:
        sid: Socket.IO session id of the sender (not used).
        data: Mapping providing at least the keys ``"speed"`` and ``"image"``.
    """
    # The current speed of the car
    speed = data["speed"]
    # The current image from the center camera of the car
    image = Image.open(BytesIO(base64.b64decode(data["image"])))
    image_array = preprocess(np.asarray(image))
    # Prepend a batch axis so the model is given a batch holding one frame.
    transformed_image_array = image_array[None, :, :, :]
    prediction = model.predict(transformed_image_array, batch_size=1)
    # Extract the single predicted value explicitly: calling float() on an
    # array with ndim > 0 is deprecated in recent NumPy releases. .item()
    # still fails if the prediction does not hold exactly one element.
    steering_angle = float(np.asarray(prediction).item())
    # Full throttle while the reported speed is at most 5, otherwise 0.2.
    throttle = .2 if float(speed) > 5 else 1.
    print(steering_angle, throttle)
    send_control(steering_angle, throttle)
@sio.on('connect')
def connect(sid, environ):
    """Log a newly connected client and send it an all-zero control command.

    Args:
        sid: Socket.IO session id of the connecting client.
        environ: Connection environment passed by Socket.IO (not used).
    """
    print("connect ", sid)
    # Start from zero steering and zero throttle.
    send_control(steering_angle=0, throttle=0)
def send_control(steering_angle, throttle):
    """Emit a ``"steer"`` event carrying the given control values.

    Both values are converted to strings before being sent.

    Args:
        steering_angle: Steering command to send.
        throttle: Throttle command to send.
    """
    # str() is the idiomatic spelling of the former direct __str__() calls.
    payload = {
        'steering_angle': str(steering_angle),
        'throttle': str(throttle),
    }
    sio.emit("steer", data=payload, skip_sid=True)
if __name__ == '__main__':
    # Script entry point: load the model named on the command line, then
    # serve the Socket.IO application on port 4567.
    parser = argparse.ArgumentParser(description='Remote Driving')
    parser.add_argument('model', type=str,
                        help='Path to model definition json. Model weights should be on the same path.')
    args = parser.parse_args()

    # Rebuild the network architecture from its JSON definition.
    with open(args.model, 'r') as jfile:
        model = model_from_json(jfile.read())

    model.compile("adam", "mse")
    # The weights file sits next to the definition with an .h5 extension.
    # Swap only the extension: a plain str.replace('json', 'h5') would also
    # rewrite "json" wherever else it occurs in the path (e.g. a directory).
    weights_file = os.path.splitext(args.model)[0] + '.h5'
    model.load_weights(weights_file)

    # wrap Flask application with engineio's middleware
    app = socketio.Middleware(sio, app)

    # deploy as an eventlet WSGI server
    eventlet.wsgi.server(eventlet.listen(('', 4567)), app)