Skip to content

Commit

Permalink
asyncio acaia protocol parser
Browse files Browse the repository at this point in the history
  • Loading branch information
MAKOMO committed Oct 11, 2024
1 parent 366c464 commit c0e9857
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 76 deletions.
120 changes: 48 additions & 72 deletions src/artisanlib/acaia.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
# AUTHOR
# Marko Luther, 2024


import asyncio
import logging
from enum import IntEnum
from typing import Optional, Union, List, Tuple, Final, Any
from typing import Optional, Union, List, Tuple, Final, TYPE_CHECKING

if TYPE_CHECKING:
from bleak.backends.characteristic import BleakGATTCharacteristic # pylint: disable=unused-import


try:
from PyQt6.QtCore import QObject, pyqtSignal # @UnusedImport @Reimport @UnresolvedImport
Expand All @@ -27,6 +31,7 @@


from artisanlib.ble_port import ClientBLE
from artisanlib.async_comm import AsyncIterable, IteratorReader


_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -111,26 +116,22 @@ class AcaiaBLE(ClientBLE):
ACAIA_PYXIS_NAME:Final[str] = 'PYXIS' # Acaia Pyxis

# Acaia message constants
HEADER1:Final[int] = 0xef
HEADER2:Final[int] = 0xdd
HEADER1:Final[bytes] = b'\xef'
HEADER2:Final[bytes] = b'\xdd'

HEARTBEAT_FREQUENCY = 3 # every 3 sec send the heartbeat


# NOTE: __slots__ are incompatible with multiple inheritance mixings in subclasses (as done below in class Acaia with QObject)
# __slots__ = [ 'protocolParseStep', 'protocolParseBuf', 'protocolParseCMD', 'protocolParseDataIndex', 'protocolParseDataLen', 'protocolParseCRC',
# __slots__ = [ '_read_queue', '_input_stream',
# 'id_sent', 'fast_notifications_sent', 'slow_notifications_sent', 'weight', 'battery', 'firmware', 'unit', 'max_weight' ]

def __init__(self) -> None:
super().__init__()

# Protocol parser variables
self.protocolParseStep:PRS = PRS.CHECKHEADER1
self.protocolParseBuf:List[int] = []
self.protocolParseCMD:int = 0
self.protocolParseDataIndex:int = 0
self.protocolParseDataLen:int = 0
self.protocolParseCRC:List[int] = []
self._read_queue : asyncio.Queue[bytes] = asyncio.Queue(maxsize=200)
self._input_stream = IteratorReader(AsyncIterable(self._read_queue))

self.id_sent:bool = False # ID is sent once after first data is received from scale
self.fast_notifications_sent:bool = False # after connect we switch fast notification on to receive first reading fast
Expand Down Expand Up @@ -163,14 +164,6 @@ def __init__(self) -> None:

# protocol parser

def reset_protocol_parser(self) -> None:
self.protocolParseStep = PRS.CHECKHEADER1
self.protocolParseBuf = []
self.protocolParseCRC = []
self.protocolParseCMD = 0
self.protocolParseDataLen = 0
self.protocolParseDataIndex = 0


def reset_readings(self) -> None:
self.weight = None
Expand All @@ -181,7 +174,6 @@ def reset_readings(self) -> None:


def on_connect(self) -> None:
self.reset_protocol_parser()
self.reset_readings()
self.id_sent = False
self.fast_notifications_sent = False
Expand All @@ -200,51 +192,6 @@ def on_disconnect(self) -> None:
self._disconnected_handler()


