-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathawox_mqqt_all.py
362 lines (343 loc) · 14.5 KB
/
awox_mqqt_all.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
#!/usr/bin/python -u
# python script to handle awox lamp with json MQTT light
# JSON mapping
# {
# "brightness": 255, - WHITE_BRIGHTNESS
# "color_temp": 155, - COLOR_BRIGHTNESS
# "color": { - COLOR_CODE
# "r": 255,
# "g": 180,
# "b": 200,
# },
# "state": "ON", - NOT USED
# "white_value": int - WHITE_TEMPERATURE
# "effect": "toggle"
# }
# Remember:
# - this script relies on the modified Python module python-awox-mesh-light-master
# - do not install bluepy from a wheel (pip option --no-use-wheel on old pip
#   releases; --no-binary bluepy on current ones)
import paho.mqtt.client as mqtt
import awoxmeshlight
import time
import json
import signal
import subprocess
# ---- Constants ------------------------------------------------------------
# Upper bound checked by the BLE connect loop, and the SIGALRM budget (in
# seconds) armed before each my_light.connect() attempt.
MAX_RETRY = 1
MAX_CONNECT_TIME = 8  # [sec]
# White brightness presets used by the "toggle" / "dark white" / "bright white"
# effects: reduced (night) level and full level.
NIGHT_BRIGHTNESS = 35
FULL_BRIGHTNESS = 126
# White temperature presets: the first pairs with the night brightness, the
# second (_FL = full light) with the full brightness.
WHITE_TEMPERATURE = 127
WHITE_TEMPERATURE_FL = 64

# ---- State kept across MQTT messages --------------------------------------
# Last white settings computed from a command.
White_Brightness = 0
White_Temperature = 0
# Last colour settings computed from a command (or read back from the bulb).
Color_Brightness = 0
Color_Red, Color_Green, Color_Blue = 0, 0, 0
# Most recent MQTT payload, plus the flag telling the main loop it is pending.
home_rgbw1_data = {}
message_received = 0
# Number of main-loop passes since the last message was handled.
loops_since_last_message = 0
# Bulb handle and its connection flag (1 = connected, 0 = not connected).
my_light = awoxmeshlight.AwoxMeshLight("XX:XX:XX:XX:XX:XX", "XXXXXXXX", "XXXXXXXX")
connected = 0
# Outcome of the most recent connection attempt (1 = it connected).
bulb_last_known_state = 0
def handler(signum, frame):
    """Signal handler that aborts the interrupted call by raising.

    The script registers it for SIGALRM so that a my_light.connect() call
    which runs longer than the armed alarm is cut short. Both arguments are
    the standard signal-handler parameters and are not used.

    Raises:
        Exception: always, with the message "end of time".
    """
    timeout = Exception("end of time")
    raise timeout
def on_connect(this_client, user_data, flags, rc):
    """MQTT callback: subscribe to the command topic after a CONNACK.

    Topics used by this script:
        state_topic:   "home/rgbw1"
        command_topic: "home/rgbw1/set"

    Subscribing from this callback (instead of once at start-up) means the
    subscription is renewed whenever a lost connection is re-established.
    ``user_data``, ``flags`` and ``rc`` belong to the callback signature and
    are not used.
    """
    command_topic = "home/rgbw1/set"
    qos = 1
    this_client.subscribe(command_topic, qos)
def on_message(this_client, user_data, msg):
    """MQTT callback: hand a received PUBLISH payload over to the main loop.

    The raw payload is stored in the module-level ``home_rgbw1_data`` and the
    module-level ``message_received`` flag is then set to 1; the main loop
    polls that flag and decodes the payload.

    The payload is stored *before* the flag is raised. This callback runs on
    the MQTT network thread started by ``client.loop_start()``, so raising the
    flag first would let the main loop observe it and read the previous (or
    the initial, non-bytes) content of ``home_rgbw1_data``.

    Args:
        this_client: the MQTT client instance (unused).
        user_data: user data configured on the client (unused).
        msg: received message; only its ``payload`` attribute is read.
    """
    global home_rgbw1_data, message_received
    # print(msg.topic + " " + str(msg.payload))
    home_rgbw1_data = msg.payload
    message_received = 1
