Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement network scanning using the standard zigpy interface #648

Merged
merged 5 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,28 +223,44 @@ async def _list_command(
self, name, item_frames, completion_frame, spos, *args, **kwargs
):
"""Run a command, returning result callbacks as a list"""
fut = asyncio.Future()
queue = asyncio.Queue()
results = []

def cb(frame_name, response):
if frame_name in item_frames:
with self.callback_for_commands(
commands=set(item_frames) | {completion_frame},
callback=lambda command, response: queue.put_nowait((command, response)),
):
v = await self._command(name, *args, **kwargs)
if t.sl_Status.from_ember_status(v[0]) != t.sl_Status.OK:
raise Exception(v)

while True:
command, response = await queue.get()
if command == completion_frame:
if t.sl_Status.from_ember_status(response[spos]) != t.sl_Status.OK:
raise Exception(response)

break

results.append(response)
elif frame_name == completion_frame:
fut.set_result(response)

return results

@contextlib.contextmanager
def callback_for_commands(
self, commands: set[str], callback: Callable
) -> Generator[None]:
def cb(frame_name, response):
if frame_name in commands:
callback(frame_name, response)

cbid = self.add_callback(cb)

try:
v = await self._command(name, *args, **kwargs)
if t.sl_Status.from_ember_status(v[0]) != t.sl_Status.OK:
raise Exception(v)
v = await fut
if t.sl_Status.from_ember_status(v[spos]) != t.sl_Status.OK:
raise Exception(v)
yield
finally:
self.remove_callback(cbid)

return results

startScan = functools.partialmethod(
_list_command,
"startScan",
Expand All @@ -253,7 +269,11 @@ def cb(frame_name, response):
1,
)
pollForData = functools.partialmethod(
_list_command, "pollForData", ["pollHandler"], "pollCompleteHandler", 0
_list_command,
"pollForData",
["pollHandler"],
"pollCompleteHandler",
0,
)
zllStartScan = functools.partialmethod(
_list_command,
Expand Down
41 changes: 41 additions & 0 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import statistics
import sys
from typing import AsyncGenerator

if sys.version_info[:2] < (3, 11):
from async_timeout import timeout as asyncio_timeout # pragma: no cover
Expand Down Expand Up @@ -711,6 +712,46 @@ async def energy_scan(
for channel in list(channels)
}

async def _network_scan(
self, channels: t.Channels, duration_exp: int
) -> AsyncGenerator[zigpy.types.NetworkBeacon]:
"""Scans for networks and yields network beacons."""
queue = asyncio.Queue()

with self._ezsp.callback_for_commands(
{"networkFoundHandler", "scanCompleteHandler"},
callback=lambda command, response: queue.put_nowait((command, response)),
):
# XXX: replace with normal command invocation once overload is removed
(status,) = await self._ezsp._command(
"startScan",
scanType=t.EzspNetworkScanType.ACTIVE_SCAN,
channelMask=channels,
duration=duration_exp,
)

if t.sl_Status.from_ember_status(status) != t.sl_Status.OK:
raise ControllerError(f"Failed to start scan: {status!r}")

while True:
command, response = await queue.get()

if command == "scanCompleteHandler":
break

(networkFound, lastHopLqi, lastHopRssi) = response

yield zigpy.types.NetworkBeacon(
pan_id=networkFound.panId,
extended_pan_id=networkFound.extendedPanId,
channel=networkFound.channel,
nwk_update_id=networkFound.nwkUpdateId,
permit_joining=bool(networkFound.allowingJoin),
stack_profile=networkFound.stackProfile,
lqi=lastHopLqi,
rssi=lastHopRssi,
)

async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
Expand Down
101 changes: 101 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1943,3 +1943,104 @@
)
)
]


async def test_network_scan(app: ControllerApplication) -> None:
app._ezsp._protocol.startScan.return_value = [t.sl_Status.OK]

