Skip to content

Commit

Permalink
Add a new linter rule and commit random linter fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
C0rn3j committed Sep 16, 2024
1 parent 6efff67 commit 29d4e6f
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 68 deletions.
1 change: 1 addition & 0 deletions ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ indent-style = 'tab'
select = ['ALL']
ignore = [
'W191', # We use tabs for indents, disabling this atrocious PEP 8 recommendation
'D206', # ^
]
53 changes: 26 additions & 27 deletions scc/device_monitor.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
#!/usr/bin/env python3
"""
SC-Controller - Device Monitor
"""SC-Controller - Device Monitor.
Extends eudevmonitor with options to register callbacks and
manage plugging/releasing devices.
"""
from __future__ import annotations

from ctypes.util import find_library
from typing import TYPE_CHECKING, Tuple, Union

from scc.lib.eudevmonitor import Eudev, Monitor
from scc.lib.ioctl_opt import IOR
from ctypes.util import find_library
from typing import Union, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
from scc.sccdaemon import SCCDaemon
import os
import ctypes
import fcntl
import re
import logging
import os
import re
import time

log = logging.getLogger("DevMon")

RE_BT_NUMBERS = re.compile(r"[0-9A-F]{4}:([0-9A-F]{4}):([0-9A-F]{4}).*")
HCIGETCONNLIST = IOR(ord('H'), 212, ctypes.c_int)
HCIGETCONNLIST = IOR(ord("H"), 212, ctypes.c_int)
HAVE_BLUETOOTH_LIB = False
try:
btlib_name = find_library('bluetooth')
btlib_name = find_library("bluetooth")
assert btlib_name
btlib = ctypes.CDLL(btlib_name)
HAVE_BLUETOOTH_LIB = True
Expand All @@ -36,7 +38,7 @@ class DeviceMonitor(Monitor):

def __init__(self, *a) -> None:
Monitor.__init__(self, *a)
self.daemon: Union['SCCDaemon', None] = None
self.daemon: SCCDaemon | None = None
self.dev_added_cbs = {}
self.dev_removed_cbs = {}
self.bt_addresses = {}
Expand Down Expand Up @@ -87,7 +89,7 @@ def _on_new_syspath(self, subsystem, syspath) -> None:
vendor, product = None, None
else:
vendor, product = self.get_vendor_product(syspath, subsystem)
except (OSError, IOError):
except OSError:
# Cannot grab vendor & product, probably subdevice or bus itself
return
key = (subsystem, vendor, product)
Expand Down Expand Up @@ -119,15 +121,13 @@ def _get_hci_addresses(self) -> None:

for i in range(cl.conn_num):
ci = cl.conn_info[i]
id = "hci%s:%s" % (cl.dev_id, ci.handle)
id = f"hci{cl.dev_id}:{ci.handle}"
address = ":".join([ hex(x).lstrip("0x").zfill(2).upper() for x in reversed(ci.bdaddr) ])
self.bt_addresses[id] = address


def _dev_for_hci(self, syspath) -> Union[str, None]:
"""
For given syspath leading to ../hciX:ABCD, returns input device node
"""
def _dev_for_hci(self, syspath) -> str | None:
"""For given syspath leading to ../hciX:ABCD, returns input device node."""
name = syspath.split("/")[-1]
if ":" not in name:
return None
Expand All @@ -139,7 +139,7 @@ def _dev_for_hci(self, syspath) -> Union[str, None]:
#Joe: somehow my node_addr is in lowercase
if node_addr is not None:
node_addr = str.upper(node_addr)
except IOError:
except OSError:
continue
try:
if node_addr is not None and addr is not None:
Expand Down Expand Up @@ -171,7 +171,7 @@ def on_data_ready(self, *a) -> None:


def rescan(self) -> None:
""" Scans and calls callbacks for already connected devices """
"""Scan and call callbacks for already connected devices."""
self._get_hci_addresses()
enumerator = self._eudev.enumerate()
subsystem_to_vp_to_callback = {}
Expand All @@ -187,17 +187,16 @@ def rescan(self) -> None:
if syspath not in self.known_devs:
try:
subsystem = DeviceMonitor.get_subsystem(syspath)
except (IOError, OSError):
except OSError:
continue
if subsystem in subsystem_to_vp_to_callback:
self._on_new_syspath(subsystem, syspath)


def get_vendor_product(self, syspath: str, subsystem: str | None = None) -> Tuple[int, int]:
"""
For given syspath, reads and returns (vendor_id, product_id) as ints.
"""For given syspath, reads and returns (vendor_id, product_id) as ints.
May throw all kinds of OSErrors or IOErrors
May throw all kinds of OSErrors
"""
if os.path.exists(os.path.join(syspath, "idVendor")):
vendor = int(open(os.path.join(syspath, "idVendor")).read().strip(), 16)
Expand Down Expand Up @@ -226,7 +225,7 @@ def get_vendor_product(self, syspath: str, subsystem: str | None = None) -> Tupl
raise OSError("Cannot determine vendor and product IDs")