def parse_result(message):
    """Turn a binary status message from the bulb into a dictionary.

    Args:
        message: iterable of integers (for example ``bytes``); each one is
            rendered as at least two hex digits and the fields are sliced out
            of the resulting hex string.

    Returns:
        A dict that always holds ``'debug'`` (the full hex string). When the
        mode field is below 40 and the mesh-id field is 0 it additionally
        holds ``'status'`` (mode % 2), ``'mode'``, ``'white_temperature'``,
        ``'white_brightness'``, ``'color'`` ("#" + six hex digits),
        ``'color_brightness'`` and ``'mode_string'`` ("bright white",
        "dark white" or "color"). Other messages are filtered out and yield
        only ``'debug'``.

    Raises:
        ValueError: if the message is too short for a field that is read.
    """
    hex_text = "".join(["%02x" % octet for octet in message])

    def field(first_digit):
        # integer value of the two hex digits starting at first_digit
        return int(hex_text[first_digit:first_digit + 2], 16)

    parsed = {'debug': hex_text}
    mesh_id = field(6)
    mode = field(24)
    # messages with another mesh id or a mode of 40 and above carry something else
    if mode >= 40 or mesh_id != 0:
        return parsed

    parsed['status'] = mode % 2
    parsed['mode'] = mode
    parsed['white_temperature'] = field(28)
    parsed['white_brightness'] = field(26)
    parsed['color'] = "#" + hex_text[32:38]
    parsed['color_brightness'] = field(30)
    if mode in (1, 6, 9):
        # white modes: label by brightness
        is_bright = parsed['white_brightness'] > 100
        parsed['mode_string'] = "bright white" if is_bright else "dark white"
    else:
        parsed['mode_string'] = "color"
    return parsed
def change_bulb_setting(json_msg):
    """Apply a JSON light command to the bulb and report the resulting state.

    When the module-level ``connected`` flag is 1, the recognised keys of
    ``json_msg`` update the module-level settings and are sent to
    ``my_light``; the bulb status is then read back and parsed with
    ``parse_result``. When not connected nothing is sent and an "OFF" state
    with zeroed values is reported.

    Recognised command keys (the "state" key is never read here):
        "brightness"  0..255 -> White_Brightness  (scaled by 126 / 255)
        "white_value" 0..255 -> White_Temperature (scaled by 127 / 255)
        "color_temp"  0..500 -> Color_Brightness  (scaled by 100 / 500)
        "color"       {"r", "g", "b"} -> Color_Red / Color_Green / Color_Blue

    Args:
        json_msg: dict decoded from the MQTT command payload.

    Returns:
        A JSON string with the keys "state", "brightness", "color_temp",
        "white_value", "effect" and "color" ({"r", "g", "b"}); bulb values
        are scaled back with the inverse of the factors above.

    If the status read back from the bulb is one that ``parse_result``
    filters out (so it has no "mode_string"), the state is reported as "ON"
    with the zeroed default values instead of failing with ``KeyError``.
    """
    global White_Brightness, White_Temperature, Color_Brightness
    global Color_Red, Color_Green, Color_Blue

    is_connected = connected == 1
    bulb_resp = {}

    if is_connected:
        change_white = False
        change_color = False
        # "brightness": 255, - WHITE_BRIGHTNESS
        if "brightness" in json_msg:
            White_Brightness = int(int(json_msg["brightness"]) * 126 / 255)
            change_white = True
        # "color_temp": 155, - COLOR_BRIGHTNESS
        if "color_temp" in json_msg:
            Color_Brightness = int(int(json_msg["color_temp"]) * 100 / 500)
            change_color = True
        # "color": {...} - COLOR_CODE
        if "color" in json_msg:
            Color_Red = int(json_msg["color"]["r"])
            Color_Blue = int(json_msg["color"]["b"])
            Color_Green = int(json_msg["color"]["g"])
            change_color = True
        # "white_value": int - WHITE_TEMPERATURE
        if "white_value" in json_msg:
            White_Temperature = int(int(json_msg["white_value"]) * 127 / 255)
            change_white = True

        # a white command is only sent once both white settings are non-zero
        if change_white and White_Temperature != 0 and White_Brightness != 0:
            my_light.setWhite(White_Temperature, White_Brightness)
        # a colour command needs a non-zero brightness and a non-black colour
        if change_color and Color_Brightness != 0 and (
                Color_Red != 0 or Color_Blue != 0 or Color_Green != 0):
            my_light.setColor(Color_Red, Color_Green, Color_Blue)
            time.sleep(0)
            my_light.setColorBrightness(Color_Brightness)

        # wait a second, then read back what the bulb reports
        time.sleep(1)
        my_light.readStatus()
        bulb_resp = parse_result(my_light.message)

    # defaults: used when not connected or when the status was not parseable
    bulb_state = {
        "status": "ON" if is_connected else "OFF",
        "r": 0,
        "g": 0,
        "b": 0,
        "color_temp": 0,
        "brightness": 0,
        "white_value": 0,
        "effect": "OFF",
    }

    mode_string = bulb_resp.get("mode_string")
    if is_connected and mode_string is not None:
        if mode_string == "color":
            # in colour mode the reported "#rrggbb" replaces the remembered colour
            reported = bulb_resp["color"]
            Color_Red = int(reported[1:3], 16)
            Color_Green = int(reported[3:5], 16)
            Color_Blue = int(reported[5:7], 16)
        bulb_state["r"] = Color_Red
        bulb_state["g"] = Color_Green
        bulb_state["b"] = Color_Blue
        bulb_state["color_temp"] = bulb_resp["color_brightness"]  # COLOR_BRIGHTNESS
        bulb_state["brightness"] = bulb_resp["white_brightness"]  # WHITE_BRIGHTNESS
        bulb_state["white_value"] = bulb_resp["white_temperature"]  # WHITE_TEMPERATURE
        bulb_state["effect"] = mode_string

    # create output in JSON format, scaled back to the MQTT value ranges
    change_bulb_respond = json.dumps(
        {
            "state": bulb_state["status"],
            "color_temp": int(bulb_state["color_temp"] * 500 / 100),
            "brightness": int(bulb_state["brightness"] * 255 / 126),
            "white_value": int(bulb_state["white_value"] * 255 / 127),
            "effect": bulb_state["effect"],
            "color": {
                "r": bulb_state["r"],
                "g": bulb_state["g"],
                "b": bulb_state["b"],
            },
        },
        sort_keys=True, indent=4, separators=(',', ': '))
    return change_bulb_respond
