Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Arksine/moonraker
Browse files Browse the repository at this point in the history
  • Loading branch information
actions-user committed May 7, 2024
2 parents d323bd4 + ba94285 commit b6eff4c
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ The format is based on [Keep a Changelog].
- **history**: Added support for auxiliary history fields
- **spoolman**: Report spool ids set during a print in history auxiliary data
- **sensor**: Added support for history fields reported in auxiliary data
- **power**: Added support for `uhubctl` devices

### Fixed

Expand Down
57 changes: 55 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,11 @@ The following configuration options are available for all power device types:
type:
# The type of device. Can be either gpio, klipper_device, rf,
# tplink_smartplug, tasmota, shelly, homeseer, homeassistant, loxonev1,
# smartthings, mqtt or hue.
# smartthings, mqtt, hue, http or uhubctl.
# This parameter must be provided.
initial_state: off
# The state the power device should be initialized to. May be on or
# off. When this option is not specifed no initial state will be set.
# off. When this option is not specified no initial state will be set.
off_when_shutdown: False
# If set to True the device will be powered off when Klipper enters
# the "shutdown" state. This option applies to all device types.
Expand Down Expand Up @@ -1343,6 +1343,59 @@ device_type: light

```

#### USB (uhubctl) devices

Support for toggling USB powered devices via [uhubctl](https://github.com/mvp/uhubctl).

!!! Note
The host machine must have `uhubctl` installed as a prerequisite. In addition,
the required [udev rules](https://github.com/mvp/uhubctl#linux-usb-permissions)
must be installed on the host to give Moonraker permission to toggle hub
power without sudo.

```ini
location:
# Device location of the USB Hub connected to the device to control. The
# location corresponds to the "-l" option of "uhubctl". This parameter
# must be provided.
port:
# Port of the USB device to control. The port corresponds to the "-p"
# option of "ububctl". This parameter must be provided

```

!!! Tip
The `uhubctl` software can be used to list all compatible hubs on the
system by simply executing `uhubctl` with no arguments. The following
is example output from a Raspberry Pi 3B+:

```
Current status for hub 1-1.1 [0424:2514, USB 2.00, 3 ports, ppps]
Port 1: 0503 power highspeed enable connect [0424:7800]
Port 2: 0100 power
Port 3: 0100 power
Current status for hub 1-1 [0424:2514, USB 2.00, 4 ports, ppps]
Port 1: 0503 power highspeed enable connect [0424:2514, USB 2.00, 3 ports, ppps]
Port 2: 0100 power
Port 3: 0103 power enable connect [1d50:614e Klipper rp2040 45503571290B1068]
Port 4: 0100 power
Current status for hub 1 [1d6b:0002 Linux 6.6.28+rpt-rpi-v7 dwc_otg_hcd DWC OTG Controller 3f980000.usb, USB 2.00, 1 ports, ppps]
Port 1: 0503 power highspeed enable connect [0424:2514, USB 2.00, 4 ports, ppps]
```

##### Example

```ini
# moonraker.confg

