From 8e2f75a81a47e09d14d02901bccf625f704b9265 Mon Sep 17 00:00:00 2001 From: Tucker Kern Date: Fri, 30 Aug 2024 16:39:43 -0600 Subject: [PATCH] Add alternate energy/power response parsing (#172) * Parse energy responses as binary and BCD data * Add binary energy data testcases --- msmart/device/AC/command.py | 61 ++++++++++++++++++++++---------- msmart/device/AC/device.py | 15 ++++++-- msmart/device/AC/test_command.py | 23 ++++++++++++ msmart/device/AC/test_device.py | 33 +++++++++++++++++ 4 files changed, 110 insertions(+), 22 deletions(-) diff --git a/msmart/device/AC/command.py b/msmart/device/AC/command.py index 0a7396e..8040108 100644 --- a/msmart/device/AC/command.py +++ b/msmart/device/AC/command.py @@ -5,7 +5,7 @@ import struct from collections import namedtuple from enum import IntEnum -from typing import Any, Callable, Collection, Mapping, Optional, Union +from typing import Any, Callable, Collection, Mapping, Optional, Tuple, Union import msmart.crc8 as crc8 from msmart.const import DeviceType, FrameType @@ -1004,6 +1004,10 @@ def __init__(self, payload: memoryview) -> None: self.current_energy = None self.real_time_power = None + self.total_energy_binary = None + self.current_energy_binary = None + self.real_time_power_binary = None + _LOGGER.debug("Energy response payload: %s", payload.hex()) self._parse(payload) @@ -1015,35 +1019,54 @@ def _parse(self, payload: memoryview) -> None: def decode_bcd(d: int) -> int: return 10 * (d >> 4) + (d & 0xF) + def parse_energy(d: bytes) -> Tuple[float, float]: + bcd = (10000 * decode_bcd(d[0]) + + 100 * decode_bcd(d[1]) + + 1 * decode_bcd(d[2]) + + 0.01 * decode_bcd(d[3])) + binary = ((d[0] << 24) + + (d[1] << 16) + + (d[2] << 8) + + d[3]) / 10 + return bcd, binary + + def parse_power(d: bytes) -> Tuple[float, float]: + bcd = (1000 * decode_bcd(d[0]) + + 10 * decode_bcd(d[1]) + + 0.1 * decode_bcd(d[2])) + binary = ((d[0] << 16) + + (d[1] << 8) + + d[2]) / 10 + return bcd, binary + # Lua reference decodes real time power field in BCD and binary form # JS reference decodes multiple energy/power fields in BCD only. - # Total energy in bytes 4 - 8 - total_energy = (10000 * decode_bcd(payload[4]) + - 100 * decode_bcd(payload[5]) + - 1 * decode_bcd(payload[6]) + - 0.01 * decode_bcd(payload[7])) + # Total energy in bytes 4 - 7 + total_energy_bcd, total_energy_binary = parse_energy( + payload[4:8]) # JS references decodes bytes 8 - 11 as "total running energy" # Older JS does not decode these bytes, and sample payloads contain bogus data - # Current run energy consumption bytes 12 - 16 - current_energy = (10000 * decode_bcd(payload[12]) + - 100 * decode_bcd(payload[13]) + - 1 * decode_bcd(payload[14]) + - 0.01 * decode_bcd(payload[15])) + # Current run energy consumption bytes 12 - 15 + current_energy_bcd, current_energy_binary = parse_energy( + payload[12:16]) # Real time power usage bytes 16 - 18 - real_time_power = (1000 * decode_bcd(payload[16]) + - 10 * decode_bcd(payload[17]) + - 0.1 * decode_bcd(payload[18])) + real_time_power_bcd, real_time_power_binary = parse_power( + payload[16:19]) + + # Assume energy monitory is valid if at least one stat is non zero + valid = total_energy_bcd or current_energy_bcd or real_time_power_bcd - # Assume energy monitory is valid if at least one stats is non zero - valid = total_energy or current_energy or real_time_power + self.total_energy = total_energy_bcd if valid else None + self.current_energy = current_energy_bcd if valid else None + self.real_time_power = real_time_power_bcd if valid else None - self.total_energy = total_energy if valid else None - self.current_energy = current_energy if valid else None - self.real_time_power = real_time_power if valid else None + self.total_energy_binary = total_energy_binary if valid else None + self.current_energy_binary = current_energy_binary if valid else None + self.real_time_power_binary = real_time_power_binary if valid else None class HumidityResponse(Response): diff --git a/msmart/device/AC/device.py b/msmart/device/AC/device.py index 24e7a33..1155403 100644 --- a/msmart/device/AC/device.py +++ b/msmart/device/AC/device.py @@ -145,6 +145,7 @@ def __init__(self, ip: str, device_id: int, port: int, **kwargs) -> None: self._total_energy_usage = None self._current_energy_usage = None self._real_time_power_usage = None + self._use_binary_energy = False # Default to assuming device can't handle any properties self._supported_properties = set() @@ -242,9 +243,9 @@ def _update_state(self, res: Response) -> None: self._ieco = value elif isinstance(res, EnergyUsageResponse): - self._total_energy_usage = res.total_energy - self._current_energy_usage = res.current_energy - self._real_time_power_usage = res.real_time_power + self._total_energy_usage = res.total_energy_binary if self._use_binary_energy else res.total_energy + self._current_energy_usage = res.current_energy_binary if self._use_binary_energy else res.current_energy + self._real_time_power_usage = res.real_time_power_binary if self._use_binary_energy else res.real_time_power elif isinstance(res, HumidityResponse): self._indoor_humidity = res.humidity @@ -827,6 +828,14 @@ def supports_filter_reminder(self) -> bool: def filter_alert(self) -> Optional[bool]: return self._filter_alert + @property + def use_alternate_energy_format(self) -> bool: + return self._use_binary_energy + + @use_alternate_energy_format.setter + def use_alternate_energy_format(self, enable: bool) -> None: + self._use_binary_energy = enable + @property def enable_energy_usage_requests(self) -> bool: return self._request_energy_usage diff --git a/msmart/device/AC/test_command.py b/msmart/device/AC/test_command.py index 4786141..28eacb1 100644 --- a/msmart/device/AC/test_command.py +++ b/msmart/device/AC/test_command.py @@ -840,6 +840,29 @@ def test_energy_usage(self) -> None: self.assertEqual(resp.current_energy, current) self.assertEqual(resp.real_time_power, real_time) + def test_binary_energy_usage(self) -> None: + """Test we decode binary energy usage responses correctly.""" + TEST_RESPONSES = { + # https://github.com/mill1000/midea-ac-py/issues/204#issuecomment-2314705021 + (150.4, .6, 279.5): bytes.fromhex("aa22ac00000000000803c1210144000005e00000000000000006000aeb000000487a5e"), + + # https://github.com/mill1000/midea-msmart/pull/116#issuecomment-2218753545 + (None, None, None): bytes.fromhex("aa20ac00000000000303c1210144000000000000000000000000000000000843bc"), + } + + for power, response in TEST_RESPONSES.items(): + resp = self._test_build_response(response) + + # Assert response is a correct type + self.assertEqual(type(resp), EnergyUsageResponse) + resp = cast(EnergyUsageResponse, resp) + + total, current, real_time = power + + self.assertEqual(resp.total_energy_binary, total) + self.assertEqual(resp.current_energy_binary, current) + self.assertEqual(resp.real_time_power_binary, real_time) + def test_humidity(self) -> None: """Test we decode humidity responses correctly.""" TEST_RESPONSES = { diff --git a/msmart/device/AC/test_device.py b/msmart/device/AC/test_device.py index 7bf576e..dc82018 100644 --- a/msmart/device/AC/test_device.py +++ b/msmart/device/AC/test_device.py @@ -293,6 +293,39 @@ def test_energy_usage_response(self) -> None: self.assertEqual(device.current_energy_usage, current) self.assertEqual(device.real_time_power_usage, real_time) + def test_binary_energy_usage_response(self) -> None: + """Test parsing of EnergyUsageResponses into device state with binary format.""" + TEST_RESPONSES = { + # https://github.com/mill1000/midea-ac-py/issues/204#issuecomment-2314705021 + (150.4, .6, 279.5): bytes.fromhex("aa22ac00000000000803c1210144000005e00000000000000006000aeb000000487a5e"), + + # https://github.com/mill1000/midea-msmart/pull/116#issuecomment-2218753545 + (None, None, None): bytes.fromhex("aa20ac00000000000303c1210144000000000000000000000000000000000843bc"), + } + + for power, response in TEST_RESPONSES.items(): + resp = Response.construct(response) + self.assertIsNotNone(resp) + + # Assert response is a state response + self.assertEqual(type(resp), EnergyUsageResponse) + + # Create a dummy device and process the response + device = AC(0, 0, 0) + + # Switch to binary format + device.use_alternate_energy_format = True + + # Update state with response + device._update_state(resp) + + total, current, real_time = power + + # Assert state is expected + self.assertEqual(device.total_energy_usage, total) + self.assertEqual(device.current_energy_usage, current) + self.assertEqual(device.real_time_power_usage, real_time) + def test_humidity_response(self) -> None: """Test parsing of HumidityResponses into device state.""" TEST_RESPONSES = {