# ---------------------------------------------------------------------------
# Script body: set up the MQTT client, then poll forever for commands.
# NOTE(review): the indentation of this section is missing in this copy of
# the file, so the nesting of the statements below must be restored before
# the script can run. The comments describe each step and hedge wherever the
# nesting cannot be determined from the text alone.
# ---------------------------------------------------------------------------
# MQTT client with the client id "awox_bulb", using the callbacks defined above
client = mqtt.Client("awox_bulb")
client.on_connect = on_connect
client.on_message = on_message
# SIGALRM is routed to handler(), which raises to cut a my_light.connect() call short
signal.signal(signal.SIGALRM, handler)
client.connect("192.168.XX.XXX")
# start the MQTT network loop (paho runs it in a background thread)
client.loop_start()
while True:
# client.reconnect()
# client.loop(0.1)
# a command is pending when on_message() has set message_received to 1
if message_received == 1:
# clear the flag and the idle counter before the command is handled
message_received = 0
loops_since_last_message = 0
retries = 0
# NOTE(review): nothing in view catches a payload that is not valid UTF-8 JSON
this_msg = home_rgbw1_data.decode('utf-8')
json_msg_in = json.loads(this_msg)
# "toggle" requested while not connected: restart bluetooth first and start
# the retry counter at -1, which allows one more connection attempt
if "effect" in json_msg_in and connected == 0:
# print("effect requested")
if "toggle" == json_msg_in["effect"]:
print("Bluetooth reset called")
subprocess.call(['sudo', '/usr/local/bin/restart_bluetooth'])
time.sleep(2)
retries = -1
# log the time and the received command
print(time.strftime('%a %H:%M:%S'), json_msg_in)
# connect to the bulb unless already connected, limited by MAX_RETRY
while retries < MAX_RETRY and connected == 0:
# a fresh bulb object is created for every attempt
my_light = awoxmeshlight.AwoxMeshLight("XX:XX:XX:XX:XX:XX", "XXXXXXXX", "XXXXXXXX")
# increase retry counter
retries = retries + 1
# arm the alarm so that handler() interrupts connect() after MAX_CONNECT_TIME seconds
signal.alarm(MAX_CONNECT_TIME)
try:
my_light.connect()
# any failure, including the alarm raised by handler(), counts as "not connected"
except Exception:
pass
else:
connected = 1
# cancel the pending alarm
# NOTE(review): presumably executed after the try statement on every attempt — confirm
signal.alarm(0)
# failed attempt: wait, and restart bluetooth if the previous attempt had connected
if connected == 0:
time.sleep(1)
if bulb_last_known_state == 1:
print("Bluetooth reset called")
subprocess.call(['sudo', '/usr/local/bin/restart_bluetooth'])
time.sleep(2)
# NOTE(review): which `if` the following `else` pairs with cannot be told
# without the indentation — confirm against the original file
if retries == MAX_RETRY:
print("BLE connection failed")
bulb_last_known_state = 0
else:
# signal.alarm(0)
print("connected")
bulb_last_known_state = 1
# effects are translated into a plain command while connected; the command
# built here replaces the received one (see json.loads(this_msg) below)
if "effect" in json_msg_in and connected == 1:
# print("effect requested")
if "toggle" == json_msg_in["effect"]:
# read out the current settings
my_light.readStatus()
# make the response readable
curr_stat = parse_result(my_light.message)
# mode = 1; 9 means white mode
# NOTE(review): parse_result() omits 'mode' for messages it filters out, in
# which case this lookup raises KeyError
if curr_stat['mode'] == 1 or curr_stat['mode'] == 9:
# if the current value is not the Night brightness (more than 3 away from it)
if abs(curr_stat['white_brightness'] - NIGHT_BRIGHTNESS) > 3:
# set to night mode
set_white_brightness = NIGHT_BRIGHTNESS
set_white_temp = WHITE_TEMPERATURE
# if the current value is the Night brightness
else:
# set to full brightness
set_white_brightness = FULL_BRIGHTNESS
set_white_temp = WHITE_TEMPERATURE_FL
# if not white mode, then set the night mode whiteness
else:
# set to night mode
set_white_brightness = NIGHT_BRIGHTNESS
set_white_temp = WHITE_TEMPERATURE
# presets are scaled from bulb values to the 0..255 range of the command
this_msg = json.dumps({"state": "ON", # status
"brightness": int(set_white_brightness * 255 / 126),
"white_value": int(set_white_temp * 255 / 127)},
sort_keys=True, indent=4, separators=(',', ': '))
elif "bright white" == json_msg_in["effect"]:
# set to full brightness
set_white_brightness = FULL_BRIGHTNESS
set_white_temp = WHITE_TEMPERATURE_FL
this_msg = json.dumps({"state": "ON", # status
"brightness": int(set_white_brightness * 255 / 126),
"white_value": int(set_white_temp * 255 / 127)},
sort_keys=True, indent=4, separators=(',', ': '))
elif "dark white" == json_msg_in["effect"]:
# set to night mode
set_white_brightness = NIGHT_BRIGHTNESS
set_white_temp = WHITE_TEMPERATURE
this_msg = json.dumps({"state": "ON", # status
"brightness": int(set_white_brightness * 255 / 126),
"white_value": int(set_white_temp * 255 / 127)},
sort_keys=True, indent=4, separators=(',', ': '))
# any other effect name: only {"state": "ON"} is passed on
else:
this_msg = json.dumps({"state": "ON"},
sort_keys=True, indent=4, separators=(',', ': '))
print(this_msg)
json_msg_in = json.loads(this_msg)
# apply the command (a no-op towards the bulb when not connected) and publish
# the resulting state on the state topic with qos 1 and the retain flag set
status = change_bulb_setting(json_msg_in)
# print(status)
client.publish("home/rgbw1", status, 1, True)
# no command pending: count idle passes and drop the BLE connection after 33 of them
# (about 10 s if the 0.3 s sleep below runs on every pass — confirm)
else:
if loops_since_last_message < 33:
loops_since_last_message += 1
else:
if connected == 1:
my_light.disconnect()
my_light = None
print("disconnected")
connected = 0
# client.loop_stop()
# pause between polls of message_received
time.sleep(0.3)
# client.reconnect()
# client.loop_start()