# Example for controlling a device connected to a Raspberry Pi 3B+.
# Location 1-1 Port 2 controls power for all 4 exposed ports.
[power my_usb_dev]
type: uhubctl
location: 1-1
port: 2
```

#### Generic HTTP Devices

Support for configurable HTTP switches. This device type may be used when
Expand Down
109 changes: 108 additions & 1 deletion moonraker/components/power.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Raspberry Pi Power Control
# Power Switch Control
#
# Copyright (C) 2024 Eric Callahan <[email protected]>
# Copyright (C) 2020 Jordan Ruthe <[email protected]>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
Expand All @@ -10,6 +11,8 @@
import socket
import asyncio
import time
import re
import shutil
from urllib.parse import quote, urlencode
from ..utils import json_wrapper as jsonw
from ..common import RequestType, KlippyState
Expand All @@ -35,6 +38,7 @@
from .mqtt import MQTTClient
from .http_client import HttpClient
from .klippy_connection import KlippyConnection
from .shell_command import ShellCommandFactory as ShellCommand

class PrinterPower:
def __init__(self, config: ConfigHelper) -> None:
Expand All @@ -56,6 +60,7 @@ def __init__(self, config: ConfigHelper) -> None:
"smartthings": SmartThings,
"hue": HueDevice,
"http": GenericHTTP,
"uhubctl": UHubCtl
}

for section in prefix_sections:
Expand Down Expand Up @@ -1465,6 +1470,108 @@ async def _send_power_request(self, state: str) -> str:
async def _send_status_request(self) -> str:
return await self._send_generic_request("status")


HUB_STATE_PATTERN = r"""
(?:Port\s(?P<port>[0-9]+):)
(?:\s(?P<bits>[0-9a-f]{4}))
(?:\s(?P<pstate>power|off))
(?P<flags>(?:\s[0-9a-z]+)+)?
(?:\s\[(?P<desc>.+)\])?
"""

class UHubCtl(PowerDevice):
_uhubctrl_regex = re.compile(
r"^\s*" + HUB_STATE_PATTERN + r"\s*$",
re.VERBOSE | re.IGNORECASE
)
def __init__(self, config: ConfigHelper) -> None:
super().__init__(config)
self.scmd: ShellCommand = self.server.load_component(config, "shell_command")
self.location = config.get("location")
self.port = config.getint("port")
ret = shutil.which("uhubctl")
if ret is None:
raise config.error(
f"[{config.get_name()}]: failed to locate 'uhubctl' binary. "
"Make sure uhubctl is correctly installed on the host machine."
)

async def init_state(self) -> None:
async with self.request_lock:
await self.refresh_status()
cur_state = True if self.state == "on" else False
if self.initial_state is not None and cur_state != self.initial_state:
await self.set_power("on" if self.initial_state else "off")

async def refresh_status(self) -> None:
try:
result = await self._run_uhubctl("info")
except self.server.error as e:
self.state = "error"
output = f"\n{e}"
if isinstance(e, self.scmd.error):
output += f"\nuhubctrl output: {e.stderr.decode(errors='ignore')}"
logging.info(f"Power Device {self.name}: Refresh Error{output}")
return
logging.debug(f"Power Device {self.name}: uhubctl device info: {result}")
self.state = result["state"]

async def set_power(self, state: str) -> None:
try:
result = await self._run_uhubctl(state)
except self.server.error as e:
self.state = "error"
msg = f"Power Device {self.name}: Error turning device {state}"
output = f"\n{e}"
if isinstance(e, self.scmd.error):
output += f"\nuhubctrl output: {e.stderr.decode(errors='ignore')}"
logging.info(f"{msg}{output}")
raise self.server.error(msg) from None
logging.debug(f"Power Device {self.name}: uhubctl device info: {result}")
self.state = result["state"]

async def _run_uhubctl(self, action: str) -> Dict[str, Any]:
cmd = f"uhubctl -l {self.location} -p {self.port}"
search_prefix = "Current status"
if action in ["on", "off"]:
cmd += f" -a {action}"
search_prefix = "New status"
resp: str = await self.scmd.exec_cmd(cmd, log_complete=False)
for line in resp.splitlines():
if search_prefix:
if line.startswith(search_prefix):
search_prefix = ""
continue
match = self._uhubctrl_regex.match(line.strip())
if match is None:
continue
result = match.groupdict()
try:
port = int(result["port"])
status_bits = int(result["bits"], 16)
except (TypeError, ValueError):
continue
if port != self.port:
continue
if result["pstate"] is None:
continue
state = "on" if result["pstate"] == "power" else "off"
flags: List[str] = []
if result["flags"] is not None:
flags = result["flags"].strip().split()
return {
"port": port,
"status_bits": status_bits,
"state": state,
"flags": flags,
"desc": result["desc"]
}
raise self.server.error(
f"Failed to receive response for device at location {self.location}, "
f"port {self.port}, "
)


# The power component has multiple configuration sections
def load_component(config: ConfigHelper) -> PrinterPower:
return PrinterPower(config)

0 comments on commit b6eff4c

Please sign in to comment.