-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathextron_simulator.py
105 lines (91 loc) · 3.68 KB
/
extron_simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python2
"""
Minimal simulation of the SIS protocol as implemented by Extron DXP switches.
Really only supports enough of the protocol to talk to dvi_matrix_control.py.
NOTE: This script is just used for development.
It is *not* required to use dvi_matrix_control with a real switch!
"""
from __future__ import print_function
import SocketServer
class ConnectionHandler(SocketServer.BaseRequestHandler):
    """Answer one client connection with canned SIS-style replies.

    Input is consumed one byte at a time and accumulated in a buffer.
    Single-tie commands and one-letter status queries are answered as soon
    as their last character arrives; multi-tie and EDID commands are
    answered once the terminating newline has been received.  All replies
    are fixed placeholders; no switch state is tracked.
    """

    # Number of payload bytes that are read (and discarded) after an
    # EDID upload command line.
    EDID_PAYLOAD_SIZE = 256

    def _discard_payload(self, size):
        """Read and drop ``size`` bytes from the client.

        Returns True when every byte was received, or False when the peer
        closed the connection before the payload was complete.
        """
        remaining = size
        while remaining > 0:
            chunk = self.request.recv(remaining)
            if not chunk:
                # An empty read means the peer hung up.  Without this check
                # the loop would never terminate, because `remaining` would
                # stop shrinking.
                return False
            remaining -= len(chunk)
        return True

    def handle(self):
        """Send the greeting banner, then serve commands until disconnect.

        Socket-level failures (EnvironmentError) are reported on stdout and
        end the session; they are not re-raised.
        """
        self.request.sendall("(c) Copyright 20nn, Extron Electronics DXP DVI-HDMI, Vn.nn, 60-nnnn-01\r\nDdd, DD Mmm YYYY HH:MM:SS\r\n")
        try:
            print("connected")
            buf = ""
            cmd = pos = response = None
            # True from the multi-tie prefix until that command is answered;
            # suppresses single-tie and status detection in between.
            wait = False
            while True:
                if cmd:
                    # A command was recognised on the previous pass: log it,
                    # send its reply and start over with an empty buffer.
                    # `pos` is None for status queries (no start offset).
                    if pos is not None and pos > 0:
                        print("dummy data:", repr(buf[:pos]))
                    print(cmd + " command:", repr(buf[pos:]))
                    if response:
                        self.request.sendall(response)
                    buf = ""
                    wait = False
                cmd = pos = response = None

                c = self.request.recv(1)
                if not c:
                    print("disconnected")
                    return
                buf += c
                # Debug aid: uncomment to trace the buffer after every byte.
                # print("\x1b[37m" + repr(buf) + "\x1b[0m")

                # detect multi-tie command (ESC "+Q" prefix); the ties that
                # follow are collected until end-of-line
                if buf.endswith("\x1b+Q"):
                    wait = True
                    continue

                # detect single-tie command: "<in>*<out>" followed by one of
                # the terminator characters
                if not wait and (buf[-1] in "!&%$"):
                    pos = buf.rfind("*")
                    if pos < 1:
                        print("bogus command:", repr(buf))
                        buf = ""
                        continue
                    # walk back over the digits in front of "*" so that `pos`
                    # marks where the command starts
                    while pos and buf[pos-1].isdigit():
                        pos -= 1
                    cmd = "single tie"
                    response = "OutX InY All\r\n"
                    continue

                # detect status command: this is a substring test against the
                # letter list, so it fires when the buffer holds just one of
                # the status letters
                if not wait and (buf in "XINQS"):
                    cmd = "info"
                    response = "whatever\r\n"
                    continue

                # everything below only runs on a complete line
                if buf[-1] != "\n":
                    continue

                # handle multi-tie command
                pos = buf.find("\x1b+Q")
                if pos >= 0:
                    cmd = "multi tie"
                    response = "Qik\r\n"
                    continue

                # handle EDID upload command: ESC "I" <slot> "EDID", followed
                # by a fixed-size binary payload that is simply thrown away
                edid = buf.find("EDID")
                pos = buf.find("\x1bI")
                if (pos >= 0) and (edid > pos):
                    if not self._discard_payload(self.EDID_PAYLOAD_SIZE):
                        print("disconnected")
                        return
                    cmd = "EDID upload"
                    response = "EdidI" + buf[pos+2 : edid] + "\r\n"
                    continue

                # handle EDID assign command: ESC "A" <value> "*EDID"
                edid = buf.find("*EDID")
                pos = buf.find("\x1bA")
                if (pos >= 0) and (edid > pos):
                    cmd = "EDID assign"
                    response = "EdidA0*" + buf[pos+2 : edid] + "\r\n"
                    continue

                # handle ordinary end-of-line
                if buf.strip():
                    print("unrecognized command:", repr(buf))
                else:
                    print("whitespace:", repr(buf))
                buf = ""
        except EnvironmentError as e:
            print("connect error:", e)
class TCPServer(SocketServer.TCPServer):
    """TCP server variant whose listening address may be reused.

    Only overrides the ``allow_reuse_address`` class attribute of the
    base server; everything else is inherited unchanged.
    """

    # Enable address reuse on the listening socket.
    allow_reuse_address = True
if __name__ == "__main__":
    # Script entry point: listen on localhost port 2323 and serve clients
    # until the process is terminated.
    listen_address = ("localhost", 2323)
    server = TCPServer(listen_address, ConnectionHandler)
    server.serve_forever()