-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathevogateway.py
executable file
·1277 lines (1021 loc) · 54.4 KB
/
evogateway.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import asyncio
import json
from platform import platform
import sys
import traceback
import re
import glob
from typing import Tuple
from signal import SIGINT, SIGTERM
import os
import inspect
import configparser
import paho.mqtt.client as mqtt
import time
import datetime
from threading import Timer
from datetime import timedelta as td
from types import SimpleNamespace
from colorama import init as colorama_init, Fore, Style, Back
import logging
from logging.handlers import RotatingFileHandler
from ramses_rf import Gateway, GracefulExit
from ramses_rf.const import SZ_DOMAIN_ID, SZ_SCHEDULE, SZ_UFH_IDX
from ramses_rf.discovery import GET_SCHED, SET_SCHED, spawn_scripts
from ramses_rf.version import VERSION as RAMSES_RF_VERSION
from ramses_rf.protocol.command import Command
from ramses_rf.protocol.address import HGI_DEVICE_ID, NON_DEVICE_ID, DEV_TYPE_MAP
from ramses_rf.protocol.logger import CONSOLE_COLS
from ramses_rf.protocol.exceptions import EvohomeError
from ramses_rf.protocol.message import CODE_NAMES
from ramses_rf.protocol.schemas import (
SZ_DISABLE_SENDING,
SZ_ENFORCE_KNOWN_LIST,
SZ_KNOWN_LIST,
SZ_EVOFW_FLAG,
SZ_SERIAL_PORT,
SZ_FILE_NAME,
SZ_ROTATE_BYTES,
SZ_ROTATE_BACKUPS,
)
from ramses_rf.schemas import (
SCH_GLOBAL_CONFIG,
SZ_SCHEMA,
SZ_MAIN_TCS,
SZ_CONFIG,
SZ_DISABLE_DISCOVERY,
SZ_ENABLE_EAVESDROP,
SZ_REDUCE_PROCESSING,
SZ_SYSTEM,
SZ_ORPHANS,
SZ_ORPHANS_HEAT,
SZ_DHW_SYSTEM,
SZ_UFH_SYSTEM,
SZ_APPLIANCE_CONTROL,
SZ_SENSOR,
SZ_DEVICES,
SZ_ZONES,
SZ_ZONE_IDX,
SZ_MAX_ZONES,
SZ_CIRCUITS,
SZ_PACKET_LOG,
SZ_USE_ALIASES,
SZ_ALIAS,
SZ_NAME
)
LIB_KEYS = tuple(SCH_GLOBAL_CONFIG({}).keys()) + (SZ_SERIAL_PORT,)
DEFAULT_COLORS = {" I": f"{Fore.WHITE}", "RP": f"{Fore.LIGHTWHITE_EX}", "RQ": f"{Fore.BLACK}",
" W": f"{Fore.MAGENTA}", "temperature": f"{Fore.YELLOW}", "ERROR": f"{Back.RED}{Fore.YELLOW}",
"mqtt_command": f"{Fore.LIGHTCYAN_EX}" }
if os.path.isdir(sys.argv[0]):
os.chdir(os.path.dirname(sys.argv[0]))
#---------------------------------------------------------------------------------------------------
VERSION = "3.14-0.22.40"
CONFIG_FILE = "evogateway.cfg"
config = configparser.RawConfigParser()
config.read(CONFIG_FILE)
def get_display_colorscheme(reload_config=False):
if reload_config:
config.read(CONFIG_FILE)
colours_string = config.get("MISC", "DISPLAY_COLOURS", fallback=None)
try: # TODO! Get rid of eval and tidy up!
scheme = eval(colours_string) if colours_string else None
except:
pass
if not scheme:
scheme = DEFAULT_COLORS
if " I" not in scheme:
scheme[" I"] = DEFAULT_COLORS[" I"]
if "RQ" not in scheme:
scheme["RQ"] = DEFAULT_COLORS["RQ"]
if "RP" not in scheme:
scheme["RP"] = DEFAULT_COLORS["RP"]
if " W" not in scheme:
scheme[" W"] = DEFAULT_COLORS[" W"]
if "ERROR" not in scheme:
scheme["ERROR"] = DEFAULT_COLORS["ERROR"]
if "mqtt_command" not in scheme:
scheme["mqtt_command"] = DEFAULT_COLORS["mqtt_command"]
return scheme
COM_PORT = config.get("Serial Port","COM_PORT", fallback="/dev/ttyUSB0")
COM_BAUD = config.get("Serial Port","COM_BAUD", fallback=115200)
EVENTS_FILE = config.get("Files", "EVENTS_FILE", fallback="events.log")
PACKET_LOG_FILE = config.get("Files", "PACKET_LOG_FILE", fallback="packet.log")
LOG_FILE_ROTATE_COUNT = config.getint("Files", "LOG_FILE_ROTATE_COUNT", fallback=9)
LOG_FILE_ROTATE_BYTES = config.getint("Files", "LOG_FILE_ROTATE_BYTES", fallback=1000000)
DEVICES_FILE = config.get("Files", "DEVICES_FILE", fallback="devices.json")
ZONES_FILE = config.get("Files", "ZONES_FILE", fallback="zones.json")
LOAD_ZONES_FROM_FILE = config.getboolean("Files", "LOAD_ZONES_FROM_FILE", fallback=True)
SCHEMA_FILE = config.get("Files", "SCHEMA_FILE", fallback="ramsesrf_schema.json")
MAX_SAVE_FILE_COUNT = config.getint("Files", "MAX_SAVE_FILE_COUNT", fallback=9)
MQTT_SERVER = config.get("MQTT", "MQTT_SERVER", fallback="")
MQTT_USER = config.get("MQTT", "MQTT_USER", fallback="")
MQTT_PW = config.get("MQTT", "MQTT_PW", fallback="")
MQTT_CLIENTID = config.get("MQTT", "MQTT_CLIENTID", fallback="evoGateway")
MQTT_PUB_JSON_ONLY = config.getboolean("MQTT", "MQTT_PUB_AS_JSON", fallback=False)
MQTT_PUB_KV_WITH_JSON = config.getboolean("MQTT", "MQTT_PUB_KV_WITH_JSON", fallback=False)
if MQTT_PUB_KV_WITH_JSON:
MQTT_PUB_JSON_ONLY = False
MQTT_GROUP_BY_ZONE = config.getboolean("MQTT", "MQTT_GROUP_BY_ZONE", fallback=True)
MQTT_REQUIRE_ZONE_NAMES = config.getboolean("MQTT", "MQTT_REQUIRE_ZONE_NAMES", fallback=True)
MQTT_SUB_TOPIC = config.get("MQTT", "MQTT_SUB_TOPIC", fallback="")
MQTT_PUB_TOPIC = config.get("MQTT", "MQTT_PUB_TOPIC", fallback="")
MQTT_ZONE_IND_TOPIC = config.get("MQTT", "MQTT_ZONE_INDEP_TOPIC", fallback="_zone_independent")
MQTT_ZONE_UNKNOWN = config.get("MQTT", "MQTT_ZONE_UNKNOWN", fallback="_zone_unknown")
THIS_GATEWAY_NAME = config.get("MISC", "THIS_GATEWAY_NAME", fallback="EvoGateway")
RAMSESRF_DISABLE_SENDING = config.getboolean("MISC", "DISABLE_SENDING", fallback=False)
DISPLAY_FULL_JSON = config.getboolean("MISC", "DISPLAY_FULL_JSON", fallback=False)
FORCE_SINGLE_HGI = config.getboolean("Misc", "FORCE_SINGLE_HGI", fallback=True)
DHW_ZONE_PREFIX = config.get("Misc", "DHW_ZONE_PREFIX", fallback="_dhw")
RAMSESRF_DISABLE_DISCOVERY = config.getboolean("Ramses_rf", SZ_DISABLE_DISCOVERY, fallback=False)
RAMSESRF_ALLOW_EAVESDROP = config.getboolean("Ramses_rf", SZ_ENABLE_EAVESDROP, fallback=False)
RAMSESRF_KNOWN_LIST = config.getboolean("Ramses_rf", SZ_KNOWN_LIST, fallback=True)
MIN_ROW_LENGTH = config.get("MISC", "MIN_ROW_LENGTH", fallback=160)
DISPLAY_COLOURS = get_display_colorscheme()
MQTT_STATUS_SUBTOPIC = "status"
MQTT_OFFLINE = "Offline"
MQTT_ONLINE = "Online"
SYS_CONFIG_COMMAND = "sys_config"
SYSTEM_MSG_TAG = "*"
SEND_STATUS_TRANSMITTED = "Transmitted"
SEND_STATUS_FAILED = "Failed"
SEND_STATUS_SUCCESS = "Successful"
RELAYS = {"f9": "Radiators", "fa": "DHW", "fc": "Appliance Controller"}
SZ_TOPIC_IDX = "topic_idx"
SZ_LOG_IDX = "log_idx"
SZ_FRAG_NUMBER = "frag_number"
SZ_FORCE_IO = "force_io"
GET_SCHED_WAIT_PERIOD = 5
# -----------------------------------
DEVICES = {}
ZONES = {}
UFH_CIRCUITS = {}
MQTT_CLIENT = None
GWY = None
GWY_MODE = None
LAST_SEND_MSG = None
# -----------------------------------
log = logging.getLogger("evogateway_log")
log.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s [%(lineno)s] %(message)s')
# %(funcName)20s() [%(levelname)s]
# Log file handler
file_handler = RotatingFileHandler(EVENTS_FILE, maxBytes=LOG_FILE_ROTATE_BYTES,
backupCount=LOG_FILE_ROTATE_COUNT)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
log.addHandler(file_handler)
# Log console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.WARNING)
console_handler.setFormatter(formatter)
log.addHandler(console_handler)
_first_cap_re = re.compile('(.)([A-Z][a-z]+)')
_all_cap_re = re.compile('([a-z0-9])([A-Z])')
def to_snake(name):
if name:
name=name.strip().replace("'","").replace(" ","_")
s1 = _first_cap_re.sub(r'\1_\2', name)
s2 = _all_cap_re.sub(r'\1_\2', s1).lower()
return s2.replace("__","_")
def truncate_str(str, length):
if str:
return (str[:length - 3] + '...') if len(str) > length else str
def _proc_kwargs(obj, kwargs) -> Tuple[dict, dict]:
lib_kwargs, cli_kwargs = obj
lib_kwargs[SZ_CONFIG].update({k: v for k, v in kwargs.items() if k in LIB_KEYS})
cli_kwargs.update({k: v for k, v in kwargs.items() if k not in LIB_KEYS})
return lib_kwargs, cli_kwargs
def get_parent_keys(d, value):
for k,v in d.items():
if isinstance(v, dict):
p = get_parent_keys(v, value)
if p:
return [k] + p
elif v == value:
return [k]
def get_device_name(device_address):
try:
if device_address.id == HGI_DEVICE_ID or (FORCE_SINGLE_HGI and device_address.type in "18"):
name = THIS_GATEWAY_NAME
elif device_address.type in "01":
name = "Controller"
elif device_address.type in "63":
name = "UNBOUND"
else:
name = DEVICES[device_address.id][SZ_ALIAS] if device_address.id in DEVICES \
else device_address.id
if name == NON_DEVICE_ID:
name = ""
try:
dev_type = DEV_TYPE_MAP[device_address.type]
except:
dev_type = ""
name = "{} {}".format(dev_type, name).strip()
return name
except Exception as ex:
log.error(f"{Style.BRIGHT}{DISPLAY_COLOURS.get('ERROR')}Exception occured for "
"device_address '{device_address}': {ex}{Style.RESET_ALL}", exc_info=True)
traceback.print_stack()
def get_msg_zone_name(src, target_zone_id=None):
""" Use any 'target' zone name given in the payload, otherwise fall back
to zone name of the sending device
"""
# Use the standard zone names if target_zone_id available (unless source type is BDR or OTB)
if src.type not in "13 10" and target_zone_id and int(target_zone_id, 16) >= 0:
if target_zone_id not in ZONES:
update_zones_from_gwy()
if target_zone_id.strip().lower() in "f9 fa fc":
# These are BDRs or UFH relays. F9 = DHW, FA = Radiators and FC = UFH
if src.type == "01":
zone_name = f"{MQTT_ZONE_IND_TOPIC}"
else:
# Default to placing these under relays as they are not directly from controller
zone_name = f"{MQTT_ZONE_IND_TOPIC}/relays"
else:
zone_name = ZONES[target_zone_id] if target_zone_id in ZONES \
else "_zone_{}".format(target_zone_id)
else:
# i.e. device source type is BDR/OTB _or_ (not BDR/OTB but target_zone_id < 0)
try:
device = GWY.get_device(src.id)
src_zone_id = device.zone.zone_idx if hasattr(device, "zone") and hasattr(device.zone, SZ_ZONE_IDX) else None
if src_zone_id and src_zone_id not in 'FF HW' and src_zone_id in ZONES:
zone_name = ZONES[src_zone_id]
elif src.type in "01 18" or target_zone_id == "-1":
# Controllers and HGI
zone_name = MQTT_ZONE_IND_TOPIC
elif src.type in "07":
# DHW Wireless sender
zone_name = DHW_ZONE_PREFIX
elif src.type in "02 10 13" or (src_zone_id and src_zone_id !="HW" and int(src_zone_id, 16) > 11) :
# Relay types, e.g. BDR, OTB, UFC
zone_name = f"{MQTT_ZONE_IND_TOPIC}/relays"
elif src_zone_id and int(src_zone_id, 16) >= 0 and src_zone_id in ZONES:
# Normal 'zones'
zone_name = ZONES[src_zone_id]
else:
log.error(f"----> Unknown zone for src: '{src} {DEVICES[src.id] if src.id in DEVICES else ''}'")
zone_name = MQTT_ZONE_UNKNOWN
except Exception as e:
log.error(f"Error: {e}", exc_info=True)
zone_name = MQTT_ZONE_UNKNOWN
return zone_name
def get_opentherm_msg(msg):
if msg.code_name == "opentherm_msg":
name = msg.payload.get("msg_name", None)
if name:
# some msg_name are unhashable/dict/have multiple data elements
key = name if isinstance(name, str) else "OpenTherm"
# return the whole payload dict as we don't know which message component is of interest
return key, {key: msg.payload}
else:
log.error(f"Invalid opentherm_msg. msg.code_name: {msg.code_name}")
return None, None
def spawn_schedule_task(action, **kwargs):
""" WIP......... """
ctl_id = GWY.tcs.id
if action == GET_SCHED:
if not SZ_ZONE_IDX in kwargs:
log.error("get_schedules requires 'zone_idx'")
return
zone_idx = kwargs[SZ_ZONE_IDX]
force_io = kwargs[SZ_FORCE_IO] if SZ_FORCE_IO in kwargs else None
zone = GWY.tcs.zone_by_idx[zone_idx]
if zone.schedule and not force_io:
# Schedule already available, so no need for any further io unless we are forcing
display_schedule_for_zone(zone_idx)
return
asyncio.ensure_future(zone.get_schedule(force_io=force_io), loop=GWY._loop)
elif action == SET_SCHED:
if not SZ_ZONE_IDX in kwargs:
log.error("'set_schedule' requires 'zone_idx' to be specified")
return
if not SZ_SCHEDULE in kwargs and type(kwargs[SZ_SCHEDULE] is not list):
log.error("'set_schedule' requires 'schedule' json")
return
zone_idx = kwargs[SZ_ZONE_IDX]
zone = GWY.tcs.zone_by_idx[zone_idx]
schedule = kwargs[SZ_SCHEDULE]
force_refresh = kwargs["force_refresh"] if "force_refresh" in kwargs else None
asyncio.ensure_future(zone.set_schedule(schedule), loop=GWY._loop)
# Create timer to display schedule data after GET_SCHED_WAIT_PERIOD seconds
timer = Timer(GET_SCHED_WAIT_PERIOD, display_schedule_for_zone, [zone_idx])
timer.start()
def cleanup_display_text(msg, display_text):
""" Clean up/Simplify the displayed text for given message. display_text must be a dict """
try:
if type(display_text) == dict:
if msg.code_name in display_text:
# remove the command name (dict key) from the displayed text
filtered_text = display_text[msg.code_name]
# Formatting for temperature/demand numbers
if msg.code_name in "temperature setpoint" and filtered_text is not None:
filtered_text = "{:>05.2f}°C".format(float(filtered_text))
elif "_demand" in msg.code_name and filtered_text is not None:
filtered_text = "{:> 5.0f}%".format(float(filtered_text) * 100)
else:
filtered_text = display_text
# Remove extra detail, not required for 'simple/clean' display
for key in [SZ_ZONE_IDX, "parent_idx", "msg_id", "msg_type"] + [k for k in filtered_text if "unknown" in k]:
if key in filtered_text:
del filtered_text[key]
if "value" in filtered_text and "temperature" in str(filtered_text.keys()) and filtered_text["keys]"]:
filtered_text["value"] = "{:.1f}°C".format(float(filtered_text))
if "heat_demand" in filtered_text and filtered_text["heat_demand]"] is not None:
filtered_text["heat_demand"] = "{:.0f}%".format(float(filtered_text["heat_demand"]) * 100)
if "relay_demand" in filtered_text and filtered_text["relay_demand]"] is not None:
filtered_text["relay_demand"] = "{:.0f}%".format(float(filtered_text["relay_demand"]) * 100)
if "modulation_level" in filtered_text and filtered_text["modulation_level"] is not None:
filtered_text["modulation_level"] = "{:.0f}%".format(float(filtered_text["modulation_level"]) * 100)
filtered_text = json.dumps(filtered_text, sort_keys=True)[1:-1]
filtered_text = filtered_text.replace('"', '').strip()
if msg.verb == "RQ":
filtered_text = "REQUEST: {}{}".format("" if filtered_text else msg.code_name, filtered_text)
return filtered_text
else:
return display_text
except Exception as ex:
log.error(f"Exception occured: {ex}", exc_info=True)
log.error(f"msg.payload: {msg.payload}, display_text: {display_text}")
def process_gwy_message(msg, prev_msg=None) -> None:
""" Process received ramses_rf message from Gateway """
log.debug("") # spacer, as we have other debug entries for a given received msg
log.info(msg) # Log event to file
# Message class in ramses_rf lib does not seem to have the code name, so add it
msg.code_name = CODE_NAMES[msg.code]
if DISPLAY_FULL_JSON:
display_full_msg(msg)
# As some payloads are arrays, and others not, make consistent
payload = [msg.payload] if not isinstance(msg.payload, list) else msg.payload
for item in payload:
# ramses_rf library seems to send each item as a dict
try:
if type(item) != dict:
# Convert to a dict...
item = {msg.code_name: str(item) }
if not DISPLAY_FULL_JSON:
zone_id = item[SZ_ZONE_IDX] if SZ_ZONE_IDX in item else None
display_simple_msg(msg, item, zone_id, "")
mqtt_publish_received_msg(msg, item)
except Exception as e:
log.error(f"Exception occured: {e}", exc_info=True)
log.error(f"item: {item}, payload: {payload} ")
log.error(f"msg: {msg}")
def print_ramsesrf_gwy_schema(gwy):
schema = get_current_schema(gwy)
print(f"Schema[gateway] = {json.dumps(schema, indent=4)}\r\n")
print(f"Params[gateway] = {json.dumps(gwy.params)}\r\n")
print(f"Status[gateway] = {json.dumps(gwy.status)}")
orphans = [d for d in sorted(gwy.schema[SZ_ORPHANS_HEAT])]
print(f"Schema[{SZ_ORPHANS_HEAT}] = {json.dumps({'schema': orphans}, indent=4)}\r\n")
update_devices_from_gwy()
if DEVICES:
devices = {str(k) : {SZ_ALIAS : DEVICES[k][SZ_ALIAS]} for k in DEVICES if k is not None}
print(f"DEVICES = {json.dumps(devices, indent=4)}")
def display_full_msg(msg):
""" Show the full json payload (as in the ramses_rf cli client) """
dtm = f"{msg.dtm:%H:%M:%S.%f}"[:-3]
if msg.src.type == "18":
print(f"{Style.BRIGHT}{DISPLAY_COLOURS.get(msg.verb)}{dtm} {msg}"[:CONSOLE_COLS])
elif msg.verb:
print(f"{DISPLAY_COLOURS.get(msg.verb)}{dtm} {msg}"[:CONSOLE_COLS])
else:
print(f"{Style.RESET_ALL}{dtm} {msg}"[:CONSOLE_COLS])
def display_simple_msg(msg, payload_dict, target_zone_id, suffix_text=""):
src = get_device_name(msg.src)
dst = get_device_name(msg.dst) if msg.src.id != msg.dst.id else ""
# Make a copy as we are deleting elements from the displayed text
display_text = payload_dict.copy() if isinstance(payload_dict, dict) else payload_dict
filtered_text = cleanup_display_text(msg, display_text)
try:
zone_name = "@ {:<20}".format(truncate_str(ZONES[target_zone_id], 20)) if target_zone_id and int(target_zone_id, 16) >= 0 and target_zone_id in ZONES else ""
zone_id = "[Zone {:<3}]".format(target_zone_id) if target_zone_id and int(target_zone_id, 16) >= 0 else ""
if msg.src.type == "18": # Messages from the HGI device
style_prefix = f"{Fore.LIGHTBLACK_EX}"
elif msg.code_name.lower() in DISPLAY_COLOURS :
style_prefix = f"{DISPLAY_COLOURS.get(msg.code_name)}"
elif msg.verb:
style_prefix = f"{DISPLAY_COLOURS.get(msg.verb)}"
else:
style_prefix = f"{Style.RESET_ALL}"
main_txt = f"{filtered_text if filtered_text else '-': <45} {zone_name:<25}"
print_formatted_row(src, dst, msg.verb, msg.code_name, f"{main_txt: <75} {zone_id} {suffix_text}", msg._pkt._rssi, style_prefix)
except Exception as e:
log.error(f"Exception occured: {e}", exc_info=True)
log.error(f"msg: {msg}, payload_dict: {payload_dict}, target_zone_id: {target_zone_id}, suffix_text: {suffix_text}")
log.error(f"type(display_text): {type(display_text)}")
log.error(f"filtered_text: {filtered_text}" if filtered_text else "filtered_text is None")
log.error(f"Display row: {msg.code_name}: {msg.verb}| {src} -> {dst} | {display_text} {zone_name} [Zone {target_zone_id}] {suffix_text}")
log.error(f"|rssi '{msg._pkt._rssi}'| src '{src}' -> dst '{dst}' | verb '{msg.verb}'| cmd '{msg.code_name}'")
def print_formatted_row(src="", dst="", verb="", cmd="", text="", rssi=" ", style_prefix=""):
dtm = datetime.datetime.now().strftime("%Y-%m-%d %X")
if src:
row = f"{dtm} |{rssi}| {truncate_str(src, 21) if src else '':<21} -> {truncate_str(dst, 21) if dst else '':<21} |{verb:<2}| {cmd:<15} | {text}"
else:
row = f"{dtm} | {text}"
row = "{:<{min_row_width}}".format(row, min_row_width=MIN_ROW_LENGTH)
print(f"{Style.RESET_ALL}{style_prefix}{row.strip()}{Style.RESET_ALL}")
def send_command_callback(msg) -> None:
""" Callback receives msg object on success, and False on failure """
status=SEND_STATUS_SUCCESS if msg else SEND_STATUS_FAILED
mqtt_publish_send_status(None, status)
if msg:
# print(f"code_name: {msg.code_name}, code: {msg.code}, is_expired: {msg.is_expired}")
display_text = f"COMMAND SEND SUCCESS: '{msg.code_name}'"
else:
if "code" in LAST_SEND_MSG:
cmd = LAST_SEND_MSG["code"]
elif "command" in LAST_SEND_MSG:
cmd = LAST_SEND_MSG["command"]
else:
cmd = "UNKNOWN"
display_text = f"COMMAND SEND FAILED for '{LAST_SEND_MSG}'"
print_formatted_row(THIS_GATEWAY_NAME, text=display_text, style_prefix=f"{DISPLAY_COLOURS['mqtt_command']}")
log.info(display_text)
def get_current_schema(gwy):
config = {SZ_CONFIG: vars(gwy.config)}
known_list = { SZ_KNOWN_LIST: gwy.known_list}
schema = {**config, **gwy.schema, **known_list}
return schema
def save_schema_and_devices():
if not GWY:
log.error("Schema cannot be saved as GWY is none")
return
try:
# Save the new discovered/'eavesdropped' ramses_rf schema
schema = schema = get_current_schema(GWY)
save_json_to_file(schema, SCHEMA_FILE, False)
update_zones_from_gwy()
update_devices_from_gwy()
if DEVICES:
devices_simple = {str(k) : {SZ_ALIAS : DEVICES[k][SZ_ALIAS]} for k in DEVICES if k is not None}
save_json_to_file(devices_simple, DEVICES_FILE, True)
if ZONES:
save_json_to_file(ZONES, ZONES_FILE, False)
print(f"Updated '{DEVICES_FILE}' and ramses_rf schema files generated")
except Exception as e:
log.error(f"Exception occured: {e}", exc_info=True)
traceback.print_stack()
def save_zones():
update_zones_from_gwy()
if ZONES:
save_json_to_file(ZONES, ZONES_FILE, False)
def get_existing_device_name(device_id):
return DEVICES[device_id][SZ_ALIAS] if device_id in DEVICES and SZ_ALIAS in DEVICES[device_id] else None
def update_devices_from_gwy(ignore_unnamed_zones=False):
""" Refresh the local DEVICES collection with the devices that GWY has found """
schema = GWY.tcs.schema if GWY.tcs else GWY.schema
global DEVICES
controller_id = GWY.tcs.id if GWY and GWY.tcs else (GWY.schema[SZ_MAIN_TCS] if SZ_MAIN_TCS in GWY.schema else None)
if controller_id is not None and controller_id not in DEVICES:
DEVICES[controller_id] = {SZ_ALIAS: f"Controller"}
if SZ_SYSTEM in schema and schema[SZ_SYSTEM] and SZ_APPLIANCE_CONTROL in schema[SZ_SYSTEM]:
device_id = schema[SZ_SYSTEM][SZ_APPLIANCE_CONTROL]
org_name = get_existing_device_name(device_id)
DEVICES[device_id] = {SZ_ALIAS: org_name if org_name else get_device_type_and_id(device_id)}
if SZ_ZONES in schema:
for zone_id, zone_items in schema[SZ_ZONES].items():
if SZ_SENSOR in zone_items:
sensor_id = zone_items[SZ_SENSOR]
org_name = get_existing_device_name(sensor_id)
DEVICES[sensor_id] = {SZ_ALIAS: org_name if org_name else f"{get_device_type_and_id(sensor_id)}", "zone_id": zone_id}
if SZ_DEVICES in zone_items:
if zone_id in ZONES:
zone_name = ZONES[zone_id]
elif not ignore_unnamed_zones:
zone_name = f"Zone_{zone_id}"
else:
zone_name = None
for device_id in zone_items[SZ_DEVICES]:
if device_id is not None:
org_name = get_existing_device_name(device_id)
DEVICES[device_id] = {SZ_ALIAS: org_name if org_name else f"{zone_name} {get_device_type_and_id(device_id)}", "zone_id": zone_id}
if SZ_DHW_SYSTEM in schema:
for dhw_device_type in schema[SZ_DHW_SYSTEM]:
device_id = schema[SZ_DHW_SYSTEM][dhw_device_type]
if device_id:
DEVICES[device_id] = {SZ_ALIAS: dhw_device_type.replace("_"," ").title()}
if SZ_UFH_SYSTEM in schema:
ufc_ids = list(schema[SZ_UFH_SYSTEM].keys())
for ufc_id in ufc_ids:
org_name = get_existing_device_name(ufc_id)
DEVICES[ufc_id] = {SZ_ALIAS: org_name if org_name else f"UFH Controller {get_device_type_and_id(ufc_id)}"}
if SZ_ORPHANS in schema and schema[SZ_ORPHANS]:
for device_id in schema[SZ_ORPHANS]:
org_name = get_existing_device_name(device_id)
DEVICES[device_id] = {SZ_ALIAS: org_name if org_name else get_device_type_and_id(device_id)}
mqtt_publish_schema()
def update_zones_from_gwy(schema={}, params={}):
""" Refresh local ZONES with zones detected by GWY and has got zone names """
if GWY:
if not schema:
schema = GWY.tcs.schema if GWY.tcs else GWY.schema
if not params:
params = GWY.tcs.params if GWY.tcs else GWY.params
global ZONES
global UFH_CIRCUITS
# GWY.tcs.zones contains list of zone
# GWY.tcs.zone_by_idx['00'] gets zone object (e.g GWY.tcs.zone_by_idx['00'].name)
# ZONES = {}
if SZ_ZONES in schema and params:
for zone_id in schema[SZ_ZONES]:
if SZ_ZONES in params and SZ_NAME in params[SZ_ZONES][zone_id] and params[SZ_ZONES][zone_id][SZ_NAME]:
ZONES[zone_id] = params[SZ_ZONES][zone_id][SZ_NAME]
if schema and SZ_UFH_SYSTEM in schema:
ufc_ids = list(schema[SZ_UFH_SYSTEM].keys())
for ufc_id in ufc_ids:
#TODO! If there are multiple ufh controllers, circuit numbers in ufh_circuits will have to be dependent on controller ID - is this available in messages?
if SZ_CIRCUITS in schema[SZ_UFH_SYSTEM][ufc_id] and len(schema[SZ_UFH_SYSTEM][ufc_id][SZ_CIRCUITS]) > 0:
for c in schema[SZ_UFH_SYSTEM][ufc_id][SZ_CIRCUITS]:
if c not in UFH_CIRCUITS.keys():
UFH_CIRCUITS[c] = schema[SZ_UFH_SYSTEM][ufc_id][SZ_CIRCUITS][c]
# Only publish if GWY initialised
if GWY:
mqtt_publish_schema()
def get_device_type_and_id(device_id):
if device_id and ":" in device_id and len(device_id) == 9:
id_parts = device_id.split(":")
dev_type = DEV_TYPE_MAP[id_parts[0]]
return f"{dev_type}:{id_parts[1]}"
else:
log.debug(f"get_device_type_and_id: Ignorning invalid device_id of '{device_id}'")
log.debug(traceback.format_exc())
def get_sys_status_dict(status):
return {"status": status, "status_ts": datetime.datetime.now().strftime("%Y-%m-%dT%X")}
def mqtt_initialise():
if not MQTT_SERVER:
log.error("MQTT Server details not found. Exiting...")
raise SystemExit
global MQTT_CLIENT
MQTT_CLIENT = mqtt.Client()
MQTT_CLIENT.on_connect = mqtt_on_connect
MQTT_CLIENT.on_message = mqtt_on_message
MQTT_CLIENT.will_set(f"{MQTT_PUB_TOPIC}/{MQTT_STATUS_SUBTOPIC}",
payload=json.dumps(get_sys_status_dict(MQTT_OFFLINE), indent=4), qos=0, retain=True)
if MQTT_USER:
MQTT_CLIENT.username_pw_set(MQTT_USER, MQTT_PW)
MQTT_CLIENT.connect(MQTT_SERVER)
return MQTT_CLIENT
def mqtt_on_connect(client, *_):
log.info(f"Connected to MQTT broker. Subscribing to topic {MQTT_SUB_TOPIC} for commands")
client.subscribe(MQTT_SUB_TOPIC)
client.publish(f"{MQTT_PUB_TOPIC}/{MQTT_STATUS_SUBTOPIC}", MQTT_ONLINE)
mqtt_publish_status(MQTT_ONLINE)
def mqtt_on_message(client, _, msg):
payload = str(msg.payload.decode("utf-8"))
print_formatted_row("MQTT", text=f"Received MQTT message: {payload}", style_prefix=f"{DISPLAY_COLOURS['mqtt_command']}")
log.info(f"MQTT message received: {payload}")
mqtt_process_msg(payload)
def mqtt_publish_status(status):
MQTT_CLIENT.publish(f"{MQTT_PUB_TOPIC}/{MQTT_STATUS_SUBTOPIC}", json.dumps(get_sys_status_dict(status), indent=4), 0, True)
def mqtt_publish_received_msg(msg, payload, no_unpack=False):
""" We explicitly receive the payload instead of just using msg.payload, so that any pre-processing of the payload is assumed to be already done
Payloads are assumed to always be dict
"""
if not (MQTT_CLIENT and msg and (not MQTT_PUB_JSON_ONLY or payload)):
return
if not MQTT_CLIENT.is_connected:
print_formatted_row(SYSTEM_MSG_TAG, text="[WARN] MQTT publish failed as client is not connected to broker")
return
if not isinstance(payload, dict):
log.error(f"Payload in mqtt_publish_received_msg is not of type dict. type(payload): {type(payload)}, payload arg: {payload}, msg.payload: {msg.payload}")
try:
target_zone_id = None
if "parent_idx" in payload and msg.src.type not in "10 13":
# Ignore parent_idx if device type is OTB or BDR
target_zone_id = payload["parent_idx"]
elif SZ_ZONE_IDX in payload:
target_zone_id = payload[SZ_ZONE_IDX]
elif SZ_DOMAIN_ID in payload:
target_zone_id = payload[SZ_DOMAIN_ID]
elif SZ_UFH_IDX in str(payload):
if not UFH_CIRCUITS: # May just need an update
update_zones_from_gwy()
if UFH_CIRCUITS and payload[SZ_UFH_IDX] in UFH_CIRCUITS and SZ_ZONE_IDX in UFH_CIRCUITS[payload[SZ_UFH_IDX]]:
target_zone_id = UFH_CIRCUITS[payload[SZ_UFH_IDX]][SZ_ZONE_IDX]
if msg.src.id not in DEVICES: # Refresh zones/devices list
update_zones_from_gwy()
update_devices_from_gwy()
if hasattr(msg.src, "zone") and msg.src.zone and hasattr(msg.src.zone, "idx") and msg.src.zone.idx and not "HW" in msg.src.zone.idx:
src_zone_id = msg.src.zone.idx
elif hasattr(msg.src, "_domain_id") and msg.src._domain_id and int(msg.src._domain_id, 16) >= 0:
src_zone_id = msg.src._domain_id
else:
src_zone_id = None
if (target_zone_id and 0 <= int(target_zone_id, 16) < 12) or (src_zone_id and 0 <= int(src_zone_id, 16) < 12):
if MQTT_GROUP_BY_ZONE and MQTT_REQUIRE_ZONE_NAMES and (not ZONES or (target_zone_id not in ZONES and src_zone_id not in ZONES)):
# MQTT topic requires zone name...
update_zones_from_gwy()
if target_zone_id and target_zone_id not in ZONES and src_zone_id not in ZONES:
log.error(f"Both 'target_zone_id' and 'src_zone_id' not found in ZONES")
return # Return unless we have the zone name, as otherwise cannot build topic
src_zone = to_snake(get_msg_zone_name(msg.src, target_zone_id)) #if not target_zone_id or target_zone_id <1 else get_device_zone_name(target_zone_id)
src_device = to_snake(get_device_name(msg.src))
if ("dhw_" in msg.code_name or "dhw_" in src_device or (src_zone_id and "HW" in src_zone_id)) and DHW_ZONE_PREFIX:
# treat DHW as a zone if we are grouping by zone, otherwise as a device prefix
if MQTT_GROUP_BY_ZONE:
src_zone = f"{DHW_ZONE_PREFIX}"
else:
src_device = f"{DHW_ZONE_PREFIX}/{src_device}"
if not MQTT_PUB_JSON_ONLY and "until" in payload and payload["until"] and " " in payload["until"]:
# Patch with T separator
try:
d, t = payload["until"].split(" ")
payload["until"] = f"{d}T{t}"
except Exception as ex:
log.error(f"Exception occured in patching 'until' value '{payload['until']}': {ex}", exc_info=True)
# Need separate sub-topics for certain payloads under CTL, HGI or UFH controller, such as fault log entries
if "topic_idx" in payload:
# topic_idx is not currently sent in ramses_rf payloads. Use here for custom topics, e.g. schedules
topic_idx = f"/{payload['topic_idx']}"
elif "log_idx" in payload:
topic_idx = f"/{payload['log_idx']}"
elif SZ_FRAG_NUMBER in payload:
topic_idx = f"/fragment_{payload['frag_number']}"
elif src_zone.endswith("/relays") and "ufx_idx" in payload:
topic_idx = f"/_ufx_idx_{payload['ufx_idx']}"
elif src_zone.startswith(MQTT_ZONE_IND_TOPIC) and (src_device.startswith("hgi_") or src_device.startswith("ctl_") or src_device.startswith("ufc_")) and (SZ_ZONE_IDX in payload or SZ_DOMAIN_ID in payload):
if SZ_ZONE_IDX in payload:
topic_idx = f"/{payload[SZ_ZONE_IDX]}"
elif payload[SZ_DOMAIN_ID].lower() in RELAYS:
topic_idx = f"/_domain_{payload['domain_id'].upper()}_{to_snake(RELAYS[payload['domain_id'].lower()])}"
else:
topic_idx = payload[SZ_DOMAIN_ID].lower()
else:
topic_idx = ""
if MQTT_GROUP_BY_ZONE and src_zone:
topic_base = f"{MQTT_PUB_TOPIC}/{src_zone}/{src_device}/{msg.code_name}{topic_idx}"
else:
topic_base = f"{MQTT_PUB_TOPIC}/{src_device}/{msg.code_name}{topic_idx}"
subtopic = topic_base
# if msg.code_name == "relay_demand" or SZ_DOMAIN_ID in payload:
# log.info(f"[DEBUG] -----> : payload: {payload}, target_zone_id: {target_zone_id}, msg: {msg}")
# log.info(f"[DEBUG] -----> : subtopic: '{subtopic}', topic_idx: '{topic_idx}', src_zone: {src_zone}, src_device: {src_device}")
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%X%Z")
if not MQTT_PUB_JSON_ONLY and not no_unpack:
#Unpack the JSON and publish the individual key/value pairs
if MQTT_PUB_KV_WITH_JSON:
# Publish the payload JSON into the subtopic key
MQTT_CLIENT.publish(subtopic, json.dumps(payload | {"timestamp": timestamp}), 0, True)
if msg.code_name == "opentherm_msg":
# This is an opentherm_msg. Extract msg item and updated_payload as new dict, with msg_name as key
new_key, updated_payload = get_opentherm_msg(msg)
else:
updated_payload = payload
new_key = None
subtopic = f"{topic_base}/{to_snake(new_key)}" if new_key else topic_base
# As some payloads are received as lists, others not, convert everything to a list so we can process in same way
if updated_payload and not isinstance(updated_payload, list):
updated_payload = [updated_payload]
# Iterate through the list. payload_item should be a dict as updated_payload should now be a list of dict [{...}]
if updated_payload:
for payload_item in updated_payload:
try:
if isinstance(payload_item, dict): # we may have a further dict in the updated_payload - e.g. opentherm msg, system_fault etc
for k in payload_item:
MQTT_CLIENT.publish(f"{subtopic}/{to_snake(k)}", str(payload_item[k]), 0, True)
log.debug(f" -> mqtt_publish_received_msg: 2. Posted subtopic: {subtopic}/{to_snake(k)}, value: {payload_item[k]}")
else:
MQTT_CLIENT.publish(subtopic, str(payload_item), 0, True)
log.info(f" -> mqtt_publish_received_msg: 3. item is not a dict. Posted subtopic: {subtopic}, value: {payload_item}, type(playload_item): {type(payload_item)}")
except Exception as e:
log.error(f"Exception occured: {e}", exc_info=True)
log.error(f"------------> payload_item: \"{payload_item}\", type(payload_item): \"{type(payload_item)}\", updated_payload: \"{updated_payload}\"")
log.error(f"------------> msg: {msg}")
else:
# Publish the JSON
MQTT_CLIENT.publish(subtopic, json.dumps(msg.payload), 0, True)
MQTT_CLIENT.publish(f"{topic_base}/{msg.code_name}_ts", timestamp, 0, True)
# print("published to mqtt topic {}: {}".format(topic, msg))
except Exception as e:
log.error(f"Exception occured: {e}", exc_info=True)
log.error(f"msg.src.id: {msg.src.id}, command: {msg.code_name}, payload: {payload}, pub_json: {MQTT_PUB_JSON_ONLY}")
log.error(f"msg: {msg}")
traceback.print_exc()
pass
def mqtt_publish_zone_schedules(with_display=False):
""" Publish all avialable zone schedules"""
for zone in GWY.tcs.zones:
if zone.schedule:
# Fake a Message object for publishing...
msg = SimpleNamespace(**{"code_name":"zone_schedule", SZ_ZONE_IDX: zone.idx, "src": SimpleNamespace(**{"id": GWY.tcs.id, "type": GWY.get_device(GWY.tcs.id).type, "zone": zone})})
mqtt_publish_received_msg(msg, {SZ_SCHEDULE: zone.schedule, SZ_ZONE_IDX: zone.idx})
if with_display:
display_schedule_for_zone(zone)
def display_schedule_for_zone(zone_idx):
""" Display schedule for given zone and post to mqtt"""
zone = GWY.tcs.zone_by_idx[zone_idx]
if zone and zone.schedule:
schedule = json.dumps(zone.schedule)
dtm = f"{datetime.datetime.now():%H:%M:%S.%f}"[:-3]
zone_name = f"{zone.name} [{zone.idx}]" if zone.name else f"{zone.idx}"
if DISPLAY_FULL_JSON:
print(f"{DISPLAY_COLOURS.get('RP')}{dtm} "
f"Schedule for zone {zone_name}: {schedule}"[:CONSOLE_COLS])
else:
print_formatted_row(SYSTEM_MSG_TAG,
text=f"Schedule for zone '{zone_name}': {schedule}")
# Fake a Message object for publishing...
msg = SimpleNamespace(**{"code_name":"zone_schedule", SZ_ZONE_IDX: zone.idx, "src": SimpleNamespace(**{"id": GWY.tcs.id, "type": GWY.get_device(GWY.tcs.id).type, "zone": zone})})
mqtt_publish_received_msg(msg, {SZ_SCHEDULE: zone.schedule, SZ_ZONE_IDX: zone.idx})
def mqtt_publish_send_status(cmd, status):
if not cmd and not status:
log.error("mqtt_publish_send_status: Both 'cmd' and 'status' cannot be None")
return
topic = f"{MQTT_SUB_TOPIC}/_last_command"
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%X")
if cmd:
MQTT_CLIENT.publish(f"{topic}/command", cmd, 0, True)
MQTT_CLIENT.publish(f"{topic}/command_ts", timestamp, 0, True)
MQTT_CLIENT.publish(f"{topic}/status", status, 0, True)
MQTT_CLIENT.publish(f"{topic}/status_ts", timestamp, 0, True)
def mqtt_publish_schema():
topic = f"{MQTT_PUB_TOPIC}/{MQTT_ZONE_IND_TOPIC}/_gateway_config"
MQTT_CLIENT.publish(f"{topic}/gwy_mode", "eavesdrop" if RAMSESRF_ALLOW_EAVESDROP else "monitor", 0, True)
MQTT_CLIENT.publish(f"{topic}/schema", json.dumps(GWY.schema if GWY.tcs is None else GWY.tcs.schema, sort_keys=True), 0, True)
MQTT_CLIENT.publish(f"{topic}/params", json.dumps(GWY.params if GWY.tcs is None else GWY.tcs.params, sort_keys=True), 0, True)
MQTT_CLIENT.publish(f"{topic}/status", json.dumps(GWY.status if GWY.tcs is None else GWY.tcs.status, sort_keys=True), 0, True)
MQTT_CLIENT.publish(f"{topic}/config", json.dumps(vars(GWY.config), sort_keys=True), 0, True)
MQTT_CLIENT.publish(f"{topic}/devices", json.dumps({str(k): v for k, v in DEVICES.items()}, sort_keys=True), 0, True)
MQTT_CLIENT.publish(f"{topic}/zones", json.dumps(ZONES), 0, True)
MQTT_CLIENT.publish(f"{topic}/uhf_circuits", json.dumps(UFH_CIRCUITS, sort_keys=True), 0, True)
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%X")
MQTT_CLIENT.publish(f"{topic}/_gateway_config_ts", timestamp, 0, True)
def mqtt_process_msg(msg):
log.debug(f"MQTT message received: {msg}")
try:
json_data = json.loads(msg)
except:
log.error(f"mqtt message is not in JSON format: '{msg}'")
return
try:
if SYS_CONFIG_COMMAND in json_data:
if json_data[SYS_CONFIG_COMMAND].upper().strip() == "DISPLAY_FULL_JSON":
global DISPLAY_FULL_JSON
DISPLAY_FULL_JSON = json_data["value"] if "value" in json_data else False
elif json_data[SYS_CONFIG_COMMAND].upper().strip() == "RELOAD_DISPLAY_COLOURS":
global DISPLAY_COLOURS
DISPLAY_COLOURS = get_display_colorscheme(True)
elif json_data[SYS_CONFIG_COMMAND].upper().strip() == "POST_SCHEMA":
update_zones_from_gwy()
update_devices_from_gwy()
elif json_data[SYS_CONFIG_COMMAND].upper().strip() == "SAVE_SCHEMA":
update_zones_from_gwy()
update_devices_from_gwy()
save_schema_and_devices()
else:
print_formatted_row(SYSTEM_MSG_TAG, text="System configuration command '{}' not recognised".format(json_data[SYS_CONFIG_COMMAND]))
return
else:
if "code" in json_data:
command_code = json_data["code"]
if type(command_code) is int:
command_code = hex(command_code)
command_code = command_code.upper().replace("0X","")
if "verb" not in json_data or "payload" not in json_data:
log.error(f"Failed to send command '{command_code}'. Both 'verb' and 'payload' must be provided when 'code' is used instead of 'command'")
return
verb = json_data["verb"]
payload = json_data["payload"]
dest_id = json_data["dest_id"] if "dest_id" in json_data else GWY.tcs.id
if "from_id" in json_data: # Allow addition of from_id kwarg
from_id = json_data["from_id"]
gw_cmd = GWY.create_cmd(verb, dest_id, command_code, payload, from_id=from_id)
else:
gw_cmd = GWY.create_cmd(verb, dest_id, command_code, payload) # Command.from_attrs()
log.debug(f"--------> MQTT message converted to Command: '{gw_cmd}'")
elif "command" in json_data:
command_name = json_data["command"]
if command_name in GET_SCHED:
zone_idx = json_data[SZ_ZONE_IDX] if SZ_ZONE_IDX in json_data else None
force_refresh = json_data["force_refresh"] if "force_refresh" in json_data else None
spawn_schedule_task(GET_SCHED, zone_idx=zone_idx, force_refresh=force_refresh)
return
elif command_name in SET_SCHED:
if SZ_SCHEDULE in json_data:
spawn_schedule_task(action=SET_SCHED, zone_idx=json_data[SZ_ZONE_IDX],schedule=json_data[SZ_SCHEDULE])
elif "schedule_json_file" in json_data:
with open(json_data["schedule_json_file"], 'r') as fp:
schedule = json.load(fp)
spawn_schedule_task(action=SET_SCHED, schedule=schedule)
else:
log.error("'set_schedule' command requires a 'schedule' json")