def run_scan() -> None:
app._ezsp._protocol._handle_callback(
"networkFoundHandler",
list(
{
"networkFound": t.EmberZigbeeNetwork(
channel=11,
panId=zigpy_t.PanId(0x1D13),
extendedPanId=t.EUI64.convert("00:07:81:00:00:9a:8f:3b"),
allowingJoin=False,
stackProfile=2,
nwkUpdateId=0,
),
"lastHopLqi": 152,
"lastHopRssi": -62,
}.values()
),
)
app._ezsp._protocol._handle_callback(
"networkFoundHandler",
list(
{
"networkFound": t.EmberZigbeeNetwork(
channel=11,
panId=zigpy_t.PanId(0x2857),
extendedPanId=t.EUI64.convert("00:07:81:00:00:9a:34:1b"),
allowingJoin=False,
stackProfile=2,
nwkUpdateId=0,
),
"lastHopLqi": 136,
"lastHopRssi": -66,
}.values()
),
)
app._ezsp._protocol._handle_callback(
"scanCompleteHandler",
list(
{
"channel": 26,
"status": t.sl_Status.OK,
}.values()
),
)

asyncio.get_running_loop().call_soon(run_scan)

results = [
beacon
async for beacon in app.network_scan(

Check failure on line 2000 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

test_network_scan AttributeError: 'ControllerApplication' object has no attribute 'network_scan'

Check failure on line 2000 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

test_network_scan AttributeError: 'ControllerApplication' object has no attribute 'network_scan'. Did you mean: '_network_scan'?

Check failure on line 2000 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

test_network_scan AttributeError: 'ControllerApplication' object has no attribute 'network_scan'

Check failure on line 2000 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

test_network_scan AttributeError: 'ControllerApplication' object has no attribute 'network_scan'. Did you mean: '_network_scan'?

Check failure on line 2000 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

test_network_scan AttributeError: 'ControllerApplication' object has no attribute 'network_scan'
channels=t.Channels.from_channel_list([11, 15, 26]), duration_exp=4
)
]

assert results == [
zigpy_t.NetworkBeacon(
pan_id=0x1D13,
extended_pan_id=t.EUI64.convert("00:07:81:00:00:9a:8f:3b"),
channel=11,
permit_joining=False,
stack_profile=2,
nwk_update_id=0,
lqi=152,
src=None,
rssi=-62,
depth=None,
router_capacity=None,
device_capacity=None,
protocol_version=None,
),
zigpy_t.NetworkBeacon(
pan_id=0x2857,
extended_pan_id=t.EUI64.convert("00:07:81:00:00:9a:34:1b"),
channel=11,
permit_joining=False,
stack_profile=2,
nwk_update_id=0,
lqi=136,
src=None,
rssi=-66,
depth=None,
router_capacity=None,
device_capacity=None,
protocol_version=None,
),
]


async def test_network_scan_failure(app: ControllerApplication) -> None:
app._ezsp._protocol.startScan.return_value = [t.sl_Status.FAIL]

with pytest.raises(zigpy.exceptions.ControllerException):
async for beacon in app.network_scan(

Check failure on line 2043 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

test_network_scan_failure AttributeError: 'ControllerApplication' object has no attribute 'network_scan'

Check failure on line 2043 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

test_network_scan_failure AttributeError: 'ControllerApplication' object has no attribute 'network_scan'. Did you mean: '_network_scan'?

Check failure on line 2043 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

test_network_scan_failure AttributeError: 'ControllerApplication' object has no attribute 'network_scan'

Check failure on line 2043 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

test_network_scan_failure AttributeError: 'ControllerApplication' object has no attribute 'network_scan'. Did you mean: '_network_scan'?

Check failure on line 2043 in tests/test_application.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

test_network_scan_failure AttributeError: 'ControllerApplication' object has no attribute 'network_scan'
channels=t.Channels.from_channel_list([11, 15, 26]), duration_exp=4
):
pass
Loading