Skip to content

Commit

Permalink
Refactor property decode/encode (#170)
Browse files Browse the repository at this point in the history
* Push property encode & decode into command.py
* Add testcases to verify encode/decode of properties
* Only decode/encode supported (e.g. tested) properties.
  • Loading branch information
mill1000 authored Aug 29, 2024
1 parent 4b171a0 commit 50ec117
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 58 deletions.
90 changes: 54 additions & 36 deletions msmart/device/AC/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,51 @@ class PropertyId(IntEnum):
IECO = 0x00E3
ANION = 0x021E

@property
def _supported(self) -> bool:
"""Check if a property ID is supported/tested."""
return self in [
PropertyId.BREEZE_AWAY,
PropertyId.BREEZE_CONTROL,
PropertyId.BREEZELESS,
PropertyId.BUZZER,
PropertyId.IECO,
PropertyId.RATE_SELECT,
PropertyId.SELF_CLEAN,
PropertyId.SWING_LR_ANGLE,
PropertyId.SWING_UD_ANGLE,
]

def decode(self, data: bytes) -> Any:
"""Decode raw property data into a convenient form."""
if not self._supported:
raise NotImplementedError(f"{repr(self)} decode is not supported.")

if self in [PropertyId.BREEZELESS, PropertyId.SELF_CLEAN]:
return bool(data[0])
elif self == PropertyId.BREEZE_AWAY:
return data[0] == 2
elif self == PropertyId.BUZZER:
return None # Don't decode buzzer
elif self == PropertyId.IECO:
# data[0] - ieco_number, data[1] - ieco_switch
return bool(data[1])
else:
return data[0]

def encode(self, *args, **kwargs) -> bytes:
"""Encode property into raw form."""
if not self._supported:
raise NotImplementedError(f"{repr(self)} encode is not supported.")

if self == PropertyId.BREEZE_AWAY:
return bytes([2 if args[0] else 1])
elif self == PropertyId.IECO:
# ieco_frame, ieco_number, ieco_switch, ...
return bytes([0, 1, args[0]]) + bytes(10)
else:
return bytes(args[0:1])


class TemperatureType(IntEnum):
UNKNOWN = 0
Expand Down Expand Up @@ -344,7 +389,7 @@ def tobytes(self) -> bytes: # pyright: ignore[reportIncompatibleMethodOverride]
class SetPropertiesCommand(Command):
"""Command to set specific properties of the device."""

def __init__(self, props: Mapping[PropertyId, Union[bytes, int]]) -> None:
def __init__(self, props: Mapping[PropertyId, Union[int, bool]]) -> None:
super().__init__(frame_type=FrameType.CONTROL)

self._properties = props
Expand All @@ -358,8 +403,8 @@ def tobytes(self) -> bytes: # pyright: ignore[reportIncompatibleMethodOverride]
for prop, value in self._properties.items():
payload += struct.pack("<H", prop)

if isinstance(value, int):
value = bytes([value])
# Encode property value to bytes
value = prop.encode(value)

payload += bytes([len(value)])
payload += value
Expand Down Expand Up @@ -900,26 +945,6 @@ def _parse(self, payload: memoryview) -> None:
# Clear existing properties
self._properties.clear()

# Define parsing functions for supported properties
# TODO when a properties has multiple field .e.g fresh air
# should they be stored all under the fresh_air key or create different
# keys for each. e.g. capabilities
parsers = {
PropertyId.ANION: lambda v: v[0],
PropertyId.BREEZE_AWAY: lambda v: v[0],
PropertyId.BREEZE_CONTROL: lambda v: v[0],
PropertyId.BREEZELESS: lambda v: v[0],
PropertyId.BUZZER: lambda v: None, # Don't bother parsing buzzer state
PropertyId.FRESH_AIR: lambda v: (v[0], v[1], v[2]),
# v[0] - ieco_number, v[1] - ieco_switch
PropertyId.IECO: lambda v: v[1],
PropertyId.INDOOR_HUMIDITY: lambda v: v[0],
PropertyId.RATE_SELECT: lambda v: v[0],
PropertyId.SELF_CLEAN: lambda v: v[0],
PropertyId.SWING_UD_ANGLE: lambda v: v[0],
PropertyId.SWING_LR_ANGLE: lambda v: v[0],
}

count = payload[1]
props = payload[2:]

Expand Down Expand Up @@ -948,26 +973,19 @@ def _parse(self, payload: memoryview) -> None:
props = props[4+size:]
continue

# Fetch parser for this property
parser = parsers.get(property, None)

# Check if parser exists
if parser is None:
_LOGGER.warning(
"Unsupported property %r, Size: %d.", property, size)
# Advanced to next property
props = props[4+size:]
continue

# Check execution result and log any errors
error = props[2] & 0x10
if error:
_LOGGER.error(
"Property %r failed, Result: 0x%02X.", property, props[2])

# Parse the property
if (value := parser(props[4:])) is not None:
self._properties.update({property: value})
try:
if (value := property.decode(props[4:])) is not None:
self._properties.update({property: value})
except NotImplementedError:
_LOGGER.warning(
"Unsupported property %r, Size: %d.", property, size)

# Advanced to next property
props = props[4+size:]
Expand Down
16 changes: 7 additions & 9 deletions msmart/device/AC/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,11 @@ class BreezeMode(MideaIntEnum):
DEFAULT = OFF

# Create a dict to map attributes to property values
# TODO encoding of properties should be pushed into commands
_PROPERTY_MAP = {
PropertyId.BREEZE_AWAY: lambda s: 2 if s._breeze_mode == AirConditioner.BreezeMode.BREEZE_AWAY else 1,
PropertyId.BREEZE_AWAY: lambda s: s._breeze_mode == AirConditioner.BreezeMode.BREEZE_AWAY,
PropertyId.BREEZE_CONTROL: lambda s: s._breeze_mode,
PropertyId.BREEZELESS: lambda s: s._breeze_mode == AirConditioner.BreezeMode.BREEZELESS,
# ieco_frame, ieco_number, ieco_switch, ...
PropertyId.IECO: lambda s: bytes([0, 1, s._ieco_mode]) + bytes(10),
PropertyId.IECO: lambda s: s._ieco_mode,
PropertyId.RATE_SELECT: lambda s: s._rate_select,
PropertyId.SWING_LR_ANGLE: lambda s: s._horizontal_swing_angle,
PropertyId.SWING_UD_ANGLE: lambda s: s._vertical_swing_angle
Expand Down Expand Up @@ -220,7 +218,7 @@ def _update_state(self, res: Response) -> None:
AirConditioner.SwingAngle.get_from_value(angle))

if (value := res.get_property(PropertyId.SELF_CLEAN)) is not None:
self._self_clean_active = bool(value)
self._self_clean_active = value

if (rate := res.get_property(PropertyId.RATE_SELECT)) is not None:
self._rate_select = cast(
Expand All @@ -233,15 +231,15 @@ def _update_state(self, res: Response) -> None:
else AirConditioner.BreezeMode.OFF)
else:
if (value := res.get_property(PropertyId.BREEZE_AWAY)) is not None:
self._breeze_mode = (AirConditioner.BreezeMode.BREEZE_AWAY if (value == 2)
self._breeze_mode = (AirConditioner.BreezeMode.BREEZE_AWAY if value
else AirConditioner.BreezeMode.OFF)

if (value := res.get_property(PropertyId.BREEZELESS)) is not None:
self._breeze_mode = (AirConditioner.BreezeMode.BREEZELESS if bool(value)
self._breeze_mode = (AirConditioner.BreezeMode.BREEZELESS if value
else AirConditioner.BreezeMode.OFF)

if (value := res.get_property(PropertyId.IECO)) is not None:
self._ieco_mode = bool(value)
self._ieco_mode = value

elif isinstance(res, EnergyUsageResponse):
self._total_energy_usage = res.total_energy
Expand Down Expand Up @@ -482,7 +480,7 @@ async def refresh(self) -> None:
for response in await self._send_command_get_responses(cmd):
self._update_state(response)

async def _apply_properties(self, properties: dict[PropertyId, Union[bytes, int]]) -> None:
async def _apply_properties(self, properties: dict[PropertyId, Union[int, bool]]) -> None:
"""Apply the provided properties to the device."""

# Warn if attempting to update a property that isn't supported
Expand Down
82 changes: 71 additions & 11 deletions msmart/device/AC/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,38 @@ def test_payload(self) -> None:

class TestSetPropertiesCommand(unittest.TestCase):

def test_encode(self) -> None:
"""Test encoding of property values to bytes objects."""
TEST_ENCODES = {
# Breeze away: 0x02 - On, 0x01 - Off
(PropertyId.BREEZE_AWAY, True): bytes([0x02]),
(PropertyId.BREEZE_AWAY, False): bytes([0x01]),

# Breezeless: Boolean
(PropertyId.BREEZELESS, True): bytes([0x01]),
(PropertyId.BREEZELESS, False): bytes([0x00]),

# Breeze control: Passthru
(PropertyId.BREEZE_CONTROL, 0x04): bytes([0x04]),
(PropertyId.BREEZE_CONTROL, 0x00): bytes([0x00]),

# IECO: 13 bytes ieco_frame, ieco_number, ieco_switch, ...
(PropertyId.IECO, True): bytes([0, 1, 1]) + bytes(10),
(PropertyId.IECO, False): bytes([0, 1, 0]) + bytes(10),
}

for (prop, value), expected_data in TEST_ENCODES.items():
self.assertEqual(prop.encode(value), expected_data, msg=f"""Encode {
repr(prop)}, Value: {value}, Expected: {expected_data}""")

# Validate "unsupported" properties raise exceptions
with self.assertRaisesRegex(NotImplementedError, ".* encode is not supported."):
PropertyId.ANION.encode(True)

def test_payload(self) -> None:
"""Test that we encode set properties payloads correctly."""
# TODO this test is not based on a real world sample
PROPS = {PropertyId.SWING_UD_ANGLE: bytes(
[25]), PropertyId.SWING_LR_ANGLE: bytes([75])}
PROPS = {PropertyId.SWING_UD_ANGLE: 25, PropertyId.SWING_LR_ANGLE: 75}

# Build command
command = SetPropertiesCommand(PROPS)
Expand All @@ -610,38 +637,71 @@ def test_payload(self) -> None:
self.assertEqual(payload[2], PropertyId.SWING_UD_ANGLE & 0xFF)
self.assertEqual(payload[3], PropertyId.SWING_UD_ANGLE >> 8 & 0xFF)

# Assert length is correct
self.assertEqual(payload[4], len(PROPS[PropertyId.SWING_UD_ANGLE]))

# Assert data is correct
self.assertEqual(payload[5], PROPS[PropertyId.SWING_UD_ANGLE][0])
# Assert length is correct and data is correct
self.assertEqual(payload[4], 1)
self.assertEqual(payload[5], PROPS[PropertyId.SWING_UD_ANGLE])


class TestPropertiesResponse(_TestResponseBase):
"""Test properties response messages."""

def test_decode(self) -> None:
"""Test decoding of bytes objects to property values."""
TEST_DECODES = {
# Breeze away 0x02 - On, 0x01 - Off
(PropertyId.BREEZE_AWAY, bytes([0x02])): True,
(PropertyId.BREEZE_AWAY, bytes([0x01])): False,

# Breezeless: Boolean
(PropertyId.BREEZELESS, bytes([0x01])): True,
(PropertyId.BREEZELESS, bytes([0x00])): False,
(PropertyId.BREEZELESS, bytes([0x02])): True,

# Breeze control: Passthru
(PropertyId.BREEZE_CONTROL, bytes([0x04])): 0x04,
(PropertyId.BREEZE_CONTROL, bytes([0x00])): 0x00,

# Buzzer: Don't decode
(PropertyId.BUZZER, bytes([0x00])): None,

# IECO: 2 bytes
(PropertyId.IECO, bytes([0x00, 0x00])): False,
(PropertyId.IECO, bytes([0x00, 0x01])): True,
}

for (prop, data), expected_value in TEST_DECODES.items():
self.assertEqual(prop.decode(data), expected_value, msg=f"""Decode {
repr(prop)}, Data: {data}, Expected: {expected_value}""")

# Validate "unsupported" properties raise exceptions
with self.assertRaisesRegex(NotImplementedError, ".* decode is not supported."):
PropertyId.INDOOR_HUMIDITY.decode(bytes([1]))

def test_properties_parsing(self) -> None:
"""Test we decode properties correctly."""
"""Test we decode properties responses correctly."""
# https://github.com/mill1000/midea-ac-py/issues/60#issuecomment-1936976587
TEST_RESPONSE = bytes.fromhex(
"aa21ac00000000000303b10409000001000a00000100150000012b1e020000005fa3")

resp = self._test_build_response(TEST_RESPONSE)
# Response contains an unsupported property so check the log for warnings
with self.assertLogs("msmart", logging.WARNING) as log:
resp = self._test_build_response(TEST_RESPONSE)

self.assertRegex("\n".join(log.output),
"Unsupported property .*INDOOR_HUMIDITY.*")

# Assert response is a correct type
self.assertEqual(type(resp), PropertiesResponse)
resp = cast(PropertiesResponse, resp)

EXPECTED_RAW_PROPERTIES = {
PropertyId.INDOOR_HUMIDITY: 43,
PropertyId.SWING_LR_ANGLE: 0,
PropertyId.SWING_UD_ANGLE: 0,
}
# Ensure raw decoded properties match
self.assertEqual(resp._properties, EXPECTED_RAW_PROPERTIES)

# Check state
self.assertEqual(resp.get_property(PropertyId.INDOOR_HUMIDITY), 43)
self.assertEqual(resp.get_property(PropertyId.SWING_LR_ANGLE), 0)
self.assertEqual(resp.get_property(PropertyId.SWING_UD_ANGLE), 0)

Expand Down
9 changes: 7 additions & 2 deletions msmart/device/AC/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,15 @@ def test_properties_response(self) -> None:
device.horizontal_swing_angle = AC.SwingAngle.POS_5
device.vertical_swing_angle = AC.SwingAngle.POS_5

resp = Response.construct(TEST_RESPONSE)
self.assertIsNotNone(resp)
# Response contains an unsupported property so check the log for warnings
with self.assertLogs("msmart", logging.WARNING) as log:
resp = Response.construct(TEST_RESPONSE)

self.assertRegex("\n".join(log.output),
"Unsupported property .*INDOOR_HUMIDITY.*")

# Assert response is a state response
self.assertIsNotNone(resp)
self.assertEqual(type(resp), PropertiesResponse)

# Process the response
Expand Down

0 comments on commit 50ec117

Please sign in to comment.