diff --git a/doc/configuration.rst b/doc/configuration.rst index f70bf2b66..e45ecbd4b 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -2174,9 +2174,33 @@ Implements: Arguments: - delay (float, default=2.0): delay in seconds between off and on +GpioDigitalInputDriver +~~~~~~~~~~~~~~~~~~~~~~ +The :any:`GpioDigitalInputDriver` reads a digital signal from a GPIO line. + +This driver configures GPIO lines via `the sysfs kernel interface ` +as an input. + + +Binds to: + gpio: + - `SysfsGPIO`_ + - `MatchedSysfsGPIO`_ + - NetworkSysfsGPIO + +Implements: + - :any:`DigitalInputProtocol` + +.. code-block:: yaml + + GpioDigitalInputDriver: {} + +Arguments: + - None + GpioDigitalOutputDriver ~~~~~~~~~~~~~~~~~~~~~~~ -The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line. +The :any:`GpioDigitalOutputDriver` reads and writes a digital signal from and to a GPIO line. This driver configures GPIO lines via `the sysfs kernel interface `. While the driver automatically exports the GPIO, it does not configure it in any other way than as an output. diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index 721256bbf..f89cb2ce6 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -26,7 +26,7 @@ from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode from .resetdriver import DigitalOutputResetDriver -from .gpiodriver import GpioDigitalOutputDriver +from .gpiodriver import GpioDigitalInputDriver, GpioDigitalOutputDriver from .filedigitaloutput import FileDigitalOutputDriver from .serialdigitaloutput import SerialPortDigitalOutputDriver from .xenadriver import XenaDriver diff --git a/labgrid/driver/gpiodriver.py b/labgrid/driver/gpiodriver.py index 2de90618e..68ae13027 100644 --- a/labgrid/driver/gpiodriver.py +++ b/labgrid/driver/gpiodriver.py @@ -2,13 +2,44 @@ import attr from ..factory import target_factory -from ..protocol import DigitalOutputProtocol +from ..protocol import DigitalInputProtocol, DigitalOutputProtocol from ..resource.remote import NetworkSysfsGPIO from ..step import step from .common import Driver from ..util.agentwrapper import AgentWrapper +@target_factory.reg_driver +@attr.s(eq=False) +class GpioDigitalInputDriver(Driver, DigitalInputProtocol): + + bindings = { + "gpio": {"SysfsGPIO", "NetworkSysfsGPIO"}, + } + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self.wrapper = None + + def on_activate(self): + if isinstance(self.gpio, NetworkSysfsGPIO): + host = self.gpio.host + else: + host = None + self.wrapper = AgentWrapper(host) + self.proxy = self.wrapper.load('sysfsgpioin') + + def on_deactivate(self): + self.wrapper.close() + self.wrapper = None + self.proxy = None + + @Driver.check_active + @step(result=True) + def get(self): + return self.proxy.get(self.gpio.index) + + @target_factory.reg_driver @attr.s(eq=False) class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol): diff --git a/labgrid/protocol/__init__.py b/labgrid/protocol/__init__.py index 0ac225622..1b3fc41c3 100644 --- a/labgrid/protocol/__init__.py +++ b/labgrid/protocol/__init__.py @@ -6,6 +6,7 @@ from .filetransferprotocol import FileTransferProtocol from .infoprotocol import InfoProtocol from .digitaloutputprotocol import DigitalOutputProtocol +from .digitalinputprotocol import DigitalInputProtocol from .mmioprotocol import MMIOProtocol from .filesystemprotocol import FileSystemProtocol from .resetprotocol import ResetProtocol diff --git a/labgrid/protocol/digitalinputprotocol.py b/labgrid/protocol/digitalinputprotocol.py new file mode 100644 index 000000000..768a4387e --- /dev/null +++ b/labgrid/protocol/digitalinputprotocol.py @@ -0,0 +1,10 @@ +import abc + + +class DigitalInputProtocol(abc.ABC): + """Abstract class providing the DigitalInputProtocol interface""" + + @abc.abstractmethod + def get(self): + """Implementations should return the status of the digital input.""" + raise NotImplementedError diff --git a/labgrid/protocol/digitaloutputprotocol.py b/labgrid/protocol/digitaloutputprotocol.py index 6b1ce7e23..c80737273 100644 --- a/labgrid/protocol/digitaloutputprotocol.py +++ b/labgrid/protocol/digitaloutputprotocol.py @@ -1,13 +1,10 @@ import abc +from .digitalinputprotocol import DigitalInputProtocol -class DigitalOutputProtocol(abc.ABC): - """Abstract class providing the DigitalOutputProtocol interface""" - - @abc.abstractmethod - def get(self): - """Implementations should return the status of the digital output.""" - raise NotImplementedError +class DigitalOutputProtocol(DigitalInputProtocol): + """Abstract class providing the DigitalOutputProtocol interface. + Implies that the set output can be read as well, so requires DigitalInputProtocol""" @abc.abstractmethod def set(self, status): diff --git a/labgrid/util/agents/sysfsgpioin.py b/labgrid/util/agents/sysfsgpioin.py new file mode 100644 index 000000000..7d6e12967 --- /dev/null +++ b/labgrid/util/agents/sysfsgpioin.py @@ -0,0 +1,72 @@ +""" +This module implements reading GPIOs via sysfs GPIO kernel interface. + +Takes an integer property 'index' which refers to the already exported GPIO device. + +""" +import logging +import os + +class GpioDigitalInput: + _gpio_sysfs_path_prefix = '/sys/class/gpio' + + @staticmethod + def _assert_gpio_line_is_exported(index): + gpio_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix, + f'gpio{index}') + # Deprecated: the exporter can export on acquire, we are leaving this + # in for now to support exporters which have not been updated yet. + if not os.path.exists(gpio_sysfs_path): + export_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix, 'export') + with open(export_sysfs_path, mode='wb') as export: + export.write(str(index).encode('utf-8')) + if not os.path.exists(gpio_sysfs_path): + raise ValueError("Device not found") + + def __init__(self, index): + self._logger = logging.getLogger("Device: ") + GpioDigitalInput._assert_gpio_line_is_exported(index) + gpio_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix, + f'gpio{index}') + + gpio_sysfs_direction_path = os.path.join(gpio_sysfs_path, 'direction') + with open(gpio_sysfs_direction_path, 'rb') as direction_fd: + literal_value = direction_fd.read(2) + if literal_value != b"in": + self._logger.debug("Configuring GPIO %d as input.", index) + with open(gpio_sysfs_direction_path, 'wb') as direction_fd: + direction_fd.write(b'in') + + gpio_sysfs_value_path = os.path.join(gpio_sysfs_path, 'value') + self.gpio_sysfs_value_fd = os.open(gpio_sysfs_value_path, flags=(os.O_RDONLY | os.O_SYNC)) + + def __del__(self): + os.close(self.gpio_sysfs_value_fd) + self.gpio_sysfs_value_fd = None + + def get(self): + os.lseek(self.gpio_sysfs_value_fd, 0, os.SEEK_SET) + literal_value = os.read(self.gpio_sysfs_value_fd, 1) + if literal_value == b'0': + return False + elif literal_value == b'1': + return True + raise ValueError("GPIO value is out of range.") + + +_gpios = {} + +def _get_gpio_line(index): + if index not in _gpios: + _gpios[index] = GpioDigitalInput(index=index) + return _gpios[index] + + +def handle_get(index): + gpio_line = _get_gpio_line(index) + return gpio_line.get() + + +methods = { + 'get': handle_get, +} diff --git a/tests/test_agent.py b/tests/test_agent.py index f5ac21653..d6556b3ec 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -106,6 +106,9 @@ def test_all_modules(): methods = aw.list() assert 'sysfsgpio.set' in methods assert 'sysfsgpio.get' in methods + aw.load('sysfsgpioin') + methods = aw.list() + assert 'sysfsgpio.get' in methods aw.load('usb_hid_relay') methods = aw.list() assert 'usb_hid_relay.set' in methods