def get_hidraw(self, syspath) -> Union[str, None]:
def get_hidraw(self, syspath) -> str | None:
"""
For given syspath, returns name of assotiated hidraw device.
Returns None if there is no such thing.
Expand All @@ -242,7 +241,7 @@ def get_hidraw(self, syspath) -> Union[str, None]:


@staticmethod
def _find_bt_address(syspath) -> Union[str, None]:
def _find_bt_address(syspath) -> str | None:
"""
Recursivelly searchs for "input*" subdirectories until "uniq" file
is found. Then, returns address from that file.
Expand All @@ -261,11 +260,11 @@ def _find_bt_address(syspath) -> Union[str, None]:


@staticmethod
def get_usb_address(syspath) -> Tuple[int, int]:
def get_usb_address(syspath) -> tuple[int, int]:
"""
For given syspath, reads and returns (busnum, devnum) as ints.
May throw all kinds of OSErrors or IOErrors
May throw all kinds of OSErrors
"""
busnum = int(open(os.path.join(syspath, "busnum")).read().strip())
devnum = int(open(os.path.join(syspath, "devnum")).read().strip())
Expand Down Expand Up @@ -301,7 +300,7 @@ class hci_conn_list_req(ctypes.Structure):
]


def create_device_monitor(daemon: 'SCCDaemon') -> DeviceMonitor:
def create_device_monitor(daemon: SCCDaemon) -> DeviceMonitor:
mon = Eudev().monitor(subclass=DeviceMonitor)
assert type(mon) is DeviceMonitor # Satisfy type checker
mon.daemon = daemon
Expand Down
85 changes: 49 additions & 36 deletions scc/drivers/ds4drv.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,41 @@
#!/usr/bin/env python3
"""
SC Controller - Dualshock 4 Driver
"""SC Controller - Dualshock 4 Driver.
Extends HID driver with DS4-specific options.
"""

from scc.drivers.hiddrv import BUTTON_COUNT, ButtonData, AxisType, AxisData
from scc.drivers.hiddrv import HIDController, HIDDecoder, hiddrv_test
from scc.drivers.hiddrv import AxisMode, AxisDataUnion, AxisModeData
from scc.drivers.hiddrv import HatswitchModeData, _lib
from scc.drivers.evdevdrv import HAVE_EVDEV, EvdevController, get_axes
from scc.drivers.evdevdrv import get_evdev_devices_from_syspath
from scc.drivers.evdevdrv import make_new_device
import ctypes
import logging
import sys
from typing import TYPE_CHECKING

from scc.constants import STICK_PAD_MAX, STICK_PAD_MIN, ControllerFlags, SCButtons
from scc.drivers.evdevdrv import (
HAVE_EVDEV,
EvdevController,
get_axes,
get_evdev_devices_from_syspath,
make_new_device,
)
from scc.drivers.hiddrv import (
BUTTON_COUNT,
AxisData,
AxisDataUnion,
AxisMode,
AxisModeData,
AxisType,
ButtonData,
HatswitchModeData,
HIDController,
HIDDecoder,
_lib,
hiddrv_test,
)
from scc.drivers.usb import register_hotplug_device
from scc.constants import SCButtons, ControllerFlags
from scc.constants import STICK_PAD_MIN, STICK_PAD_MAX
from scc.tools import init_logging, set_logging_level
import sys
import logging
import ctypes

if TYPE_CHECKING:
from scc.sccdaemon import SCCDaemon

log = logging.getLogger("DS4")

VENDOR_ID = 0x054c
Expand Down Expand Up @@ -61,39 +78,39 @@ def _load_hid_descriptor(self, config, max_size, vid, pid, test_mode):
mode = AxisMode.HATSWITCH, byte_offset = 5, size = 8,
data = AxisDataUnion(hatswitch = HatswitchModeData(
button = SCButtons.LPAD | SCButtons.LPADTOUCH,
min = STICK_PAD_MIN, max = STICK_PAD_MAX
min = STICK_PAD_MIN, max = STICK_PAD_MAX,
)))
self._decoder.axes[AxisType.AXIS_STICK_X] = AxisData(
mode = AxisMode.AXIS, byte_offset = 1, size = 8,
data = AxisDataUnion(axis = AxisModeData(
scale = 1.0, offset = -127.5, clamp_max = 257, deadzone = 10
scale = 1.0, offset = -127.5, clamp_max = 257, deadzone = 10,
)))
self._decoder.axes[AxisType.AXIS_STICK_Y] = AxisData(
mode = AxisMode.AXIS, byte_offset = 2, size = 8,
data = AxisDataUnion(axis = AxisModeData(
scale = -1.0, offset = 127.5, clamp_max = 257, deadzone = 10
scale = -1.0, offset = 127.5, clamp_max = 257, deadzone = 10,
)))
self._decoder.axes[AxisType.AXIS_RPAD_X] = AxisData(
mode = AxisMode.AXIS, byte_offset = 3, size = 8,
data = AxisDataUnion(axis = AxisModeData(
button = SCButtons.RPADTOUCH,
scale = 1.0, offset = -127.5, clamp_max = 257, deadzone = 10
scale = 1.0, offset = -127.5, clamp_max = 257, deadzone = 10,
)))
self._decoder.axes[AxisType.AXIS_RPAD_Y] = AxisData(
mode = AxisMode.AXIS, byte_offset = 4, size = 8,
data = AxisDataUnion(axis = AxisModeData(
button = SCButtons.RPADTOUCH,
scale = -1.0, offset = 127.5, clamp_max = 257, deadzone = 10
scale = -1.0, offset = 127.5, clamp_max = 257, deadzone = 10,
)))
self._decoder.axes[AxisType.AXIS_LTRIG] = AxisData(
mode = AxisMode.AXIS, byte_offset = 8, size = 8,
data = AxisDataUnion(axis = AxisModeData(
scale = 1.0, clamp_max = 1, deadzone = 10
scale = 1.0, clamp_max = 1, deadzone = 10,
)))
self._decoder.axes[AxisType.AXIS_RTRIG] = AxisData(
mode = AxisMode.AXIS, byte_offset = 9, size = 8,
data = AxisDataUnion(axis = AxisModeData(
scale = 1.0, clamp_max = 1, deadzone = 10
scale = 1.0, clamp_max = 1, deadzone = 10,
)))
self._decoder.axes[AxisType.AXIS_GPITCH] = AxisData(
mode = AxisMode.DS4ACCEL, byte_offset = 13)
Expand All @@ -114,7 +131,7 @@ def _load_hid_descriptor(self, config, max_size, vid, pid, test_mode):
mode = AxisMode.DS4TOUCHPAD, byte_offset = 37, bit_offset=4)
self._decoder.buttons = ButtonData(
enabled = True, byte_offset=5, bit_offset=4, size=14,
button_count = 14
button_count = 14,
)

if test_mode:
Expand Down Expand Up @@ -168,7 +185,7 @@ def _generate_id(self) -> str:
magic_number = 1
id = "ds4"
while id in self.daemon.get_active_ids():
id = "ds4:%s" % (magic_number, )
id = f"ds4:{magic_number}"
magic_number += 1
return id

Expand Down Expand Up @@ -240,7 +257,7 @@ class DS4EvdevController(EvdevController):
| ControllerFlags.NO_GRIPS
)

def __init__(self, daemon, controllerdevice, gyro, touchpad):
def __init__(self, daemon: "SCCDaemon", controllerdevice, gyro, touchpad):
config = {
'axes' : DS4EvdevController.AXIS_MAP,
'buttons' : DS4EvdevController.BUTTON_MAP,
Expand Down Expand Up @@ -271,7 +288,7 @@ def _gyro_input(self, *a):
if axis:
new_state = new_state._replace(
**{ axis : int(event.value * factor) })
except IOError:
except OSError:
# Errors here are not even reported, evdev class handles important ones
return

Expand Down Expand Up @@ -311,7 +328,7 @@ def _touchpad_input(self, *a):
b = new_state.buttons & ~SCButtons.CPADTOUCH
new_state = new_state._replace(buttons = b,
cpad_x = 0, cpad_y = 0)
except IOError:
except OSError:
# Errors here are not even reported, evdev class handles important ones
return

Expand Down Expand Up @@ -350,10 +367,7 @@ def __repr__(self) -> str:


def _generate_id(self) -> str:
"""
ID is generated as 'ds4' or 'ds4:X' where 'X' starts as 1 and increases
as controllers with same ids are connected.
"""
"""ID is generated as 'ds4' or 'ds4:X' where 'X' starts as 1 and increases as controllers with same ids are connected."""
magic_number = 1
id = "ds4"
while id in self.daemon.get_active_ids():
Expand All @@ -362,8 +376,8 @@ def _generate_id(self) -> str:
return id


def init(daemon, config: dict) -> bool:
""" Registers hotplug callback for ds4 device """
def init(daemon: "SCCDaemon", config: dict) -> bool:
"""Register hotplug callback for ds4 device."""

def hid_callback(device, handle):
return DS4Controller(device, daemon, handle, None, None)
Expand Down Expand Up @@ -427,9 +441,8 @@ def fail_cb(syspath, vid: int, pid: int) -> None:
daemon.get_device_monitor().add_callback("bluetooth",
VENDOR_ID, DS4_V1_PRODUCT_ID, make_evdev_device, None)
return True
else:
log.warning("Neither HID nor Evdev driver is enabled, DS4 support cannot be enabled.")
return False
log.warning("Neither HID nor Evdev driver is enabled, DS4 support cannot be enabled.")
return False


if __name__ == "__main__":
Expand Down
1 change: 0 additions & 1 deletion scc/gui/daemon_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
SC-Controller - DaemonManager
Expand Down
1 change: 0 additions & 1 deletion scc/gui/importexport/dialog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
SC-Controller - Import / Export Dialog
"""
Expand Down
1 change: 0 additions & 1 deletion scc/sccdaemon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
SC-Controller - Daemon class
"""
Expand Down
3 changes: 1 addition & 2 deletions scc/tools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
SC-Controller - tools
Expand Down Expand Up @@ -26,7 +25,7 @@
log = logging.getLogger("tools.py")
_ = lambda x : x

LOG_FORMAT = "%(levelname)s %(name)-13s %(message)s"
LOG_FORMAT = "%(levelname)s %(name)-13s %(message)s"

def init_logging(prefix="", suffix=""):
"""
Expand Down

0 comments on commit 29d4e6f

Please sign in to comment.