Skip to content

Commit

Permalink
1.2.6
Browse files Browse the repository at this point in the history
  • Loading branch information
Hans IJntema committed Sep 18, 2022
1 parent 61a7692 commit bda3a7a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 13 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ Tested under Linux; there is no reason why it does not work under Windows.
GPL v3

## Versions
1.2.6
* Exit on Socket Connect error (and systemd will restart)

1.2.5:
* Exit after maxretries to connect to inverter
* Exit after maxretries to connect to inverter (and systemd will restart)

1.2.4:
* Fix exit code (SUCCESS vs FAILURE)
Expand Down
95 changes: 86 additions & 9 deletions mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
v1.0.0: initial version
v1.0.1: add last will
v1.1.0: add subscription
This program is free software: you can redistribute it and/or modify
Expand All @@ -24,11 +25,11 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
V1.1.0: 31-7-2022: Add subsribing to MQTT server
"""

__version__ = "1.0.3"
__version__ = "1.1.2"
__author__ = "Hans IJntema"
__license__ = "GPLv3"

Expand All @@ -47,7 +48,7 @@
logger = logging.getLogger(script + "." + __name__)

# Error values
#MQTT_ERR_AGAIN = -1
MQTT_ERR_AGAIN = -1
MQTT_ERR_SUCCESS = 0
MQTT_ERR_NOMEM = 1
MQTT_ERR_PROTOCOL = 2
Expand Down Expand Up @@ -107,8 +108,12 @@ def __init__(self, mqtt_broker, mqtt_port, mqtt_client_id, mqtt_rate, mqtt_qos,
:param int mqtt_qos: MQTT QoS 0,1,2
:param str username:
:param str password:
:param threading.Event() mqtt_stopper:
:param threading.Event() threads_stopper: stopper event for other threads
:param threading.Event() mqtt_stopper: indicate to stop the mqtt thread; typically as last thread in main loop to flush out all mqtt messages
:param threading.Event() threads_stopper: stopper event for other worker threads; typically the worker threads are
stopped in the main loop before the mqtt thread;but mqtt thread can also set this in case of failure
TODO REMOVE threads_stopper, as it is always set when mqtt_stopper is set
(is that possible?)
Returns:
None
Expand Down Expand Up @@ -139,10 +144,16 @@ def __init__(self, mqtt_broker, mqtt_port, mqtt_client_id, mqtt_rate, mqtt_qos,
# Call back functions
self.__mqtt.on_connect = self.__on_connect
self.__mqtt.on_disconnect = self.__on_disconnect
self.__mqtt.on_message = self.__on_message

# Uncomment if needed for debugging
# self.__mqtt.on_publish = self.__on_publish
# self.__mqtt.on_log = self.__on_log
self.__mqtt.on_subscribe = self.__on_subscribe
self.__mqtt.on_unsubscribe = self.__on_unsubscribe

# Not yet implemented
# self.__mqtt.on_unsubscribe = self.__on_unsubscribe

self.__connected_flag = False

Expand All @@ -158,6 +169,10 @@ def __init__(self, mqtt_broker, mqtt_port, mqtt_client_id, mqtt_rate, mqtt_qos,
self.__status_payload = None
self.__status_retain = None

# For processing subscribed messages
self.__message_trigger = None
self.__message = None

def __del__(self):
logger.info(f">>")

Expand Down Expand Up @@ -195,6 +210,7 @@ def __on_connect(self, client, userdata, flags, rc):
Returns:
None
"""
logger.debug(f"Connected: RC = {rc}")
if rc == 0:
logger.debug(f"Connected: client={client}; userdata={userdata}; flags={flags}; rc={rc}")
self.__connected_flag = True
Expand Down Expand Up @@ -223,6 +239,20 @@ def __on_disconnect(self, client, userdata, rc):
else:
logger.error(f"Unexpected disconnect, rc = {rc}, {rc_dict[rc]}")

def __on_message(self, client, userdata, message):
"""
:param client:
:param userdata:
:param message: Queue()
:return:
"""
logger.debug(f">> message = {message.topic} {message.payload}")

self.__message.put(message)

if self.__message_trigger != None:
self.__message_trigger.set()

