Detection.py
import cv2
import asyncio
import websockets
import base64
import DetectNetConnector as decNet
import json
import jetson.inference
import jetson.utils

# Initializing the detection network
connector = decNet.DetectNetConnector()
net = jetson.inference.detectNet("ssd-mobilenet-v2")

# Assigning the CSI camera as the input source (use "/dev/video0" for a V4L2 camera instead)
camera = jetson.utils.videoSource("csi://0")


def reSizeImage():
    # Capture an image from the Jetson camera (CUDA image)
    image = camera.Capture()
    # Resizing the image for Snap: (1280x720) --> (480x360)
    reSizedImage = jetson.utils.cudaAllocMapped(width=image.width * 0.375, height=image.height * 0.5, format=image.format)
    jetson.utils.cudaResize(image, reSizedImage)
    return reSizedImage


async def process(websocket, path):
    sendImage = False
    while not websocket.closed:
        sendFrame = False
        async for message in websocket:
            # Deciding whether to send the rendered image with every response or not
            if message == "sendImage":
                sendImage = True
                break
            if message == "notSendImage":
                sendImage = False
                break
            # Client asks for an image from the Jetson camera
            if message == "frame":
                sendFrame = True
                image = reSizeImage()
            else:
                # Splitting the MIME type from the base64 payload
                _, img_encoded = message.split(',')
                img_decoded = base64.b64decode(img_encoded)
                file_name = 'myImage.jpg'
                with open(file_name, 'wb') as f:
                    f.write(img_decoded)
                # Loading the saved image back as a CUDA image
                image = jetson.utils.loadImage('myImage.jpg')
            # Running inference
            detections = connector.RunInference(image, net)
            # Converting the CUDA image to a NumPy array
            frame_2 = jetson.utils.cudaToNumpy(image)
            # OpenCV uses the BGR format
            converted_picture = cv2.cvtColor(frame_2, cv2.COLOR_RGB2BGR)
            # Encoding the NumPy image as JPEG, then base64
            retval, buffer = cv2.imencode('.jpg', converted_picture)
            jpg_as_text = base64.b64encode(buffer)
            # Sending the detections list (and, if requested, the image) back to the client
            if sendImage or sendFrame:
                response = {"detections": detections, "image": jpg_as_text.decode('utf-8')}
            else:
                response = {"detections": detections}
            response = json.dumps(response)
            await websocket.send(response)


async def main():
    async with websockets.serve(process, "0.0.0.0", 4040, ping_interval=None):
        await asyncio.Future()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())