Skip to content

Commit

Permalink
Detect service and device UUID mismatches (#72)
Browse files Browse the repository at this point in the history
* Detect service and device UUID mismatches

This patch introduces a new validation stage to the Device connection
flow where the UUID in the Service object used to initialize the device
is validated against the UUID received in the initial device properties
query. If the UUIDs do not match, the connection is closed and an
exception is set on the run future and any "wait for available"
coroutines.

This feature required a fairly big rework of internal device connection
logic, but the basic design remains the same. Additional tests cover the
new functionality.

See home-assistant/core#98784

* Update aiobafi6/device.py

Co-authored-by: J. Nick Koston <[email protected]>

* Update aiobafi6/device.py

Co-authored-by: J. Nick Koston <[email protected]>

---------

Co-authored-by: J. Nick Koston <[email protected]>
  • Loading branch information
jfroy and bdraco authored Aug 23, 2023
1 parent 9cc438a commit 77ed6a9
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 91 deletions.
2 changes: 2 additions & 0 deletions aiobafi6/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
from .const import MIN_API_VERSION
from .device import Device
from .discovery import PORT, ZEROCONF_SERVICE_TYPE, Service, ServiceBrowser
from .exceptions import DeviceUUIDMismatchError
from .protoprop import OffOnAuto

__all__ = (
"MIN_API_VERSION",
"PORT",
"ZEROCONF_SERVICE_TYPE",
"Device",
"DeviceUUIDMismatchError",
"OffOnAuto",
"Service",
"ServiceBrowser",
Expand Down
175 changes: 92 additions & 83 deletions aiobafi6/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from . import wireutils
from .const import DELAY_BETWEEN_CONNECTS_SECONDS, OCCUPANCY_MIN_API_VERSION
from .discovery import Service
from .exceptions import DeviceUUIDMismatchError
from .proto import aiobafi6_pb2
from .protoprop import (
ClosedIntervalValidator,
Expand Down Expand Up @@ -61,7 +62,7 @@ def _clear_volatile_props(props: aiobafi6_pb2.Properties):


class Device:
"""A connected BAF i6 protocol device.
"""A BAF i6 protocol device.
The design the of class is relatively simple. Since the protocol is based on
protofbuf, the majority of a device's state can be stored in a `Properties` message.
Expand All @@ -70,12 +71,21 @@ class Device:
memory growth). Synthetic properties expose the protobuf to clients.
A device must be initialized with a `Service`, either obtained using the `discovery`
module or manually created. The only required fields are at least an address and a
port.
module or manually created. The only required fields are `ip_addresses` and `port`.
A `Device` object is initially inert. A client must called its `run` method to
create an `asyncio.Task` that will maintain a connection to the device and service
properties queries, pushes, and commits.
A `Device` object is initially inert. A client must call its `async_run` method to
connect the device, process state changes and handle device updates. A
`asyncio.Future` is returned to monitor and stop the device.
A device has an `available` property which is true when the device is connected and
has received basic properties from the firmware. The `async_wait_available` coroutine
can be used to wait for a device to become available (which may never happen).
If the `uuid` field of the `Service` used to initialize a device is set, the library
will validate it against the device's `dns_sd_uuid` property after connecting to the
device and receiving basic properties. If the UUIDs do not match, the device is
stopped and a `DeviceUUIDMismatchError` exception is set on the run future and raised
in `async_wait_available` coroutines.
To disable periodic properties queries, set `query_interval_seconds` to 0.
Expand All @@ -84,7 +94,7 @@ class Device:
the device is in a steady state). Callbacks are also be suppressed when only
so-called volatile properties have changed, such as fan RPM, device uptime or the
device's internal clock. This can be disabled by setting `ignore_volatile_props` to
false. These properties are still queried and available to read from the device.
False. These properties are still queried and available to read from the device.
"""

def __init__(
Expand Down Expand Up @@ -116,7 +126,6 @@ def __init__(
if self._loop is None:
raise RuntimeError("no running loop")
self._run_fut: t.Optional[asyncio.Future] = None
self._stop_requested = False
self._next_connect_ts: float = time.monotonic()
self._connect_timer: t.Optional[asyncio.TimerHandle] = None
self._connect_task: t.Optional[asyncio.Task] = None
Expand All @@ -126,7 +135,7 @@ def __init__(
self._query_timer: t.Optional[asyncio.TimerHandle] = None

# Availability.
self._available_event = asyncio.Event()
self._available_fut: asyncio.Future = self._loop.create_future()

def __eq__(self, other: t.Any) -> bool:
if isinstance(other, Device):
Expand Down Expand Up @@ -213,8 +222,7 @@ def _dispatch_callbacks(self) -> None:
"""Dispatch registered device update callbacks.
An async task is created for coroutine callbacks. Function callbacks are
executed synchronously. For function callbacks, each invocation is done inside
a try-except block to swallow any error."""
executed synchronously inside a try-except block to swallow any error."""
for callback in self._callbacks:
try:
callback(self)
Expand Down Expand Up @@ -258,29 +266,32 @@ def _commit_property(self, prop: aiobafi6_pb2.Properties) -> None:

# Connection and query machinery

def _sched_connect_or_signal_run_fut(self):
"""Schedules a `_connect` invocation or signals the run future.
def _sched_connect_or_reset(self):
"""Schedule a `_connect` invocation or reset the device to be run again.
This function is called when a connection could not be established (error or
timeout), or the connection has been closed, or there is no connection
(`_start`). This is somewhat enforced by checking that various member variables
are None."""
This function is the entrypoint of the internal run state machine. It is called
when there is no connection (`_start`), when a connection could not be
established (error or timeout) or the connection has been closed, or when the
device is stopped (`_stop`)."""
assert self._connect_timer is None
assert self._connect_task is None
assert self._query_timer is None
assert self._transport is None
assert self._protocol is None
if self._stop_requested:
assert self._run_fut
if not self._run_fut.done():
_LOGGER.debug("%s: Signalling run future.", self.name)
self._run_fut.set_result(None)
else:
_LOGGER.debug("%s: Scheduling next connect invocation.", self.name)
self._connect_timer = self._loop.call_at(
self._next_connect_ts,
self._connect,
)
assert self._run_fut
# If the run future is done, then reset it to None and return.
# `_sched_connect_or_reset` is the entrypoint of the internal run state machine
# and therefore this is the right place to make the device runnable again (by
# clearing the run future).
if self._run_fut.done():
_LOGGER.debug("%s: Resetting device for new run.", self.name)
self._run_fut = None
return
_LOGGER.debug("%s: Scheduling next connect invocation.", self.name)
self._connect_timer = self._loop.call_at(
self._next_connect_ts,
self._connect,
)

def _connect(self) -> None:
self._connect_timer = None
Expand Down Expand Up @@ -315,7 +326,7 @@ def _finish_connect(self, task: asyncio.Task) -> None:
self._loop.call_soon(self._query)
except (OSError, asyncio.CancelledError) as err:
_LOGGER.debug("%s: Connection failed: %s", self.name, err)
self._sched_connect_or_signal_run_fut()
self._sched_connect_or_reset()

def _handle_connection_lost(self, exc: t.Optional[Exception]) -> None:
_LOGGER.debug("%s: Connection lost: %s", self.name, exc)
Expand All @@ -324,7 +335,7 @@ def _handle_connection_lost(self, exc: t.Optional[Exception]) -> None:
self._query_timer = None
self._transport = None
self._protocol = None
self._sched_connect_or_signal_run_fut()
self._sched_connect_or_reset()

def _process_message(self, data: bytes) -> None:
root = aiobafi6_pb2.Root() # pylint: disable=no-member
Expand All @@ -335,22 +346,33 @@ def _process_message(self, data: bytes) -> None:
previous = self.properties_proto
for prop in root.root2.query_result.properties:
self._properties.MergeFrom(prop)
if not self._available_event.is_set():
self._maybe_make_available()
if not self.available:
self._maybe_set_available()
current = self.properties_proto
if self._ignore_volatile_props:
_clear_volatile_props(previous)
_clear_volatile_props(current)
if self._available_event.is_set() and current != previous:
if self.available and current != previous:
self._dispatch_callbacks()

def _maybe_make_available(self):
def _maybe_set_available(self):
"""Set the device as available if all required properties are set."""
for pname in _PROPS_REQUIRED_FOR_AVAILABLE:
if not self._properties.HasField(pname):
return
if self._service.uuid is not None and self._service.uuid != self.dns_sd_uuid:
_LOGGER.error(
"%s: Device UUID (%s) does not match service UUID (%s): stopping.",
self.name,
self.dns_sd_uuid,
self._service.uuid,
)
assert self._run_fut
if not self._run_fut.done():
self._run_fut.set_exception(DeviceUUIDMismatchError)
return
_LOGGER.debug("%s: Setting device as available.", self.name)
self._available_event.set()
self._available_fut.set_result(True)

def _query(self) -> None:
self._query_timer = None
Expand All @@ -374,33 +396,13 @@ def async_run(self) -> asyncio.Future:
A running `Device` schedules functions on the run loop to maintain a connection
to the device, sends periodic property queries, and services query commits.
Returns a future that will resolve when the device stops. Cancelling any future
returned by this function will stop the device.
Returns a future that will resolve when the device stops. Cancelling this future
will stop the device.
"""
fut = self._loop.create_future()
if self._run_fut is None:
self._start()
assert self._run_fut is not None

def resolve_fut(_: asyncio.Future):
if not fut.done():
fut.set_result(None)

self._run_fut.add_done_callback(resolve_fut)

# Snapshot the current `_run_fut` in this function to ensure `fut` cannot cancel
# a future run invocation. This seems unlikely but if run loops can execute
# scheduled callbacks in any order then it can happen. Snapshotting `_run_fut`
# and doing an ID equality works because by capturing it here its lifetime is
# extended and any future `_run_fut` is going to have a different ID.
run_fut = self._run_fut

def stop_on_cancel(_: asyncio.Future):
if fut.cancelled() and self._run_fut is run_fut:
self._stop()

fut.add_done_callback(stop_on_cancel)
return fut
return self._run_fut

def _start(self):
"""Start the device.
Expand All @@ -410,22 +412,38 @@ def _start(self):
connection, send periodic property queries, and service query commits.
"""
assert self._run_fut is None
assert not self._stop_requested
_LOGGER.debug("%s: Starting.", self.name)
self._run_fut = self._loop.create_future()
self._run_fut.add_done_callback(self._finish_run)
self._sched_connect_or_signal_run_fut()

def stop_on_done(_: asyncio.Future):
self._stop()

self._run_fut.add_done_callback(stop_on_done)
self._sched_connect_or_reset()

def _stop(self) -> None:
"""Stop the device."""
if self._stop_requested:
return
"""Stop the device.
This function ultimately causes `_sched_connect_or_reset` to be called by
cancelling the appropriate in-flight task.
This function also creates a new available future, thus marking the device as
unavailable. If the run future became done because of an exception or because
it was cancelled, that is propagated to the prior available future. Otherwise,
the prior available future is never signalled."""
_LOGGER.debug("%s: Stopping.", self.name)
# This will cause `_sched_connect_or_signal_run_fut` to signal `_run_fut`.
self._stop_requested = True
# The device is not available anymore. Dispatch device callbacks so clients can
# react to the change.
self._available_event.clear()
# Propagate run exception or cancellation to the available future, then reset it
# to set the device as unavailable.
assert self._run_fut
if self._run_fut.cancelled():
self._available_fut.cancel()
else:
run_exc = self._run_fut.exception()
if run_exc is not None:
self._available_fut.set_exception(run_exc)
self._available_fut = self._loop.create_future()
# Dispatch client callbacks, since some clients may observe the `available`
# property through a callback.
self._dispatch_callbacks()
# If there is an active connection, close it.
if self._transport is not None:
Expand All @@ -434,32 +452,23 @@ def _stop(self) -> None:
elif self._connect_task is not None:
self._connect_task.cancel()
# Otherwise, if `_connect` is scheduled, cancel that and call
# `_sched_connect_or_signal_run_fut` directly because nothing else will.
# `_sched_connect_or_reset` directly because nothing else will.
elif self._connect_timer is not None:
self._connect_timer.cancel()
self._connect_timer = None
self._sched_connect_or_signal_run_fut()

def _finish_run(self, _: asyncio.Future) -> None:
"""Reset the run future to None.
This is the only completion callback for the run future and the only place where
it is reset to None, indicating that the device has fully stopped and could be
run again."""
_LOGGER.debug("%s: Stopped.", self.name)
self._run_fut = None
self._stop_requested = False
self._sched_connect_or_reset()

# Availability

@property
def available(self) -> bool:
"""Return True when device is running and has values for critical properties."""
return self._available_event.is_set()
available_fut = self._available_fut
return available_fut.done() and not available_fut.exception()

async def async_wait_available(self) -> None:
"""Asynchronously wait for the device to be available."""
await self._available_event.wait()
await self._available_fut

# General

Expand Down
Loading

0 comments on commit 77ed6a9

Please sign in to comment.