Skip to content

Commit

Permalink
Push property encode & decode into command.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mill1000 committed Aug 28, 2024
1 parent 4b171a0 commit fef494e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 51 deletions.
60 changes: 25 additions & 35 deletions msmart/device/AC/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ class PropertyId(IntEnum):
IECO = 0x00E3
ANION = 0x021E

def decode(self, data: bytes) -> Any:
if self in [PropertyId.BREEZELESS, PropertyId.SELF_CLEAN]:
return bool(data[0])
elif self == PropertyId.BREEZE_AWAY:
return data[0] == 2
elif self == PropertyId.BUZZER:
return None # Don't decode buzzer
elif self == PropertyId.IECO:
return data[1] # data[0] - ieco_number, data[1] - ieco_switch
else:
return data[0]

def encode(self, *args, **kwargs) -> bytes:
if self == PropertyId.BREEZE_AWAY:
return bytes([2 if args[0] else 1])
elif self == PropertyId.IECO:
# ieco_frame, ieco_number, ieco_switch, ...
return bytes([0, 1, args[0]]) + bytes(10)
else:
return bytes(args[0:1])


class TemperatureType(IntEnum):
UNKNOWN = 0
Expand Down Expand Up @@ -344,7 +365,7 @@ def tobytes(self) -> bytes: # pyright: ignore[reportIncompatibleMethodOverride]
class SetPropertiesCommand(Command):
"""Command to set specific properties of the device."""

def __init__(self, props: Mapping[PropertyId, Union[bytes, int]]) -> None:
def __init__(self, props: Mapping[PropertyId, Union[int, bool]]) -> None:
super().__init__(frame_type=FrameType.CONTROL)

self._properties = props
Expand All @@ -358,8 +379,8 @@ def tobytes(self) -> bytes: # pyright: ignore[reportIncompatibleMethodOverride]
for prop, value in self._properties.items():
payload += struct.pack("<H", prop)

if isinstance(value, int):
value = bytes([value])
# Encode property value to bytes
value = prop.encode(value)

payload += bytes([len(value)])
payload += value
Expand Down Expand Up @@ -900,26 +921,6 @@ def _parse(self, payload: memoryview) -> None:
# Clear existing properties
self._properties.clear()

# Define parsing functions for supported properties
# TODO when a properties has multiple field .e.g fresh air
# should they be stored all under the fresh_air key or create different
# keys for each. e.g. capabilities
parsers = {
PropertyId.ANION: lambda v: v[0],
PropertyId.BREEZE_AWAY: lambda v: v[0],
PropertyId.BREEZE_CONTROL: lambda v: v[0],
PropertyId.BREEZELESS: lambda v: v[0],
PropertyId.BUZZER: lambda v: None, # Don't bother parsing buzzer state
PropertyId.FRESH_AIR: lambda v: (v[0], v[1], v[2]),
# v[0] - ieco_number, v[1] - ieco_switch
PropertyId.IECO: lambda v: v[1],
PropertyId.INDOOR_HUMIDITY: lambda v: v[0],
PropertyId.RATE_SELECT: lambda v: v[0],
PropertyId.SELF_CLEAN: lambda v: v[0],
PropertyId.SWING_UD_ANGLE: lambda v: v[0],
PropertyId.SWING_LR_ANGLE: lambda v: v[0],
}

count = payload[1]
props = payload[2:]

Expand Down Expand Up @@ -948,25 +949,14 @@ def _parse(self, payload: memoryview) -> None:
props = props[4+size:]
continue

# Fetch parser for this property
parser = parsers.get(property, None)

# Check if parser exists
if parser is None:
_LOGGER.warning(
"Unsupported property %r, Size: %d.", property, size)
# Advanced to next property
props = props[4+size:]
continue

# Check execution result and log any errors
error = props[2] & 0x10
if error:
_LOGGER.error(
"Property %r failed, Result: 0x%02X.", property, props[2])

# Parse the property
if (value := parser(props[4:])) is not None:
if (value := property.decode(props[4:])) is not None:
self._properties.update({property: value})

# Advanced to next property
Expand Down
16 changes: 7 additions & 9 deletions msmart/device/AC/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,11 @@ class BreezeMode(MideaIntEnum):
DEFAULT = OFF

