From ccebd3b30dfb4ba6ba477793b3f91611a3d8e98e Mon Sep 17 00:00:00 2001 From: Matthias Hagmann <16444067+MattHag@users.noreply.github.com> Date: Fri, 12 Apr 2024 02:58:15 +0200 Subject: [PATCH] Introduce DeviceProtocol Type device interface for hidpp10 and hidpp20 --- lib/logitech_receiver/hidpp10.py | 135 +++++++++++++++++++------------ lib/logitech_receiver/hidpp20.py | 72 +++++++++++------ 2 files changed, 133 insertions(+), 74 deletions(-) diff --git a/lib/logitech_receiver/hidpp10.py b/lib/logitech_receiver/hidpp10.py index 80d3ccda6c..6e90cd4929 100644 --- a/lib/logitech_receiver/hidpp10.py +++ b/lib/logitech_receiver/hidpp10.py @@ -13,10 +13,15 @@ ## You should have received a copy of the GNU General Public License along ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +from __future__ import annotations import logging -from .common import Battery as _Battery +from typing_extensions import Protocol + +from .common import Battery +from .common import BatteryChargeApproximation +from .common import BatteryStatus from .common import FirmwareInfo as _FirmwareInfo from .common import bytes2int as _bytes2int from .common import int2bytes as _int2bytes @@ -26,19 +31,36 @@ logger = logging.getLogger(__name__) -# -# functions -# +class DeviceProtocol(Protocol): + def request(self): + ... + + @property + def kind(self): + ... + + @property + def online(self): + ... + + @property + def protocol(self): + ... -def read_register(device, register_number, *params): + @property + def registers(self): + ... + + +def read_register(device: DeviceProtocol, register_number, *params): assert device is not None, f"tried to read register {register_number:02X} from invalid device {device}" # support long registers by adding a 2 in front of the register number request_id = 0x8100 | (int(register_number) & 0x2FF) return device.request(request_id, *params) -def write_register(device, register_number, *value): +def write_register(device: DeviceProtocol, register_number, *value): assert device is not None, f"tried to write register {register_number:02X} to invalid device {device}" # support long registers by adding a 2 in front of the register number request_id = 0x8000 | (int(register_number) & 0x2FF) @@ -59,7 +81,7 @@ def set_configuration_pending_flags(receiver, devices): class Hidpp10: - def get_battery(self, device): + def get_battery(self, device: DeviceProtocol): assert device is not None assert device.kind is not None if not device.online: @@ -89,7 +111,7 @@ def get_battery(self, device): device.registers.append(REGISTERS.battery_status) return parse_battery_status(REGISTERS.battery_status, reply) - def get_firmware(self, device): + def get_firmware(self, device: DeviceProtocol): assert device is not None firmware = [None, None, None] @@ -124,7 +146,7 @@ def get_firmware(self, device): if any(firmware): return tuple(f for f in firmware if f) - def set_3leds(self, device, battery_level=None, charging=None, warning=None): + def set_3leds(self, device: DeviceProtocol, battery_level=None, charging=None, warning=None): assert device is not None assert device.kind is not None if not device.online: @@ -134,17 +156,17 @@ def set_3leds(self, device, battery_level=None, charging=None, warning=None): return if battery_level is not None: - if battery_level < _Battery.APPROX.critical: + if battery_level < BatteryChargeApproximation.CRITICAL: # 1 orange, and force blink v1, v2 = 0x22, 0x00 warning = True - elif battery_level < _Battery.APPROX.low: + elif battery_level < BatteryChargeApproximation.LOW: # 1 orange v1, v2 = 0x22, 0x00 - elif battery_level < _Battery.APPROX.good: + elif battery_level < BatteryChargeApproximation.GOOD: # 1 green v1, v2 = 0x20, 0x00 - elif battery_level < _Battery.APPROX.full: + elif battery_level < BatteryChargeApproximation.FULL: # 2 greens v1, v2 = 0x20, 0x02 else: @@ -166,10 +188,10 @@ def set_3leds(self, device, battery_level=None, charging=None, warning=None): write_register(device, REGISTERS.three_leds, v1, v2) - def get_notification_flags(self, device): + def get_notification_flags(self, device: DeviceProtocol): return self._get_register(device, REGISTERS.notifications) - def set_notification_flags(self, device, *flag_bits): + def set_notification_flags(self, device: DeviceProtocol, *flag_bits): assert device is not None # Avoid a call if the device is not online, @@ -184,10 +206,10 @@ def set_notification_flags(self, device, *flag_bits): result = write_register(device, REGISTERS.notifications, _int2bytes(flag_bits, 3)) return result is not None - def get_device_features(self, device): + def get_device_features(self, device: DeviceProtocol): return self._get_register(device, REGISTERS.mouse_button_flags) - def _get_register(self, device, register): + def _get_register(self, device: DeviceProtocol, register): assert device is not None # Avoid a call if the device is not online, @@ -203,50 +225,61 @@ def _get_register(self, device, register): return _bytes2int(flags) -def parse_battery_status(register, reply): +def parse_battery_status(register, reply) -> Battery | None: + def status_byte_to_charge(status_byte_: int) -> BatteryChargeApproximation: + if status_byte_ == 7: + charge_ = BatteryChargeApproximation.FULL + elif status_byte_ == 5: + charge_ = BatteryChargeApproximation.GOOD + elif status_byte_ == 3: + charge_ = BatteryChargeApproximation.LOW + elif status_byte_ == 1: + charge_ = BatteryChargeApproximation.CRITICAL + else: + # pure 'charging' notifications may come without a status + charge_ = BatteryChargeApproximation.EMPTY + return charge_ + + def status_byte_to_battery_status(status_byte_: int) -> BatteryStatus: + if status_byte_ == 0x30: + status_text_ = BatteryStatus.DISCHARGING + elif status_byte_ == 0x50: + status_text_ = BatteryStatus.RECHARGING + elif status_byte_ == 0x90: + status_text_ = BatteryStatus.FULL + else: + status_text_ = None + return status_text_ + + def charging_byte_to_status_text(charging_byte_: int) -> BatteryStatus: + if charging_byte_ == 0x00: + status_text_ = BatteryStatus.DISCHARGING + elif charging_byte_ & 0x21 == 0x21: + status_text_ = BatteryStatus.RECHARGING + elif charging_byte_ & 0x22 == 0x22: + status_text_ = BatteryStatus.FULL + else: + logger.warning("could not parse 0x07 battery status: %02X (level %02X)", charging_byte_, status_byte) + status_text_ = None + return status_text_ + if register == REGISTERS.battery_charge: charge = ord(reply[:1]) status_byte = ord(reply[2:3]) & 0xF0 - status_text = ( - _Battery.STATUS.discharging - if status_byte == 0x30 - else _Battery.STATUS.recharging - if status_byte == 0x50 - else _Battery.STATUS.full - if status_byte == 0x90 - else None - ) - return _Battery(charge, None, status_text, None) + + battery_status = status_byte_to_battery_status(status_byte) + return Battery(charge, None, battery_status, None) if register == REGISTERS.battery_status: status_byte = ord(reply[:1]) - charge = ( - _Battery.APPROX.full - if status_byte == 7 # full - else _Battery.APPROX.good - if status_byte == 5 # good - else _Battery.APPROX.low - if status_byte == 3 # low - else _Battery.APPROX.critical - if status_byte == 1 # critical - # pure 'charging' notifications may come without a status - else _Battery.APPROX.empty - ) - charging_byte = ord(reply[1:2]) - if charging_byte == 0x00: - status_text = _Battery.STATUS.discharging - elif charging_byte & 0x21 == 0x21: - status_text = _Battery.STATUS.recharging - elif charging_byte & 0x22 == 0x22: - status_text = _Battery.STATUS.full - else: - logger.warning("could not parse 0x07 battery status: %02X (level %02X)", charging_byte, status_byte) - status_text = None + + status_text = charging_byte_to_status_text(charging_byte) + charge = status_byte_to_charge(status_byte) if charging_byte & 0x03 and status_byte == 0: # some 'charging' notifications may come with no battery level information charge = None # Return None for next charge level and voltage as these are not in HID++ 1.0 spec - return _Battery(charge, None, status_text, None) + return Battery(charge, None, status_text, None) diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 87e318265e..ca4a1f9422 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -26,6 +26,8 @@ import yaml as _yaml +from typing_extensions import Protocol + from . import exceptions from . import hidpp10_constants as _hidpp10_constants from . import special_keys @@ -1399,6 +1401,30 @@ def feature_request(device, feature, function=0x00, *params, no_reply=False): ) +class DeviceProtocol(Protocol): + def feature_request(self, feature: FEATURE): + ... + + def request(self): + ... + + @property + def features(self): + ... + + @property + def _gestures(self): + ... + + @property + def _backlight(self): + ... + + @property + def _profiles(self): + ... + + class Hidpp20: def get_firmware(self, device): """Reads a device's firmware info. @@ -1446,7 +1472,7 @@ def get_ids(self, device): offset = offset + 2 return (unitId.hex().upper(), modelId.hex().upper(), tid_map) - def get_kind(self, device): + def get_kind(self, device: DeviceProtocol): """Reads a device's type. :see DEVICE_KIND: @@ -1460,7 +1486,7 @@ def get_kind(self, device): # logger.debug("device %d type %d = %s", devnumber, kind, DEVICE_KIND[kind]) return KIND_MAP[DEVICE_KIND[kind]] - def get_name(self, device): + def get_name(self, device: DeviceProtocol): """Reads a device's name. :returns: a string with the device name, or ``None`` if the device is not @@ -1481,7 +1507,7 @@ def get_name(self, device): return name.decode("utf-8") - def get_friendly_name(self, device): + def get_friendly_name(self, device: DeviceProtocol): """Reads a device's friendly name. :returns: a string with the device name, or ``None`` if the device is not @@ -1502,22 +1528,22 @@ def get_friendly_name(self, device): return name.decode("utf-8") - def get_battery_status(self, device): + def get_battery_status(self, device: DeviceProtocol): report = device.feature_request(FEATURE.BATTERY_STATUS) if report: return decipher_battery_status(report) - def get_battery_unified(self, device): + def get_battery_unified(self, device: DeviceProtocol): report = device.feature_request(FEATURE.UNIFIED_BATTERY, 0x10) if report is not None: return decipher_battery_unified(report) - def get_battery_voltage(self, device): + def get_battery_voltage(self, device: DeviceProtocol): report = device.feature_request(FEATURE.BATTERY_VOLTAGE) if report is not None: return decipher_battery_voltage(report) - def get_adc_measurement(self, device): + def get_adc_measurement(self, device: DeviceProtocol): try: # this feature call produces an error for headsets that are connected but inactive report = device.feature_request(FEATURE.ADC_MEASUREMENT) if report is not None: @@ -1542,7 +1568,7 @@ def get_battery(self, device, feature): return result return 0 - def get_keys(self, device): + def get_keys(self, device: DeviceProtocol): # TODO: add here additional variants for other REPROG_CONTROLS count = None if FEATURE.REPROG_CONTROLS_V2 in device.features: @@ -1553,30 +1579,30 @@ def get_keys(self, device): return KeysArrayV4(device, ord(count[:1])) return None - def get_remap_keys(self, device): + def get_remap_keys(self, device: DeviceProtocol): count = device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x10) if count: return KeysArrayPersistent(device, ord(count[:1])) - def get_gestures(self, device): + def get_gestures(self, device: DeviceProtocol): if getattr(device, "_gestures", None) is not None: return device._gestures if FEATURE.GESTURE_2 in device.features: return Gestures(device) - def get_backlight(self, device): + def get_backlight(self, device: DeviceProtocol): if getattr(device, "_backlight", None) is not None: return device._backlight if FEATURE.BACKLIGHT2 in device.features: return Backlight(device) - def get_profiles(self, device): + def get_profiles(self, device: DeviceProtocol): if getattr(device, "_profiles", None) is not None: return device._profiles if FEATURE.ONBOARD_PROFILES in device.features: return OnboardProfiles.from_device(device) - def get_mouse_pointer_info(self, device): + def get_mouse_pointer_info(self, device: DeviceProtocol): pointer_info = device.feature_request(FEATURE.MOUSE_POINTER) if pointer_info: dpi, flags = _unpack("!HB", pointer_info[:3]) @@ -1590,7 +1616,7 @@ def get_mouse_pointer_info(self, device): "suggest_vertical_orientation": suggest_vertical_orientation, } - def get_vertical_scrolling_info(self, device): + def get_vertical_scrolling_info(self, device: DeviceProtocol): vertical_scrolling_info = device.feature_request(FEATURE.VERTICAL_SCROLLING) if vertical_scrolling_info: roller, ratchet, lines = _unpack("!BBB", vertical_scrolling_info[:3]) @@ -1606,13 +1632,13 @@ def get_vertical_scrolling_info(self, device): )[roller] return {"roller": roller_type, "ratchet": ratchet, "lines": lines} - def get_hi_res_scrolling_info(self, device): + def get_hi_res_scrolling_info(self, device: DeviceProtocol): hi_res_scrolling_info = device.feature_request(FEATURE.HI_RES_SCROLLING) if hi_res_scrolling_info: mode, resolution = _unpack("!BB", hi_res_scrolling_info[:2]) return mode, resolution - def get_pointer_speed_info(self, device): + def get_pointer_speed_info(self, device: DeviceProtocol): pointer_speed_info = device.feature_request(FEATURE.POINTER_SPEED) if pointer_speed_info: pointer_speed_hi, pointer_speed_lo = _unpack("!BB", pointer_speed_info[:2]) @@ -1620,14 +1646,14 @@ def get_pointer_speed_info(self, device): # pointer_speed_lo = pointer_speed_lo return pointer_speed_hi + pointer_speed_lo / 256 - def get_lowres_wheel_status(self, device): + def get_lowres_wheel_status(self, device: DeviceProtocol): lowres_wheel_status = device.feature_request(FEATURE.LOWRES_WHEEL) if lowres_wheel_status: wheel_flag = _unpack("!B", lowres_wheel_status[:1])[0] wheel_reporting = ("HID", "HID++")[wheel_flag & 0x01] return wheel_reporting - def get_hires_wheel(self, device): + def get_hires_wheel(self, device: DeviceProtocol): caps = device.feature_request(FEATURE.HIRES_WHEEL, 0x00) mode = device.feature_request(FEATURE.HIRES_WHEEL, 0x10) ratchet = device.feature_request(FEATURE.HIRES_WHEEL, 0x030) @@ -1653,7 +1679,7 @@ def get_hires_wheel(self, device): return multi, has_invert, has_ratchet, inv, res, target, ratchet - def get_new_fn_inversion(self, device): + def get_new_fn_inversion(self, device: DeviceProtocol): state = device.feature_request(FEATURE.NEW_FN_INVERSION, 0x00) if state: inverted, default_inverted = _unpack("!BB", state[:2]) @@ -1661,7 +1687,7 @@ def get_new_fn_inversion(self, device): default_inverted = (default_inverted & 0x01) != 0 return inverted, default_inverted - def get_host_names(self, device): + def get_host_names(self, device: DeviceProtocol): state = device.feature_request(FEATURE.HOSTS_INFO, 0x00) host_names = {} if state: @@ -1709,7 +1735,7 @@ def set_host_name(self, device, name, currentName=""): chunk += 14 return True - def get_onboard_mode(self, device): + def get_onboard_mode(self, device: DeviceProtocol): state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x20) if state: @@ -1720,7 +1746,7 @@ def set_onboard_mode(self, device, mode): state = device.feature_request(FEATURE.ONBOARD_PROFILES, 0x10, mode) return state - def get_polling_rate(self, device): + def get_polling_rate(self, device: DeviceProtocol): state = device.feature_request(FEATURE.REPORT_RATE, 0x10) if state: rate = _unpack("!B", state[:1])[0] @@ -1732,7 +1758,7 @@ def get_polling_rate(self, device): rate = _unpack("!B", state[:1])[0] return rates[rate] - def get_remaining_pairing(self, device): + def get_remaining_pairing(self, device: DeviceProtocol): result = device.feature_request(FEATURE.REMAINING_PAIRING, 0x0) if result: result = _unpack("!B", result[:1])[0]