From 8625571bad36e5abb0800eff7f8e80bec50d449c Mon Sep 17 00:00:00 2001 From: Lincoln Lorenz Date: Fri, 29 Dec 2023 15:09:00 -0500 Subject: [PATCH] Migrate zeroconf advertisement into its own module and test --- .github/workflows/python-app.yml | 1 + .pylintrc | 3 +- amplipi/__init__.py | 3 +- amplipi/app.py | 166 +--------------------------- amplipi/asgi.py | 7 +- amplipi/zeroconf.py | 179 +++++++++++++++++++++++++++++++ scripts/test | 2 + tests/context.py | 5 +- tests/test_rest.py | 61 ----------- tests/test_zeroconf.py | 53 +++++++++ 10 files changed, 250 insertions(+), 230 deletions(-) create mode 100644 amplipi/zeroconf.py create mode 100644 tests/test_zeroconf.py diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 188c26b82..9441d1e85 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -44,6 +44,7 @@ jobs: pytest tests/test_ctrl.py -vvv -k doubly_missing_config pytest tests/test_rest.py -vvv -k 'not _live' --cov=./ --cov-report=xml pytest tests/test_auth.py -vvv + pytest tests/test_zeroconf.py -vvv - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: diff --git a/.pylintrc b/.pylintrc index f3518a395..8147f9b19 100644 --- a/.pylintrc +++ b/.pylintrc @@ -20,7 +20,8 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use "--disable=all --enable=classes # --disable=W". -disable=bare-except +disable=bare-except, + logging-fstring-interpolation [BASIC] good-names=i, diff --git a/amplipi/__init__.py b/amplipi/__init__.py index 59b317080..19aa25d93 100755 --- a/amplipi/__init__.py +++ b/amplipi/__init__.py @@ -1,3 +1,4 @@ """ AmpliPi """ # TODO: remove "rt" -__all__ = ["app", "asgi", "ctrl", "display", "eeprom", "extras", "hw", "models", "rt", "streams", "utils"] +__all__ = ["app", "asgi", "auth", "ctrl", "defaults", "display", "eeprom", + "extras", "hw", "models", "mpris", "rt", "streams", "utils", "zeroconf"] diff --git a/amplipi/app.py b/amplipi/app.py index d617ec6d9..ad5ad6646 100755 --- a/amplipi/app.py +++ b/amplipi/app.py @@ -1,7 +1,7 @@ #!/usr/bin/python3 # AmpliPi Home Audio -# Copyright (C) 2022 MicroNova LLC +# Copyright (C) 2021-2024 MicroNova LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,8 +22,6 @@ The FastAPI/Starlette web framework is used to simplify the web plumbing. """ -import logging -import sys import argparse import os @@ -33,6 +31,8 @@ from enum import Enum from types import SimpleNamespace +from multiprocessing import Queue + import urllib.request # For custom album art size from functools import lru_cache import asyncio @@ -53,13 +53,6 @@ from starlette.responses import FileResponse from sse_starlette.sse import EventSourceResponse -# mdns service advertisement -import netifaces as ni -from socket import gethostname, inet_aton -from zeroconf import IPVersion, ServiceInfo, Zeroconf -from multiprocessing import Event, Queue -from multiprocessing.synchronize import Event as SyncEvent - # amplipi import amplipi.utils as utils import amplipi.models as models @@ -110,7 +103,7 @@ def unused_groups(ctrl: Api, src: int) -> Dict[int, str]: def unused_zones(ctrl: Api, src: int) -> Dict[int, str]: - """ Get zones that are not conencted to src """ + """ Get zones that are not connected to src """ zones = ctrl.status.zones return {z.id: z.name for z in zones if z.source_id != src and z.id is not None and not z.disabled} @@ -888,157 +881,6 @@ def on_shutdown(): del _ctrl print('webserver shutdown complete') -# MDNS - - -# TEST: use logging in this module -log = logging.getLogger(__name__) -log.setLevel(logging.INFO) -sh = logging.StreamHandler(sys.stderr) -sh.setFormatter(logging.Formatter('%(name)s: %(levelname)s - %(message)s')) -log.addHandler(sh) - - -def get_ip_info(iface: str = 'eth0') -> Tuple[Optional[str], Optional[str]]: - """ Get the IP address of interface @iface """ - try: - info = ni.ifaddresses(iface) - return info[ni.AF_INET][0].get('addr'), info[ni.AF_LINK][0].get('addr') - except: - return None, None - - -def advertise_service(port, event: SyncEvent): - """ Advertise the AmpliPi api via zeroconf, can be verified with 'avahi-browse -ar' - Expected to be run as a separate process, eg: - - event = multiprocessing.Event() - ad = Process(target=amplipi.app.advertise_service, args=(5000, event)) - ad.start() - ... - event.set() - ad.join() - NOTE: multiprocessing.Event() is a function that returns a multiprocessing.synchronize.Event type - Here the type is aliased to SyncEvent - """ - def ok(): - """ Was a stop requested by the parent process? """ - return not event.is_set() - - while ok(): - try: - _advertise_service(port, ok) - except Exception as exc: - if 'change registration' not in str(exc): - log.error(f'Failed to advertise AmpliPi service, retrying in 30s: {exc}') - # delay for a bit after a failure - delay_ct = 300 - while ok() and delay_ct > 0: - sleep(0.1) - delay_ct -= 1 - - -def _find_best_iface(ok: Callable) -> Union[Tuple[str, str, str], Tuple[None, None, None]]: - """ Try to find the best interface to advertise on - Retries several times in case DHCP resolution is delayed - """ - ip_addr, mac_addr = None, None - - # attempt to find the interface used as the default gateway - retry_count = 5 - while ok() and retry_count > 0: - try: - iface = ni.gateways()['default'][ni.AF_INET][1] - ip_addr, mac_addr = get_ip_info(iface) - if ip_addr and mac_addr: - return ip_addr, mac_addr, iface - except: - pass - sleep(2) # wait a bit in case this was started before DHCP was started - retry_count -= 1 - - # fallback to any normal interface that is available - # this also covers the case where a link-local connection is established - def is_normal(iface: str): - """ Check if an interface is wireless or wired, excluding virtual and local interfaces""" - return iface.startswith('w') or iface.startswith('e') - ifaces = filter(is_normal, ni.interfaces()) - for iface in ifaces: - ip_addr, mac_addr = get_ip_info(iface) - if ip_addr and mac_addr: - return ip_addr, mac_addr, iface - log.warning(f'Unable to register service on one of {ifaces}, \ - they all are either not available or have no IP address.') - return None, None, None - - -def _advertise_service(port: int, ok: Callable) -> None: - """ Underlying advertisement, can throw Exceptions when network is not connected """ - - hostname = f'{gethostname()}.local' - url = f'http://{hostname}' - log.debug("AmpliPi zeroconf - attempting to find best interface") - ip_addr, mac_addr, good_iface = _find_best_iface(ok) - - if not ip_addr: - log.warning('is this running on AmpliPi?') - # Fallback to any hosted ip on this device. - # This will be resolved to an ip address by the advertisement - ip_addr = '0.0.0.0' - else: - log.debug('Found IP address %s on interface %s', ip_addr, good_iface) - if port != 80: - url += f':{port}' - - info = ServiceInfo( - # use a custom type to easily support multiple amplipi device enumeration - '_amplipi._tcp.local.', - # the MAC Address is appended to distinguish multiple AmpliPi units - f'amplipi-{str(mac_addr).lower()}._amplipi._tcp.local.', - addresses=[inet_aton(ip_addr)], - port=port, - properties={ - # standard info - 'path': '/api/', - # extra info - for interfacing - 'name': 'AmpliPi', - 'vendor': 'MicroNova', - 'version': utils.detect_version(), - # extra info - for user - 'web_app': url, - 'documentation': f'{url}/doc' - }, - server=f'{hostname}.', # Trailing '.' is required by the SRV_record specification - ) - - if not ok(): - log.info("Advertisement cancelled") - return - - log.info(f'Registering service: {info}') - # right now the AmpliPi webserver is ipv4 only - zeroconf = Zeroconf(ip_version=IPVersion.V4Only, interfaces=[ip_addr]) - zeroconf.register_service(info) - log.info('Finished registering service') - try: - # poll for changes to the IP address - # this attempts to handle events like router/switch resets - while ok(): - delay_ct = 100 - while ok() and delay_ct > 0: - sleep(0.1) - delay_ct -= 1 - if ok(): - new_ip_addr, _, new_iface = _find_best_iface(ok) - if new_ip_addr != ip_addr: - log.info(f'IP address changed from {ip_addr} ({good_iface}) to {new_ip_addr} ({new_iface})') - log.info('Registration change needed') - raise Exception("change registration") - finally: - log.info('Unregistering service') - zeroconf.unregister_service(info) - zeroconf.close() - if __name__ == '__main__': """ Create the openapi yaml file describing the AmpliPi API """ diff --git a/amplipi/asgi.py b/amplipi/asgi.py index b21ca4777..67ad17572 100755 --- a/amplipi/asgi.py +++ b/amplipi/asgi.py @@ -1,7 +1,7 @@ #!/usr/bin/python3 # AmpliPi Home Audio -# Copyright (C) 2022 MicroNova LLC +# Copyright (C) 2021-2024 MicroNova LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,6 +25,7 @@ from multiprocessing import Process, Event import uvicorn import amplipi.app +from amplipi import zeroconf MOCK_CTRL = os.environ.get('MOCK_CTRL', 'False').lower() == 'true' MOCK_STREAMS = os.environ.get('MOCK_STREAMS', 'False').lower() == 'true' @@ -38,13 +39,13 @@ # NOTE: Zeroconf advertisements need to be done as a separate process to avoid blocking the # webserver startup since behind the scenes zeroconf makes its own event loop. zc_event = Event() -zc_reg = Process(target=amplipi.app.advertise_service, args=(PORT, zc_event)) +zc_reg = Process(target=zeroconf.advertise_service, args=(PORT, zc_event)) zc_reg.start() @application.on_event('shutdown') def on_shutdown(): - """ Notify the mdns advertisement to shutdown""" + """ Notify the zeroconf advertisement to shutdown""" zc_event.set() zc_reg.join() diff --git a/amplipi/zeroconf.py b/amplipi/zeroconf.py new file mode 100644 index 000000000..e3a615856 --- /dev/null +++ b/amplipi/zeroconf.py @@ -0,0 +1,179 @@ +#!/usr/bin/python3 + +# AmpliPi Home Audio +# Copyright (C) 2022-2024 MicroNova LLC +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""AmpliPi zeroconf service advertisement""" + +import logging +from multiprocessing.synchronize import Event as SyncEvent +from socket import gethostname, inet_aton +import sys +from time import sleep +from typing import Callable, Optional, Tuple, Union + +import netifaces as ni +from zeroconf import IPVersion, ServiceInfo, Zeroconf + +from amplipi import utils + +# TEST: use logging in this module +log = logging.getLogger(__name__) +log.setLevel(logging.INFO) # TODO: allow changing log level higher up (as AmpliPi config?) +sh = logging.StreamHandler(sys.stderr) +sh.setFormatter(logging.Formatter('%(name)s: %(levelname)s - %(message)s')) +log.addHandler(sh) + + +def get_ip_info(iface: str = 'eth0') -> Tuple[Optional[str], Optional[str]]: + """ Get the IP address of interface @iface """ + try: + info = ni.ifaddresses(iface) + return info[ni.AF_INET][0].get('addr'), info[ni.AF_LINK][0].get('addr') + except: + return None, None + + +def advertise_service(port, event: SyncEvent): + """ Advertise the AmpliPi api via zeroconf, can be verified with 'avahi-browse -ar' + Expected to be run as a separate process, eg: + + event = multiprocessing.Event() + ad = Process(target=amplipi.app.advertise_service, args=(5000, event)) + ad.start() + ... + event.set() + ad.join() + NOTE: multiprocessing.Event() is a function that returns a multiprocessing.synchronize.Event type + Here the type is aliased to SyncEvent + """ + def ok(): + """ Was a stop requested by the parent process? """ + return not event.is_set() + + while ok(): + try: + _advertise_service(port, ok) + except Exception as exc: + if 'change registration' not in str(exc): + log.error(f'Failed to advertise AmpliPi service, retrying in 30s: {exc}') + # delay for a bit after a failure + delay_ct = 300 + while ok() and delay_ct > 0: + sleep(0.1) + delay_ct -= 1 + + +def _find_best_iface(ok: Callable) -> Union[Tuple[str, str, str], Tuple[None, None, None]]: + """ Try to find the best interface to advertise on + Retries several times in case DHCP resolution is delayed + """ + ip_addr, mac_addr = None, None + + # attempt to find the interface used as the default gateway + retry_count = 5 + while ok() and retry_count > 0: + try: + iface = ni.gateways()['default'][ni.AF_INET][1] + ip_addr, mac_addr = get_ip_info(iface) + if ip_addr and mac_addr: + return ip_addr, mac_addr, iface + except: + pass + sleep(2) # wait a bit in case this was started before DHCP was started + retry_count -= 1 + + # fallback to any normal interface that is available + # this also covers the case where a link-local connection is established + def is_normal(iface: str): + """ Check if an interface is wireless or wired, excluding virtual and local interfaces""" + return iface.startswith('w') or iface.startswith('e') + ifaces = filter(is_normal, ni.interfaces()) + for iface in ifaces: + ip_addr, mac_addr = get_ip_info(iface) + if ip_addr and mac_addr: + return ip_addr, mac_addr, iface + log.warning(f'Unable to register service on one of {ifaces}, \ + they all are either not available or have no IP address.') + return None, None, None + + +def _advertise_service(port: int, ok: Callable) -> None: + """ Underlying advertisement, can throw Exceptions when network is not connected """ + + hostname = f'{gethostname()}.local' + url = f'http://{hostname}' + log.debug("AmpliPi zeroconf - attempting to find best interface") + ip_addr, mac_addr, good_iface = _find_best_iface(ok) + + if not ip_addr: + # Fallback to any hosted ip on this device. + # This will be resolved to an ip address by the advertisement + ip_addr = '0.0.0.0' + log.warning(f'No valid network interface found, advertising on {ip_addr}.') + else: + log.debug(f'Found IP address {ip_addr} on interface {good_iface}') + if port != 80: + url += f':{port}' + + info = ServiceInfo( + # use a custom type to easily support multiple amplipi device enumeration + '_amplipi._tcp.local.', + # the MAC Address is appended to distinguish multiple AmpliPi units + f'amplipi-{str(mac_addr).lower()}._amplipi._tcp.local.', + addresses=[inet_aton(ip_addr)], + port=port, + properties={ + # standard info + 'path': '/api/', + # extra info - for interfacing + 'name': 'AmpliPi', + 'vendor': 'MicroNova', + 'version': utils.detect_version(), + # extra info - for user + 'web_app': url, + 'documentation': f'{url}/doc' + }, + server=f'{hostname}.', # Trailing '.' is required by the SRV_record specification + ) + + if not ok(): + log.info("Advertisement cancelled") + return + + log.info(f'Registering service: {info}') + # right now the AmpliPi webserver is ipv4 only + zeroconf = Zeroconf(ip_version=IPVersion.V4Only, interfaces=[ip_addr]) + zeroconf.register_service(info) + log.info('Finished registering service') + try: + # poll for changes to the IP address + # this attempts to handle events like router/switch resets + while ok(): + delay_ct = 100 + while ok() and delay_ct > 0: + sleep(0.1) + delay_ct -= 1 + if ok(): + new_ip_addr, _, new_iface = _find_best_iface(ok) + if new_ip_addr != ip_addr: + log.info(f'IP address changed from {ip_addr} ({good_iface}) to {new_ip_addr} ({new_iface})') + log.info('Registration change needed') + raise Exception("change registration") + finally: + log.info('Unregistering service') + zeroconf.unregister_service(info) + zeroconf.close() diff --git a/scripts/test b/scripts/test index a66741c43..02413cb7c 100755 --- a/scripts/test +++ b/scripts/test @@ -27,6 +27,8 @@ pytest tests/test_ctrl.py -vvv -k doubly_missing_config pytest tests/test_rest.py -vvv -k 'not _live' pytest tests/test_auth.py -vvv +pytest tests/test_zeroconf.py -vvv + # Live tests require some amplipi streams to be setup, which means real audio # sinks are required. These tests will not run in Github Actions. diff --git a/tests/context.py b/tests/context.py index 37c2deb69..c412bcb85 100644 --- a/tests/context.py +++ b/tests/context.py @@ -4,11 +4,12 @@ import amplipi import amplipi.app +import amplipi.auth import amplipi.ctrl +import amplipi.defaults import amplipi.hw import amplipi.models import amplipi.rt # TODO: remove import amplipi.streams import amplipi.utils -import amplipi.auth -import amplipi.defaults +import amplipi.zeroconf diff --git a/tests/test_rest.py b/tests/test_rest.py index d00349c0f..db4a0a424 100644 --- a/tests/test_rest.py +++ b/tests/test_rest.py @@ -17,8 +17,6 @@ # testing context from context import amplipi -import netifaces as ni - # pylint: disable=redefined-outer-name # pylint: disable=invalid-name # pylint: disable=too-many-locals @@ -983,65 +981,6 @@ def test_generate(client): # if os.path.exists('{}/{}'.format(fullpath, fn)): # os.remove('{}/{}'.format(fullpath, fn)) -def test_zeroconf(): - """ Unit test for zeroconf advertisement """ - # TODO: migrate this test into its own module - from zeroconf import Zeroconf, ServiceStateChange, ServiceBrowser, IPVersion - from time import sleep - from multiprocessing import Process, Event - - # get default network interface - iface = ni.gateways()['default'][ni.AF_INET][1] - FAKE_PORT = 9898 - - # first time ni.ifaddresses is called in the CI system it fails - try: - ni.ifaddresses(iface)[ni.AF_LINK][0]['addr'] - except: - print('ni.ifaddresses() failed first time!') - pass - - services_advertised = {} - def on_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange): - if state_change is ServiceStateChange.Added: - info = zeroconf.get_service_info(service_type, name) - if info and info.port != 80: # ignore the actual amplipi service on your network - services_advertised[name] = info - - # advertise amplipi-api service (start this before the listener to verify it can be found after advertisement) - event = Event() - zc_reg = Process(target=amplipi.app.advertise_service, args=(FAKE_PORT,event)) - zc_reg.start() - sleep(4) # wait for a bit to make sure the service is started - - # start listener that adds available services - zeroconf = Zeroconf(ip_version=IPVersion.V4Only) - services = ["_amplipi._tcp.local."] - _ = ServiceBrowser(zeroconf, services, handlers=[on_service_state_change]) - - # wait enough time for a response from the serice - sleep(2) - - # figure out what the name will be - mac_addr = '' - try: - mac_addr = ni.ifaddresses(iface)[ni.AF_LINK][0]['addr'] - except: - mac_addr = 'none' - - AMPLIPI_ZC_NAME = f'amplipi-{mac_addr}._amplipi._tcp.local.' - - # stop the advertiser - event.set() - zc_reg.join() - - # stop the listener - zeroconf.close() - - # check advertisememts - assert AMPLIPI_ZC_NAME in services_advertised - assert services_advertised[AMPLIPI_ZC_NAME].port == FAKE_PORT - @pytest.mark.parametrize('zid', base_zone_ids()) def test_set_zone_vol(client, zid): """ Try changing a zone's volume """ diff --git a/tests/test_zeroconf.py b/tests/test_zeroconf.py new file mode 100644 index 000000000..56af82040 --- /dev/null +++ b/tests/test_zeroconf.py @@ -0,0 +1,53 @@ +from time import sleep +from multiprocessing import Process, Event +import netifaces as ni +from zeroconf import Zeroconf, ServiceStateChange, ServiceBrowser, IPVersion +from context import amplipi + + +def test_zeroconf(): + """ Unit test for zeroconf advertisement """ + FAKE_PORT = 9898 + + # get network interface and MAC address that the service is expected to be advertised on + iface = ni.gateways()['default'][ni.AF_INET][1] + try: + mac_addr = ni.ifaddresses(iface)[ni.AF_LINK][0]['addr'] + except: + mac_addr = 'none' + + AMPLIPI_ZC_NAME = f'amplipi-{mac_addr}._amplipi._tcp.local.' + print(f'Expecting that "{AMPLIPI_ZC_NAME}" is advertised to interface {iface}') + + services_advertised = {} + + def on_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange): + if state_change is ServiceStateChange.Added: + info = zeroconf.get_service_info(service_type, name) + if info and info.port != 80: # ignore the actual amplipi service on your network + services_advertised[name] = info + + # advertise amplipi-api service (start this before the listener to verify it can be found after advertisement) + event = Event() + zc_reg = Process(target=amplipi.zeroconf.advertise_service, args=(FAKE_PORT, event)) + zc_reg.start() + sleep(4) # wait for a bit to make sure the service is started + + # start listener that adds available services + zeroconf = Zeroconf(ip_version=IPVersion.V4Only) + services = ["_amplipi._tcp.local."] + _ = ServiceBrowser(zeroconf, services, handlers=[on_service_state_change]) + + # wait enough time for a response from the service + sleep(2) + + # stop the advertiser + event.set() + zc_reg.join() + + # stop the listener + zeroconf.close() + + # check advertisements + assert AMPLIPI_ZC_NAME in services_advertised + assert services_advertised[AMPLIPI_ZC_NAME].port == FAKE_PORT