def __on_publish(self, client, userdata, mid):
"""
Callback: when a message that was to be sent using the publish() call has completed transmission to the broker.
Expand All @@ -235,8 +265,27 @@ def __on_publish(self, client, userdata, mid):
Returns:
None
"""
None
# logger.debug( f"userdata={userdata}; mid={mid}" )
logger.debug(f"userdata={userdata}; mid={mid}")
return None

def __on_subscribe(self, client, obj, mid, granted_qos):
"""
:param client:
:param obj:
:param mid:
:param granted_qos:
:return:
"""
logger.debug(f">> Subscribed: {mid} {granted_qos}")

def __on_unsubscribe(self, client, obj, mid):
"""
:param client:
:param obj:
:param mid:
:return:
"""
logger.debug(f">> Unsubscribed: {mid}")

def __on_log(self, client, obj, level, buf):
"""
Expand Down Expand Up @@ -341,6 +390,7 @@ def __do_mqtt(self):
logger.debug(f"Received from Queue: TOPIC={topic} MESSAGE={message};...{self.__queue.qsize()} message(s) left in queued")
except queue.Empty:
continue
#return None

self.__mqtt.publish(topic, message, qos=self.__qos, retain=retainflag)
counter += 1
Expand All @@ -352,6 +402,32 @@ def __do_mqtt(self):
# stopper is set...
logger.info(f"Shutting down MQTT Client... {counter} MQTT messages have been published")

def set_message_trigger(self, message, trigger=None):
"""
:param message: Queue() - as received by on_message (topic, payload,..)
:param trigger: threading.Event(); OPTIONAL: to indicate that message has been received
:return:
"""

self.__message_trigger = trigger
self.__message = message

def subscribe(self, topic):
logger.debug(f">> topic = {topic}")

# Subscribing will not work if client is not connected
# Wait till there is a connection
while not self.__connected_flag and not self.__mqtt_stopper.is_set():
logger.warning(f"No connection with MQTT Broker; cannot subscribe...wait for connection")
time.sleep(0.1)

self.__mqtt.subscribe(topic, self.__qos)

def unsubscribe(self, topic):
logger.debug(f">> topic = {topic}")
self.__mqtt.unsubscribe(topic)

def run(self):
logger.info(f"Broker = {self.__mqtt_broker}>>")
self.__run = True
Expand Down Expand Up @@ -381,16 +457,17 @@ def run(self):
return

else:
logger.info(f"start mqtt loop...")
self.__mqtt.loop_start()
logger.info(f"mqtt loop started...")
logger.debug(f"mqtt loop started...")

# Start infinite loop which sends queued messages every second
while not self.__mqtt_stopper.is_set():
self.__do_mqtt()
time.sleep(0.1)

# Close mqtt broker
logger.debug(f"CLose down MQTT client & connection to broker")
logger.debug(f"Close down MQTT client & connection to broker")
self.__mqtt.loop_stop()
self.__mqtt.disconnect()
self.__mqtt_stopper.set()
Expand Down
2 changes: 1 addition & 1 deletion trannergy-mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

__version__ = "1.2.5"
__version__ = "1.2.6"
__author__ = "Hans IJntema"
__license__ = "GPLv3"

Expand Down
12 changes: 10 additions & 2 deletions trannergy_tcpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,20 @@ def __socket_connect(self):
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__sock.connect((cfg.INV_IP, cfg.INV_TCPCLIENTPORT))
return
except OSError as e:
logger.error(f"Socket Exception OSError: {type(e).__name__}: {str(e)}; set stopper")
self.__stopper.set()
return

except Exception as e:
logger.info(f"Read SOCKS: {type(e).__name__}: {str(e)}")
time.sleep(30)
counter += 1
logger.info(f"Socket Exception: {type(e).__name__}: {str(e)}; counter = {counter}")
time.sleep(30)

if counter > cfg.INV_MAXRETRIES:
logger.error(f"Socket MAX_RETRIES exceeded. set stopper")
self.__stopper.set()
return

def __request_string(self, ser):
"""
Expand Down

0 comments on commit bda3a7a

Please sign in to comment.