def protocol_parser(self, dataIn:bytes) -> None:
for c_in in dataIn:
if self.protocolParseStep == PRS.CHECKHEADER1:
if c_in == self.HEADER1:
self.protocolParseStep = PRS.CHECKHEADER2
elif self.protocolParseStep == PRS.CHECKHEADER2:
if c_in == self.HEADER2:
self.protocolParseStep = PRS.CMDID
else:
self.reset_protocol_parser()
elif self.protocolParseStep == PRS.CMDID:
self.protocolParseCMD = c_in
# In these commands the data len is determined by the next byte, so assign 255
if self.protocolParseCMD == CMD.SYSTEM_SA:
self.protocolParseDataLen = 255
elif self.protocolParseCMD == CMD.INFO_A:
self.protocolParseDataLen = 255
elif self.protocolParseCMD == CMD.STATUS_A:
self.protocolParseDataLen = 255
elif self.protocolParseCMD == CMD.EVENT_SA:
self.protocolParseDataLen = 255
self.protocolParseStep = PRS.CMDDATA
elif self.protocolParseStep == PRS.CMDDATA:
if self.protocolParseDataIndex == 0 and self.protocolParseDataLen == 255:
self.protocolParseDataLen = c_in
self.protocolParseBuf.append(c_in)
self.protocolParseDataIndex+=1
if self.protocolParseDataIndex == self.protocolParseDataLen:
self.protocolParseStep = PRS.CHECKSUM1
if self.protocolParseDataIndex > 20:
self.reset_protocol_parser()
elif self.protocolParseStep == PRS.CHECKSUM1:
self.protocolParseCRC.append(c_in)
self.protocolParseStep = PRS.CHECKSUM2
elif self.protocolParseStep == PRS.CHECKSUM2:
self.protocolParseCRC.append(c_in)
cal_crc=self.crc(self.protocolParseBuf)
if cal_crc[0] == self.protocolParseCRC[0] and cal_crc[1] == self.protocolParseCRC[1]:
msgType = self.protocolParseCMD
data = self.protocolParseBuf[:] # copy buffer
self.reset_protocol_parser() # reset buffer already before parseScaleData() as it might hang
# when protocol parsing success, call original data parser
self.parse_data(msgType, bytes(data))
self.reset_protocol_parser()

##


Expand Down Expand Up @@ -427,7 +374,8 @@ def crc(payload:Union[bytes,List[int]]) -> bytes:

# constructs message bytearray of the given type (int) and payload (bytearray) by adding headers and CRCs
def message(self, tp:int, payload:bytes) -> bytes:
return bytes([self.HEADER1,self.HEADER2,tp]) + payload + self.crc(payload)
# return bytes([self.HEADER1,self.HEADER2,tp]) + payload + self.crc(payload)
return self.HEADER1 + self.HEADER2 + tp.to_bytes(1, 'big') + payload + self.crc(payload)

def send_message(self, tp:int, payload:bytes) -> None:
self.send(self.message(tp, payload))
Expand Down Expand Up @@ -505,19 +453,47 @@ def fast_notifications(self) -> None:
###


def notify_callback(self, _characteristic:Any, data:bytearray) -> None:
_log.debug('notify: %s', data)
if self._logging:
_log.info('received: %s',data)
self.protocol_parser(bytes(data))
def notify_callback(self, _sender:'BleakGATTCharacteristic', data:bytearray) -> None:
if self._async_loop_thread is not None:
asyncio.run_coroutine_threadsafe(
self._read_queue.put(bytes(data)),
self._async_loop_thread.loop)


# EX: d = b'\xef\xdd\x07\x07\x02\x14\x02<\x14\x00W\x18\xef\xdd\x07' (len: 12)
# 2 header: d[0:2] = b'\xef\xdd'
# 1 cmd: d[2] = b'\x07' => INFO
# 1 data_len: d[3] = b'\x07'
# 6 data: d[4:10] = b'\x02\x14\x02<\x14\x00'
# 2 crc: d[10:12] = b'\x00W\x18' # calculated over "data_len+data"

async def reader(self) -> None:
while True:
await self._input_stream.readuntil(self.HEADER1)
if await self._input_stream.readexactly(1) == self.HEADER2:
cmd = int.from_bytes(await self._input_stream.readexactly(1), 'big')
if cmd in {CMD.SYSTEM_SA, CMD.INFO_A, CMD.STATUS_A, CMD.EVENT_SA}:
dl = await self._input_stream.readexactly(1)
data_len:int = min(20, int.from_bytes(dl, 'big'))
data = await self._input_stream.readexactly(data_len - 1)
crc = await self._input_stream.readexactly(2)
data = dl+data
if crc == self.crc(data):
self.parse_data(cmd, data)

