From bda3a7a5febbc4f1235ecb81623d957314aabdc1 Mon Sep 17 00:00:00 2001 From: Hans IJntema Date: Sun, 18 Sep 2022 20:45:54 +0200 Subject: [PATCH] 1.2.6 --- README.md | 5 ++- mqtt.py | 95 ++++++++++++++++++++++++++++++++++++++---- trannergy-mqtt.py | 2 +- trannergy_tcpclient.py | 12 +++++- 4 files changed, 101 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index cded751..83cb4f9 100755 --- a/README.md +++ b/README.md @@ -37,8 +37,11 @@ Tested under Linux; there is no reason why it does not work under Windows. GPL v3 ## Versions +1.2.6 +* Exit on Socket Connect error (and systemd will restart) + 1.2.5: -* Exit after maxretries to connect to inverter +* Exit after maxretries to connect to inverter (and systemd will restart) 1.2.4: * Fix exit code (SUCCESS vs FAILURE) diff --git a/mqtt.py b/mqtt.py index c480381..99d783c 100755 --- a/mqtt.py +++ b/mqtt.py @@ -9,6 +9,7 @@ v1.0.0: initial version v1.0.1: add last will + v1.1.0: add subscription This program is free software: you can redistribute it and/or modify @@ -24,11 +25,11 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . - + V1.1.0: 31-7-2022: Add subsribing to MQTT server """ -__version__ = "1.0.3" +__version__ = "1.1.2" __author__ = "Hans IJntema" __license__ = "GPLv3" @@ -47,7 +48,7 @@ logger = logging.getLogger(script + "." + __name__) # Error values -#MQTT_ERR_AGAIN = -1 +MQTT_ERR_AGAIN = -1 MQTT_ERR_SUCCESS = 0 MQTT_ERR_NOMEM = 1 MQTT_ERR_PROTOCOL = 2 @@ -107,8 +108,12 @@ def __init__(self, mqtt_broker, mqtt_port, mqtt_client_id, mqtt_rate, mqtt_qos, :param int mqtt_qos: MQTT QoS 0,1,2 :param str username: :param str password: - :param threading.Event() mqtt_stopper: - :param threading.Event() threads_stopper: stopper event for other threads + :param threading.Event() mqtt_stopper: indicate to stop the mqtt thread; typically as last thread in main loop to flush out all mqtt messages + :param threading.Event() threads_stopper: stopper event for other worker threads; typically the worker threads are + stopped in the main loop before the mqtt thread;but mqtt thread can also set this in case of failure + + TODO REMOVE threads_stopper, as it is always set when mqtt_stopper is set + (is that possible?) Returns: None @@ -139,10 +144,16 @@ def __init__(self, mqtt_broker, mqtt_port, mqtt_client_id, mqtt_rate, mqtt_qos, # Call back functions self.__mqtt.on_connect = self.__on_connect self.__mqtt.on_disconnect = self.__on_disconnect + self.__mqtt.on_message = self.__on_message # Uncomment if needed for debugging # self.__mqtt.on_publish = self.__on_publish # self.__mqtt.on_log = self.__on_log + self.__mqtt.on_subscribe = self.__on_subscribe + self.__mqtt.on_unsubscribe = self.__on_unsubscribe + + # Not yet implemented + # self.__mqtt.on_unsubscribe = self.__on_unsubscribe self.__connected_flag = False @@ -158,6 +169,10 @@ def __init__(self, mqtt_broker, mqtt_port, mqtt_client_id, mqtt_rate, mqtt_qos, self.__status_payload = None self.__status_retain = None + # For processing subscribed messages + self.__message_trigger = None + self.__message = None + def __del__(self): logger.info(f">>") @@ -195,6 +210,7 @@ def __on_connect(self, client, userdata, flags, rc): Returns: None """ + logger.debug(f"Connected: RC = {rc}") if rc == 0: logger.debug(f"Connected: client={client}; userdata={userdata}; flags={flags}; rc={rc}") self.__connected_flag = True @@ -223,6 +239,20 @@ def __on_disconnect(self, client, userdata, rc): else: logger.error(f"Unexpected disconnect, rc = {rc}, {rc_dict[rc]}") + def __on_message(self, client, userdata, message): + """ + :param client: + :param userdata: + :param message: Queue() + :return: + """ + logger.debug(f">> message = {message.topic} {message.payload}") + + self.__message.put(message) + + if self.__message_trigger != None: + self.__message_trigger.set() + def __on_publish(self, client, userdata, mid): """ Callback: when a message that was to be sent using the publish() call has completed transmission to the broker. @@ -235,8 +265,27 @@ def __on_publish(self, client, userdata, mid): Returns: None """ - None - # logger.debug( f"userdata={userdata}; mid={mid}" ) + logger.debug(f"userdata={userdata}; mid={mid}") + return None + + def __on_subscribe(self, client, obj, mid, granted_qos): + """ + :param client: + :param obj: + :param mid: + :param granted_qos: + :return: + """ + logger.debug(f">> Subscribed: {mid} {granted_qos}") + + def __on_unsubscribe(self, client, obj, mid): + """ + :param client: + :param obj: + :param mid: + :return: + """ + logger.debug(f">> Unsubscribed: {mid}") def __on_log(self, client, obj, level, buf): """ @@ -341,6 +390,7 @@ def __do_mqtt(self): logger.debug(f"Received from Queue: TOPIC={topic} MESSAGE={message};...{self.__queue.qsize()} message(s) left in queued") except queue.Empty: continue + #return None self.__mqtt.publish(topic, message, qos=self.__qos, retain=retainflag) counter += 1 @@ -352,6 +402,32 @@ def __do_mqtt(self): # stopper is set... logger.info(f"Shutting down MQTT Client... {counter} MQTT messages have been published") + def set_message_trigger(self, message, trigger=None): + """ + + :param message: Queue() - as received by on_message (topic, payload,..) + :param trigger: threading.Event(); OPTIONAL: to indicate that message has been received + :return: + """ + + self.__message_trigger = trigger + self.__message = message + + def subscribe(self, topic): + logger.debug(f">> topic = {topic}") + + # Subscribing will not work if client is not connected + # Wait till there is a connection + while not self.__connected_flag and not self.__mqtt_stopper.is_set(): + logger.warning(f"No connection with MQTT Broker; cannot subscribe...wait for connection") + time.sleep(0.1) + + self.__mqtt.subscribe(topic, self.__qos) + + def unsubscribe(self, topic): + logger.debug(f">> topic = {topic}") + self.__mqtt.unsubscribe(topic) + def run(self): logger.info(f"Broker = {self.__mqtt_broker}>>") self.__run = True @@ -381,8 +457,9 @@ def run(self): return else: + logger.info(f"start mqtt loop...") self.__mqtt.loop_start() - logger.info(f"mqtt loop started...") + logger.debug(f"mqtt loop started...") # Start infinite loop which sends queued messages every second while not self.__mqtt_stopper.is_set(): @@ -390,7 +467,7 @@ def run(self): time.sleep(0.1) # Close mqtt broker - logger.debug(f"CLose down MQTT client & connection to broker") + logger.debug(f"Close down MQTT client & connection to broker") self.__mqtt.loop_stop() self.__mqtt.disconnect() self.__mqtt_stopper.set() diff --git a/trannergy-mqtt.py b/trannergy-mqtt.py index 6dcb4b4..5e1f713 100755 --- a/trannergy-mqtt.py +++ b/trannergy-mqtt.py @@ -36,7 +36,7 @@ along with this program. If not, see . """ -__version__ = "1.2.5" +__version__ = "1.2.6" __author__ = "Hans IJntema" __license__ = "GPLv3" diff --git a/trannergy_tcpclient.py b/trannergy_tcpclient.py index 1750fe2..b3ef277 100755 --- a/trannergy_tcpclient.py +++ b/trannergy_tcpclient.py @@ -70,12 +70,20 @@ def __socket_connect(self): self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__sock.connect((cfg.INV_IP, cfg.INV_TCPCLIENTPORT)) return + except OSError as e: + logger.error(f"Socket Exception OSError: {type(e).__name__}: {str(e)}; set stopper") + self.__stopper.set() + return + except Exception as e: - logger.info(f"Read SOCKS: {type(e).__name__}: {str(e)}") - time.sleep(30) counter += 1 + logger.info(f"Socket Exception: {type(e).__name__}: {str(e)}; counter = {counter}") + time.sleep(30) + if counter > cfg.INV_MAXRETRIES: + logger.error(f"Socket MAX_RETRIES exceeded. set stopper") self.__stopper.set() + return def __request_string(self, ser): """