# Create a dict to map attributes to property values
# TODO encoding of properties should be pushed into commands
_PROPERTY_MAP = {
PropertyId.BREEZE_AWAY: lambda s: 2 if s._breeze_mode == AirConditioner.BreezeMode.BREEZE_AWAY else 1,
PropertyId.BREEZE_AWAY: lambda s: s._breeze_mode == AirConditioner.BreezeMode.BREEZE_AWAY,
PropertyId.BREEZE_CONTROL: lambda s: s._breeze_mode,
PropertyId.BREEZELESS: lambda s: s._breeze_mode == AirConditioner.BreezeMode.BREEZELESS,
# ieco_frame, ieco_number, ieco_switch, ...
PropertyId.IECO: lambda s: bytes([0, 1, s._ieco_mode]) + bytes(10),
PropertyId.IECO: lambda s: s._ieco_mode,
PropertyId.RATE_SELECT: lambda s: s._rate_select,
PropertyId.SWING_LR_ANGLE: lambda s: s._horizontal_swing_angle,
PropertyId.SWING_UD_ANGLE: lambda s: s._vertical_swing_angle
Expand Down Expand Up @@ -220,7 +218,7 @@ def _update_state(self, res: Response) -> None:
AirConditioner.SwingAngle.get_from_value(angle))

if (value := res.get_property(PropertyId.SELF_CLEAN)) is not None:
self._self_clean_active = bool(value)
self._self_clean_active = value

if (rate := res.get_property(PropertyId.RATE_SELECT)) is not None:
self._rate_select = cast(
Expand All @@ -233,15 +231,15 @@ def _update_state(self, res: Response) -> None:
else AirConditioner.BreezeMode.OFF)
else:
if (value := res.get_property(PropertyId.BREEZE_AWAY)) is not None:
self._breeze_mode = (AirConditioner.BreezeMode.BREEZE_AWAY if (value == 2)
self._breeze_mode = (AirConditioner.BreezeMode.BREEZE_AWAY if value
else AirConditioner.BreezeMode.OFF)

if (value := res.get_property(PropertyId.BREEZELESS)) is not None:
self._breeze_mode = (AirConditioner.BreezeMode.BREEZELESS if bool(value)
self._breeze_mode = (AirConditioner.BreezeMode.BREEZELESS if value
else AirConditioner.BreezeMode.OFF)

if (value := res.get_property(PropertyId.IECO)) is not None:
self._ieco_mode = bool(value)
self._ieco_mode = value

elif isinstance(res, EnergyUsageResponse):
self._total_energy_usage = res.total_energy
Expand Down Expand Up @@ -482,7 +480,7 @@ async def refresh(self) -> None:
for response in await self._send_command_get_responses(cmd):
self._update_state(response)

async def _apply_properties(self, properties: dict[PropertyId, Union[bytes, int]]) -> None:
async def _apply_properties(self, properties: dict[PropertyId, Union[int, bool]]) -> None:
"""Apply the provided properties to the device."""

# Warn if attempting to update a property that isn't supported
Expand Down
11 changes: 4 additions & 7 deletions msmart/device/AC/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,7 @@ class TestSetPropertiesCommand(unittest.TestCase):
def test_payload(self) -> None:
"""Test that we encode set properties payloads correctly."""
# TODO this test is not based on a real world sample
PROPS = {PropertyId.SWING_UD_ANGLE: bytes(
[25]), PropertyId.SWING_LR_ANGLE: bytes([75])}
PROPS = {PropertyId.SWING_UD_ANGLE: 25, PropertyId.SWING_LR_ANGLE: 75}

# Build command
command = SetPropertiesCommand(PROPS)
Expand All @@ -610,11 +609,9 @@ def test_payload(self) -> None:
self.assertEqual(payload[2], PropertyId.SWING_UD_ANGLE & 0xFF)
self.assertEqual(payload[3], PropertyId.SWING_UD_ANGLE >> 8 & 0xFF)

# Assert length is correct
self.assertEqual(payload[4], len(PROPS[PropertyId.SWING_UD_ANGLE]))

# Assert data is correct
self.assertEqual(payload[5], PROPS[PropertyId.SWING_UD_ANGLE][0])
# Assert length is correct and data is correct
self.assertEqual(payload[4], 1)
self.assertEqual(payload[5], PROPS[PropertyId.SWING_UD_ANGLE])


class TestPropertiesResponse(_TestResponseBase):
Expand Down

0 comments on commit fef494e

Please sign in to comment.