From c746c589351377761ca58570af408aba3df772d4 Mon Sep 17 00:00:00 2001 From: Mips2648 Date: Thu, 19 Dec 2024 16:34:07 +0100 Subject: [PATCH] fix memory leak in jeedom_com & small code quality improvment --- resources/demond/demond.py | 110 +++--- resources/demond/jeedom/jeedom.py | 567 ++++++++++++++---------------- 2 files changed, 324 insertions(+), 353 deletions(-) diff --git a/resources/demond/demond.py b/resources/demond/demond.py index 7e8709f..491ebac 100644 --- a/resources/demond/demond.py +++ b/resources/demond/demond.py @@ -14,71 +14,62 @@ # along with Jeedom. If not, see . import logging -import string import sys import os import time -import datetime import traceback -import re import signal -from optparse import OptionParser -from os.path import join import json import argparse -try: - from jeedom.jeedom import * -except ImportError: - print("Error: importing module jeedom.jeedom") - sys.exit(1) +from jeedom.jeedom import jeedom_socket, jeedom_utils, jeedom_serial, jeedom_com, JEEDOM_SOCKET_MESSAGE def read_socket(): - global JEEDOM_SOCKET_MESSAGE - if not JEEDOM_SOCKET_MESSAGE.empty(): - logging.debug("Message received in socket JEEDOM_SOCKET_MESSAGE") - message = json.loads(jeedom_utils.stripped(JEEDOM_SOCKET_MESSAGE.get())) - if message['apikey'] != _apikey: - logging.error("Invalid apikey from socket: %s", message) - return - try: - print ('read') - except Exception as e: - logging.error('Send command to demon error: %s' ,e) + global JEEDOM_SOCKET_MESSAGE + if not JEEDOM_SOCKET_MESSAGE.empty(): + logging.debug("Message received in socket JEEDOM_SOCKET_MESSAGE") + message = json.loads(jeedom_utils.stripped(JEEDOM_SOCKET_MESSAGE.get())) + if message['apikey'] != _apikey: + logging.error("Invalid apikey from socket: %s", message) + return + try: + print ('read') + except Exception as e: + logging.error('Send command to demon error: %s' ,e) def listen(): - jeedom_socket.open() - try: - while 1: - time.sleep(0.5) - read_socket() - except KeyboardInterrupt: - shutdown() + my_jeedom_socket.open() + try: + while 1: + time.sleep(0.5) + read_socket() + except KeyboardInterrupt: + shutdown() # ---------------------------------------------------------------------------- def handler(signum=None, frame=None): - logging.debug("Signal %i caught, exiting...", int(signum)) - shutdown() + logging.debug("Signal %i caught, exiting...", int(signum)) + shutdown() def shutdown(): - logging.debug("Shutdown") - logging.debug("Removing PID file %s", _pidfile) - try: - os.remove(_pidfile) - except: - pass - try: - jeedom_socket.close() - except: - pass - try: - jeedom_serial.close() - except: - pass - logging.debug("Exit 0") - sys.stdout.flush() - os._exit(0) + logging.debug("Shutdown") + logging.debug("Removing PID file %s", _pidfile) + try: + os.remove(_pidfile) + except: + pass + try: + my_jeedom_socket.close() + except: + pass + # try: # if you need jeedom_serial + # my_jeedom_serial.close() + # except: + # pass + logging.debug("Exit 0") + sys.stdout.flush() + os._exit(0) # ---------------------------------------------------------------------------- @@ -97,13 +88,13 @@ def shutdown(): parser.add_argument("--loglevel", help="Log Level for the daemon", type=str) parser.add_argument("--callback", help="Callback", type=str) parser.add_argument("--apikey", help="Apikey", type=str) -parser.add_argument("--cycle", help="Cycle to send event", type=str) +parser.add_argument("--cycle", help="Cycle to send event", type=float) parser.add_argument("--pid", help="Pid file", type=str) -parser.add_argument("--socketport", help="Port for Zigbee server", type=str) +parser.add_argument("--socketport", help="Port for socket server", type=int) args = parser.parse_args() if args.device: - _device = args.device + _device = args.device if args.loglevel: _log_level = args.loglevel if args.callback: @@ -115,7 +106,7 @@ def shutdown(): if args.cycle: _cycle = float(args.cycle) if args.socketport: - _socketport = args.socketport + _socket_port = args.socketport _socket_port = int(_socket_port) @@ -133,10 +124,15 @@ def shutdown(): signal.signal(signal.SIGTERM, handler) try: - jeedom_utils.write_pid(str(_pidfile)) - jeedom_socket = jeedom_socket(port=_socket_port,address=_socket_host) - listen() + jeedom_utils.write_pid(str(_pidfile)) + my_jeedom_com = jeedom_com(apikey=_apikey, url=_callback, cycle=_cycle) + if not my_jeedom_com.test(): + logging.error('Network communication issues. Please fixe your Jeedom network configuration.') + shutdown() + # my_jeedom_serial = jeedom_serial(device=_device) # if you need jeedom_serial + my_jeedom_socket = jeedom_socket(port=_socket_port,address=_socket_host) + listen() except Exception as e: - logging.error('Fatal error: %s', e) - logging.info(traceback.format_exc()) - shutdown() + logging.error('Fatal error: %s', e) + logging.info(traceback.format_exc()) + shutdown() diff --git a/resources/demond/jeedom/jeedom.py b/resources/demond/jeedom/jeedom.py index 2a7644d..1dd6600 100644 --- a/resources/demond/jeedom/jeedom.py +++ b/resources/demond/jeedom/jeedom.py @@ -16,333 +16,308 @@ import time import logging -import threading +from threading import Thread import requests -import datetime -try: - from collections.abc import Mapping -except ImportError: - from collections import Mapping +from collections.abc import Mapping import serial import os -from os.path import join -import socket from queue import Queue import socketserver from socketserver import (TCPServer, StreamRequestHandler) -import signal import unicodedata import pyudev # ------------------------------------------------------------------------------ class jeedom_com(): - def __init__(self,apikey = '',url = '',cycle = 0.5,retry = 3): - self.apikey = apikey - self.url = url - self.cycle = cycle - self.retry = retry - self.changes = {} - if cycle > 0 : - self.send_changes_async() - logging.info('Init request module v%s', requests.__version__) - - def send_changes_async(self): - try: - if len(self.changes) == 0: - resend_changes = threading.Timer(self.cycle, self.send_changes_async) - resend_changes.start() - return - start_time = datetime.datetime.now() - changes = self.changes - self.changes = {} - logging.info('Send to jeedom: %s', changes) - i=0 - while i < self.retry: - try: - r = requests.post(self.url + '?apikey=' + self.apikey, json=changes, timeout=(0.5, 120), verify=False) - if r.status_code == requests.codes.ok: - break - except Exception as error: - logging.error('Error on send request to jeedom %s retry : %i/%i', error, i, self.retry) - i = i + 1 - if r.status_code != requests.codes.ok: - logging.error('Error on send request to jeedom, return code %i', r.status_code) - dt = datetime.datetime.now() - start_time - ms = (dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0 - timer_duration = self.cycle - ms - if timer_duration < 0.1 : - timer_duration = 0.1 - if timer_duration > self.cycle: - timer_duration = self.cycle - resend_changes = threading.Timer(timer_duration, self.send_changes_async) - resend_changes.start() - except Exception as error: - logging.error('Critical error on send_changes_async %s', error) - resend_changes = threading.Timer(self.cycle, self.send_changes_async) - resend_changes.start() - - def add_changes(self,key,value): - if key.find('::') != -1: - tmp_changes = {} - changes = value - for k in reversed(key.split('::')): - if k not in tmp_changes: - tmp_changes[k] = {} - tmp_changes[k] = changes - changes = tmp_changes - tmp_changes = {} - if self.cycle <= 0: - self.send_change_immediate(changes) - else: - self.merge_dict(self.changes,changes) - else: - if self.cycle <= 0: - self.send_change_immediate({key:value}) - else: - self.changes[key] = value - - def send_change_immediate(self,change): - threading.Thread( target=self.thread_change,args=(change,)).start() - - def thread_change(self,change): - logging.info('Send to jeedom : %s', change) - i=0 - while i < self.retry: - try: - r = requests.post(self.url + '?apikey=' + self.apikey, json=change, timeout=(0.5, 120), verify=False) - if r.status_code == requests.codes.ok: - break - except Exception as error: - logging.error('Error on send request to jeedom %s retry : %i/%i', error, i, self.retry) - i = i + 1 - - def set_change(self,changes): - self.changes = changes - - def get_change(self): - return self.changes - - def merge_dict(self,d1, d2): - for k,v2 in d2.items(): - v1 = d1.get(k) # returns None if v1 has no value for this key - if isinstance(v1, Mapping) and isinstance(v2, Mapping): - self.merge_dict(v1, v2) - else: - d1[k] = v2 - - def test(self): - try: - response = requests.get(self.url + '?apikey=' + self.apikey, verify=False) - if response.status_code != requests.codes.ok: - logging.error('Callback error: %s %s. Please check your network configuration page', response.status_code, response.reason) - return False - except Exception as e: - logging.error('Callback result as a unknown error: %s. Please check your network configuration page', e.message) - return False - return True + def __init__(self,apikey = '',url = '',cycle = 0.5,retry = 3): + self._apikey = apikey + self._url = url + self._cycle = cycle + self._retry = retry + self._changes = {} + if self._cycle > 0: + Thread(target=self.__thread_changes_async, daemon=True).start() + logging.info('Init request module v%s', requests.__version__) + + def __thread_changes_async(self): + if self._cycle <= 0: + return + logging.info('Start changes async thread') + while True: + try: + time.sleep(self._cycle) + if len(self._changes) == 0: + continue + changes = self._changes + self._changes = {} + self.__post_change(changes) + except Exception as error: + logging.error('Critical error on send_changes_async %s', error) + + def add_changes(self, key: str, value): + if key.find('::') != -1: + tmp_changes = {} + changes = value + for k in reversed(key.split('::')): + if k not in tmp_changes: + tmp_changes[k] = {} + tmp_changes[k] = changes + changes = tmp_changes + tmp_changes = {} + if self._cycle <= 0: + self.send_change_immediate(changes) + else: + self.merge_dict(self._changes, changes) + else: + if self._cycle <= 0: + self.send_change_immediate({key: value}) + else: + self._changes[key] = value + + def send_change_immediate(self, change): + Thread(target=self.__post_change, args=(change,)).start() + + def __post_change(self, change): + logging.debug('Send to jeedom: %s', change) + for i in range(self._retry): + try: + r = requests.post(self._url + '?apikey=' + self._apikey, json=change, timeout=(0.5, 120), verify=False) + if r.status_code == requests.codes.ok: + return True + else: + logging.warning('Error on send request to jeedom, return code %s', r.status_code) + time.sleep(0.5) + except Exception as error: + logging.error('Error on send request to jeedom "%s" retry: %i/%i', error, i, self._retry) + return False + + def set_change(self,changes): + self._changes = changes + + def get_change(self): + return self._changes + + def merge_dict(self,d1, d2): + for k,v2 in d2.items(): + v1 = d1.get(k) # returns None if v1 has no value for this key + if isinstance(v1, Mapping) and isinstance(v2, Mapping): + self.merge_dict(v1, v2) + else: + d1[k] = v2 + + def test(self): + try: + response = requests.get(self._url + '?apikey=' + self._apikey, verify=False) + if response.status_code != requests.codes.ok: + logging.error('Callback error: %s %s. Please check your network configuration page', response.status_code, response.reason) + return False + except Exception as e: + logging.error('Callback result as a unknown error: %s. Please check your network configuration page', e) + return False + return True # ------------------------------------------------------------------------------ class jeedom_utils(): - @staticmethod - def convert_log_level(level = 'error'): - LEVELS = {'debug': logging.DEBUG, - 'info': logging.INFO, - 'notice': logging.WARNING, - 'warning': logging.WARNING, - 'error': logging.ERROR, - 'critical': logging.CRITICAL, - 'none': logging.CRITICAL} - return LEVELS.get(level, logging.CRITICAL) - - @staticmethod - def set_log_level(level = 'error'): - FORMAT = '[%(asctime)-15s][%(levelname)s] : %(message)s' - logging.basicConfig(level=jeedom_utils.convert_log_level(level),format=FORMAT, datefmt="%Y-%m-%d %H:%M:%S") - - @staticmethod - def find_tty_usb(idVendor, idProduct, product = None): - context = pyudev.Context() - for device in context.list_devices(subsystem='tty'): - if 'ID_VENDOR' not in device: - continue - if device['ID_VENDOR_ID'] != idVendor: - continue - if device['ID_MODEL_ID'] != idProduct: - continue - if product is not None: - if 'ID_VENDOR' not in device or device['ID_VENDOR'].lower().find(product.lower()) == -1 : - continue - return str(device.device_node) - return None - - @staticmethod - def stripped(str): - return "".join([i for i in str if i in range(32, 127)]) - - @staticmethod - def ByteToHex( byteStr ): - return byteStr.hex() - - @staticmethod - def dec2bin(x, width=8): - return ''.join(str((x>>i)&1) for i in xrange(width-1,-1,-1)) - - @staticmethod - def dec2hex(dec): - if dec is None: - return '0x00' - return "0x{:02X}".format(dec) - - @staticmethod - def testBit(int_type, offset): - mask = 1 << offset - return(int_type & mask) - - @staticmethod - def clearBit(int_type, offset): - mask = ~(1 << offset) - return(int_type & mask) - - @staticmethod - def split_len(seq, length): - return [seq[i:i+length] for i in range(0, len(seq), length)] - - @staticmethod - def write_pid(path): - pid = str(os.getpid()) - logging.info("Writing PID %s to %s", pid, path) - open(path, 'w').write("%s\n" % pid) - - @staticmethod - def remove_accents(input_str): - nkfd_form = unicodedata.normalize('NFKD', unicode(input_str)) - return u"".join([c for c in nkfd_form if not unicodedata.combining(c)]) - - @staticmethod - def printHex(hex): - return ' '.join([hex[i:i + 2] for i in range(0, len(hex), 2)]) + @staticmethod + def convert_log_level(level = 'error'): + LEVELS = {'debug': logging.DEBUG, + 'info': logging.INFO, + 'notice': logging.WARNING, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL, + 'none': logging.CRITICAL} + return LEVELS.get(level, logging.CRITICAL) + + @staticmethod + def set_log_level(level = 'error'): + FORMAT = '[%(asctime)-15s][%(levelname)s] : %(message)s' + logging.basicConfig(level=jeedom_utils.convert_log_level(level),format=FORMAT, datefmt="%Y-%m-%d %H:%M:%S") + + @staticmethod + def find_tty_usb(idVendor, idProduct, product = None): + context = pyudev.Context() + for device in context.list_devices(subsystem='tty'): + if 'ID_VENDOR' not in device: + continue + if device['ID_VENDOR_ID'] != idVendor: + continue + if device['ID_MODEL_ID'] != idProduct: + continue + if product is not None: + if 'ID_VENDOR' not in device or device['ID_VENDOR'].lower().find(product.lower()) == -1 : + continue + return str(device.device_node) + return None + + @staticmethod + def stripped(str): + return "".join([i for i in str if i in range(32, 127)]) + + @staticmethod + def ByteToHex( byteStr ): + return byteStr.hex() + + @staticmethod + def dec2bin(x, width=8): + return ''.join(str((x>>i)&1) for i in range(width-1,-1,-1)) + + @staticmethod + def dec2hex(dec): + if dec is None: + return '0x00' + return "0x{:02X}".format(dec) + + @staticmethod + def testBit(int_type, offset): + mask = 1 << offset + return(int_type & mask) + + @staticmethod + def clearBit(int_type, offset): + mask = ~(1 << offset) + return(int_type & mask) + + @staticmethod + def split_len(seq, length): + return [seq[i:i+length] for i in range(0, len(seq), length)] + + @staticmethod + def write_pid(path): + pid = str(os.getpid()) + logging.info("Writing PID %s to %s", pid, path) + open(path, 'w').write("%s\n" % pid) + + @staticmethod + def remove_accents(input_str: str): + nkfd_form = unicodedata.normalize('NFKD', input_str) + return u"".join([c for c in nkfd_form if not unicodedata.combining(c)]) + + @staticmethod + def printHex(hex): + return ' '.join([hex[i:i + 2] for i in range(0, len(hex), 2)]) # ------------------------------------------------------------------------------ class jeedom_serial(): - def __init__(self,device = '',rate = '',timeout = 9,rtscts = True,xonxoff=False): - self.device = device - self.rate = rate - self.timeout = timeout - self.port = None - self.rtscts = rtscts - self.xonxoff = xonxoff - logging.info('Init serial module v%s', serial.VERSION) - - def open(self): - if self.device: - logging.info("Open serial port on device: %s, rate %s, timeout: %i", self.device, self.rate, self.timeout) - else: - logging.error("Device name missing.") - return False - logging.info("Open Serialport") - try: - self.port = serial.Serial( - self.device, - self.rate, - timeout=self.timeout, - rtscts=self.rtscts, - xonxoff=self.xonxoff, - parity=serial.PARITY_NONE, - stopbits=serial.STOPBITS_ONE - ) - except serial.SerialException as e: - logging.error("Error: Failed to connect on device %s. Details : %s", self.device, e) - return False - if not self.port.isOpen(): - self.port.open() - self.flushOutput() - self.flushInput() - return True - - def close(self): - logging.info("Close serial port") - try: - self.port.close() - logging.info("Serial port closed") - return True - except: - logging.error("Failed to close the serial port (%s)", self.device) - return False - - def write(self,data): - logging.info("Write data to serial port: %s", str(jeedom_utils.ByteToHex(data))) - self.port.write(data) - - def flushOutput(self,): - logging.info("flushOutput serial port ") - self.port.flushOutput() - - def flushInput(self): - logging.info("flushInput serial port ") - self.port.flushInput() - - def read(self): - if self.port.inWaiting() != 0: - return self.port.read() - return None - - def readbytes(self,number): - buf = b'' - for i in range(number): - try: - byte = self.port.read() - except IOError as e: - logging.error("Error: %s", e) - except OSError as e: - logging.error("Error: %s", e) - buf += byte - return buf + def __init__(self,device = '',rate = '',timeout = 9,rtscts = True,xonxoff=False): + self.device = device + self.rate = rate + self.timeout = timeout + self.port = None + self.rtscts = rtscts + self.xonxoff = xonxoff + logging.info('Init serial module v%s', serial.VERSION) + + def open(self): + if self.device: + logging.info("Open serial port on device: %s, rate %s, timeout: %i", self.device, self.rate, self.timeout) + else: + logging.error("Device name missing.") + return False + logging.info("Open Serialport") + try: + self.port = serial.Serial( + self.device, + self.rate, + timeout=self.timeout, + rtscts=self.rtscts, + xonxoff=self.xonxoff, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE + ) + except serial.SerialException as e: + logging.error("Error: Failed to connect on device %s. Details : %s", self.device, e) + return False + if not self.port.isOpen(): + self.port.open() + self.flushOutput() + self.flushInput() + return True + + def close(self): + logging.info("Close serial port") + try: + self.port.close() + logging.info("Serial port closed") + return True + except: + logging.error("Failed to close the serial port (%s)", self.device) + return False + + def write(self,data): + logging.info("Write data to serial port: %s", str(jeedom_utils.ByteToHex(data))) + self.port.write(data) + + def flushOutput(self,): + logging.info("flushOutput serial port ") + self.port.flushOutput() + + def flushInput(self): + logging.info("flushInput serial port ") + self.port.flushInput() + + def read(self): + if self.port.inWaiting() != 0: + return self.port.read() + return None + + def readbytes(self,number): + buf = b'' + for i in range(number): + try: + byte = self.port.read() + except IOError as e: + logging.error("Error: %s", e) + except OSError as e: + logging.error("Error: %s", e) + buf += byte + return buf # ------------------------------------------------------------------------------ JEEDOM_SOCKET_MESSAGE = Queue() class jeedom_socket_handler(StreamRequestHandler): - def handle(self): - global JEEDOM_SOCKET_MESSAGE - logging.info("Client connected to [%s:%d]", self.client_address[0], self.client_address[1]) - lg = self.rfile.readline() - JEEDOM_SOCKET_MESSAGE.put(lg) - logging.info("Message read from socket: %s", str(lg.strip())) - self.netAdapterClientConnected = False - logging.info("Client disconnected from [%s:%d]", self.client_address[0], self.client_address[1]) + def handle(self): + global JEEDOM_SOCKET_MESSAGE + logging.info("Client connected to [%s:%d]", self.client_address[0], self.client_address[1]) + lg = self.rfile.readline() + JEEDOM_SOCKET_MESSAGE.put(lg) + logging.info("Message read from socket: %s", str(lg.strip())) + self.netAdapterClientConnected = False + logging.info("Client disconnected from [%s:%d]", self.client_address[0], self.client_address[1]) class jeedom_socket(): - def __init__(self,address='localhost', port=55000): - self.address = address - self.port = port - socketserver.TCPServer.allow_reuse_address = True - - def open(self): - self.netAdapter = TCPServer((self.address, self.port), jeedom_socket_handler) - if self.netAdapter: - logging.info("Socket interface started") - threading.Thread(target=self.loopNetServer, args=()).start() - else: - logging.info("Cannot start socket interface") - - def loopNetServer(self): - logging.info("LoopNetServer Thread started") - logging.info("Listening on: [%s:%d]", self.address, self.port) - self.netAdapter.serve_forever() - logging.info("LoopNetServer Thread stopped") - - def close(self): - self.netAdapter.shutdown() - - def getMessage(self): - return self.message + def __init__(self,address='localhost', port=55000): + self.address = address + self.port = port + socketserver.TCPServer.allow_reuse_address = True + + def open(self): + self.netAdapter = TCPServer((self.address, self.port), jeedom_socket_handler) + if self.netAdapter: + logging.info("Socket interface started") + Thread(target=self.loopNetServer).start() + else: + logging.info("Cannot start socket interface") + + def loopNetServer(self): + logging.info("LoopNetServer Thread started") + logging.info("Listening on: [%s:%d]", self.address, self.port) + self.netAdapter.serve_forever() + logging.info("LoopNetServer Thread stopped") + + def close(self): + self.netAdapter.shutdown() + + def getMessage(self): + return self.message # ------------------------------------------------------------------------------ # END