## to be overwritten by subclass:

def on_start(self) -> None:
if self._async_loop_thread is not None:
# start the reader
asyncio.run_coroutine_threadsafe(
self.reader(),
self._async_loop_thread.loop)


def weight_changed(self, new_value:int) -> None: # pylint: disable=no-self-use
del new_value


def battery_changed(self, new_value:int) -> None: # pylint: disable=no-self-use
del new_value

Expand Down
76 changes: 73 additions & 3 deletions src/artisanlib/async_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from contextlib import suppress
from threading import Thread
from pymodbus.transport.serialtransport import create_serial_connection # patched pyserial-asyncio
from typing import Final, Optional, Union, Tuple, Callable, TYPE_CHECKING
from typing import Final, Optional, Union, Tuple, Callable, AsyncIterator, TYPE_CHECKING

if TYPE_CHECKING:
from artisanlib.types import SerialSettings # pylint: disable=unused-import
Expand Down Expand Up @@ -66,6 +66,76 @@ def loop(self) -> asyncio.AbstractEventLoop:
return self.__loop


class AsyncIterable:

_queue: asyncio.Queue[bytes]

def __init__(self, queue:asyncio.Queue[bytes]) -> None:
self._queue = queue

def __aiter__(self) -> 'AsyncIterable':
return self

async def __anext__(self) -> bytes:
return await self._queue.get()


class IteratorReader:

_chunks: AsyncIterator[bytes]
_backlog: bytes

def __init__(self, chunks: AsyncIterator[bytes]):
self._chunks = chunks
self._backlog = b''

async def _read_until_end(self) -> bytes:

content = self._backlog
self._backlog = b''

while True:
try:
content += await self._chunks.__anext__()
except StopAsyncIteration:
break

return content

async def _read_chunk(self, size: int) -> bytes:

content = self._backlog
bytes_read = len(self._backlog)

while bytes_read < size:

try:
chunk = await self._chunks.__anext__()
except StopAsyncIteration:
break

content += chunk
bytes_read += len(chunk)

self._backlog = content[size:]
return content[:size]

async def readexactly(self, size: int = -1) -> bytes:
if size > 0:
return await self._read_chunk(size)
if size == -1:
return await self._read_until_end()
return b''

async def readuntil(self, separator:bytes = b'\n') -> bytes:
if len(separator) != 0:
while True:
next_char = await self.readexactly(len(separator))
if next_char == separator:
break
return separator


class AsyncComm:

__slots__ = [ '_asyncLoopThread', '_write_queue', '_host', '_port', '_serial', '_connected_handler', '_disconnected_handler',
Expand Down Expand Up @@ -182,7 +252,7 @@ async def handle_writes(self, writer: asyncio.StreamWriter, queue: 'asyncio.Queu
await writer.drain()

# if serial settings are given, host/port are ignore and communication is handled by the given serial port
async def connect(self, connect_timeout:float=2) -> None:
async def connect(self, connect_timeout:float=5) -> None:
writer = None
while True:
try:
Expand Down Expand Up @@ -246,7 +316,7 @@ def send(self, message:bytes) -> None:

# start/stop sample thread

def start(self, connect_timeout:float=2) -> None:
def start(self, connect_timeout:float=5) -> None:
try:
_log.debug('start sampling')
if self._asyncLoopThread is None:
Expand Down
2 changes: 1 addition & 1 deletion wiki/ReleaseHistory.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ v3.0.3
- adds optional delay after connect before sending requests to serial MODBUS to allow to wait for Arduino slaves to complete reboot ([Issue #1694](../../../issues/1694))
- upgrades MODBUS communication from sync to async IO
- limits the visible length of long popup lists in Devices and Statistics Dialog
- keep Hottop connected after OFF in control mode to avoid the shutdown on disconnect ([Issue #1714](../../../issues/1714))
- keep Hottop connected after OFF in control mode to prevent the shutdown on disconnect ([Issue #1714](../../../issues/1714))


* FIXES
Expand Down

0 comments on commit c0e9857

Please sign in to comment.