Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Arksine/moonraker
Browse files Browse the repository at this point in the history
  • Loading branch information
actions-user committed Apr 29, 2024
2 parents 58eb2d3 + 0f2e3d7 commit 176ee49
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 10 deletions.
6 changes: 5 additions & 1 deletion docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ The format is based on [Keep a Changelog].
- **update_manager**: Added support for "zipped" application updates
- **file_manager**: Added `enable_config_write_access` option
- **machine**: Add support for system peripheral queries
- **mqtt**: Added the `status_interval` option to support rate limiting
- **mqtt**: Added the `enable_tls` option to support ssl/tls connections

### Fixed

Expand All @@ -49,6 +51,8 @@ The format is based on [Keep a Changelog].
skipped subscription updates.
- **configheler**: Fixed inline comment parsing.
- **authorization**: Fixed blocking call to `socket.getfqdn()`
- **power**: Fixed "on_when_job_queued" behavior when the internal device
state is stale.

### Changed

Expand All @@ -59,7 +63,7 @@ The format is based on [Keep a Changelog].
- **build**: Bumped zeroconf to version `0.131.0`
- **build**: Bumped libnacl to version `2.1.0`
- **build**: Bumped distro to version `1.9.0`
- **build**: Bumped pillow to version `10.2.0`
- **build**: Bumped pillow to version `10.3.0`
- **build**: Bumped streaming-form-data to version `1.13.0`
- **machine**: Added `ratos-configurator` to list of default allowed services
- **update_manager**: It is now required that an application be "allowed"
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2114,6 +2114,11 @@ address:
# parameter must be provided.
port:
# Port the Broker is listening on. Default is 1883.
enable_tls: False
# Enables SSL/TLS connections when set to true. Note that if a user intends
# to connect to a local MQTT service using a self signed certificate then
# it will be necessary to install the root CA certificate on the machine
# hosting Moonraker. Default is False.
username:
# An optional username used to log in to the Broker. This option accepts
# Jinja2 Templates, see the [secrets] section for details. The default is
Expand Down Expand Up @@ -2178,6 +2183,10 @@ status_objects:
#
# If not configured then no objects will be tracked and published to
# the klipper/status topic.
status_interval:
# The interval (in seconds) between published status updates. This value
# can be used to limit the rate of updates published. By default Moonraker
# will publish Klipper status updates as it receives them.
publish_split_status: False
# Configures how to publish status updates to MQTT.
#
Expand Down
122 changes: 114 additions & 8 deletions moonraker/components/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import ssl
from collections import deque
import paho.mqtt.client as paho_mqtt
import paho.mqtt
from ..common import (
TransportType,
RequestType,
Expand All @@ -38,10 +39,12 @@
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..common import JsonRPC, APIDefinition
from ..eventloop import FlexTimer
from .klippy_apis import KlippyAPI
FlexCallback = Callable[[bytes], Optional[Coroutine]]
RPCCallback = Callable[..., Coroutine]

PAHO_MQTT_VERSION = tuple([int(p) for p in paho.mqtt.__version__.split(".")])
DUP_API_REQ_CODE = -10000
MQTT_PROTOCOLS = {
'v3.1': paho_mqtt.MQTTv31,
Expand All @@ -60,7 +63,9 @@ def reconnect(self, sock: Optional[socket.socket] = None):
if self._port <= 0:
raise ValueError('Invalid port number.')

if hasattr(self, "_out_packet_mutex"):
if PAHO_MQTT_VERSION >= (2, 0):
return self._v2_reconnect(sock)
if PAHO_MQTT_VERSION < (1, 6):
# Paho Mqtt Version < 1.6.x
self._in_packet = {
"command": 0,
Expand Down Expand Up @@ -157,6 +162,65 @@ def reconnect(self, sock: Optional[socket.socket] = None):

return self._send_connect(self._keepalive)

def _v2_reconnect(self, sock: Optional[socket.socket] = None):
self._in_packet = {
"command": 0,
"have_remaining": 0,
"remaining_count": [],
"remaining_mult": 1,
"remaining_length": 0,
"packet": bytearray(b""),
"to_process": 0,
"pos": 0,
}

self._ping_t = 0.0 # type: ignore
self._state = paho_mqtt._ConnectionState.MQTT_CS_CONNECTING

self._sock_close()

# Mark all currently outgoing QoS = 0 packets as lost,
# or `wait_for_publish()` could hang forever
for pkt in self._out_packet:
if (
pkt["command"] & 0xF0 == paho_mqtt.PUBLISH and
pkt["qos"] == 0 and pkt["info"] is not None
):
pkt["info"].rc = paho_mqtt.MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()

self._out_packet.clear()

with self._msgtime_mutex:
self._last_msg_in = paho_mqtt.time_func()
self._last_msg_out = paho_mqtt.time_func()

# Put messages in progress in a valid state.
self._messages_reconnect_reset()

with self._callback_mutex:
on_pre_connect = self.on_pre_connect

if on_pre_connect:
try:
on_pre_connect(self, self._userdata)
except Exception as err:
self._easy_log(
paho_mqtt.MQTT_LOG_ERR,
'Caught exception in on_pre_connect: %s', err
)
if not self.suppress_exceptions:
raise

self._sock = sock or self._create_socket()

self._sock.setblocking(False) # type: ignore[attr-defined]
self._registered_write = False
self._call_socket_open(self._sock)

return self._send_connect(self._keepalive)


class SubscriptionHandle:
def __init__(self, topic: str, callback: FlexCallback) -> None:
self.callback = callback
Expand Down Expand Up @@ -253,6 +317,7 @@ def __init__(self, config: ConfigHelper) -> None:
self.eventloop = self.server.get_event_loop()
self.address: str = config.get('address')
self.port: int = config.getint('port', 1883)
self.tls_enabled: bool = config.getboolean("enable_tls", False)
user = config.gettemplate('username', None)
self.user_name: Optional[str] = None
if user:
Expand Down Expand Up @@ -287,7 +352,12 @@ def __init__(self, config: ConfigHelper) -> None:
"between 0 and 2")
self.publish_split_status = \
config.getboolean("publish_split_status", False)
self.client = ExtPahoClient(protocol=self.protocol)
if PAHO_MQTT_VERSION < (2, 0):
self.client = ExtPahoClient(protocol=self.protocol)
else:
self.client = ExtPahoClient(
paho_mqtt.CallbackAPIVersion.VERSION1, protocol=self.protocol
)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
Expand Down Expand Up @@ -323,6 +393,10 @@ def __init__(self, config: ConfigHelper) -> None:
status_cfg: Dict[str, str] = config.getdict(
"status_objects", {}, allow_empty_fields=True
)
self.status_interval = config.getfloat("status_interval", 0, above=.25)
self.status_cache: Dict[str, Dict[str, Any]] = {}
self.status_update_timer: Optional[FlexTimer] = None
self.last_status_time = 0.
self.status_objs: Dict[str, Optional[List[str]]] = {}
for key, val in status_cfg.items():
if val is not None:
Expand All @@ -334,6 +408,13 @@ def __init__(self, config: ConfigHelper) -> None:
self.server.register_event_handler(
"server:klippy_started", self._handle_klippy_started
)
self.server.register_event_handler(
"server:klippy_disconnect", self._handle_klippy_disconnect
)
if self.status_interval:
self.status_update_timer = self.eventloop.register_timer(
self._handle_timed_status_update
)

self.timestamp_deque: Deque = deque(maxlen=20)
self.api_qos = config.getint('api_qos', self.qos)
Expand All @@ -360,6 +441,8 @@ async def component_init(self) -> None:
self.client.will_set(self.moonraker_status_topic,
payload=jsonw.dumps({'server': 'offline'}),
qos=self.qos, retain=True)
if self.tls_enabled:
self.client.tls_set()
self.client.connect_async(self.address, self.port)
self.connect_task = self.eventloop.create_task(
self._do_reconnect(first=True)
Expand All @@ -371,6 +454,16 @@ async def _handle_klippy_started(self, state: KlippyState) -> None:
await kapi.subscribe_from_transport(
self.status_objs, self, default=None,
)
if self.status_update_timer is not None:
self.status_update_timer.start(delay=self.status_interval)

def _handle_klippy_disconnect(self):
if self.status_update_timer is not None:
self.status_update_timer.stop()
if self.status_cache:
payload = self.status_cache
self.status_cache = {}
self._publish_status_update(payload, self.last_status_time)

def _on_message(self,
client: str,
Expand Down Expand Up @@ -694,18 +787,29 @@ def screen_rpc_request(
else:
self.timestamp_deque.append(ts)

def send_status(self,
status: Dict[str, Any],
eventtime: float
) -> None:
def send_status(self, status: Dict[str, Any], eventtime: float) -> None:
if not status or not self.is_connected():
return
if not self.status_interval:
self._publish_status_update(status, eventtime)
else:
for key, val in status.items():
self.status_cache.setdefault(key, {}).update(val)
self.last_status_time = eventtime

def _handle_timed_status_update(self, eventtime: float) -> float:
if self.status_cache:
payload = self.status_cache
self.status_cache = {}
self._publish_status_update(payload, self.last_status_time)
return eventtime + self.status_interval

def _publish_status_update(self, status: Dict[str, Any], eventtime: float) -> None:
if self.publish_split_status:
for objkey in status:
objval = status[objkey]
for statekey in objval:
payload = {'eventtime': eventtime,
'value': objval[statekey]}
payload = {'eventtime': eventtime, 'value': objval[statekey]}
self.publish_topic(
f"{self.klipper_state_prefix}/{objkey}/{statekey}",
payload, retain=True)
Expand All @@ -718,6 +822,8 @@ def get_instance_name(self) -> str:
return self.instance_name

async def close(self) -> None:
if self.status_update_timer is not None:
self.status_update_timer.stop()
if self.connect_task is not None:
self.connect_task.cancel()
self.connect_task = None
Expand Down
6 changes: 5 additions & 1 deletion moonraker/components/power.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def _power_off_on_shutdown(self) -> None:
power.set_device_power(self.name, "off")

def should_turn_on_when_queued(self) -> bool:
return self.on_when_queued and self.state == "off"
return self.on_when_queued

def _setup_bound_services(self) -> None:
if not self.bound_services:
Expand Down Expand Up @@ -679,6 +679,7 @@ async def set_power(self, state: str) -> None:
kapis: APIComp = self.server.lookup_component('klippy_apis')
value = "1" if state == "on" else "0"
await kapis.run_gcode(f"{self.gc_cmd} VALUE={value}")
assert self.update_fut is not None
await asyncio.wait_for(self.update_fut, 1.)
except TimeoutError:
self.state = "error"
Expand Down Expand Up @@ -1273,6 +1274,7 @@ async def _on_mqtt_connected(self) -> None:
while self.mqtt.is_connected():
self.query_response = self.eventloop.create_future()
try:
assert self.query_response is not None
await self._wait_for_update(self.query_response)
except asyncio.TimeoutError:
# Only wait once if no query topic is set.
Expand Down Expand Up @@ -1335,6 +1337,7 @@ async def refresh_status(self) -> None:
"MQTT Not Connected", 503)
self.query_response = self.eventloop.create_future()
try:
assert self.query_response is not None
await self._wait_for_update(self.query_response)
except Exception:
logging.exception(f"MQTT Power Device {self.name}: "
Expand All @@ -1361,6 +1364,7 @@ async def set_power(self, state: str) -> None:
self.query_response = self.eventloop.create_future()
new_state = "error"
try:
assert self.query_response is not None
payload = self.cmd_payload.render({'command': state})
await self.mqtt.publish_topic(
self.cmd_topic, payload, self.qos,
Expand Down

0 comments on commit 176ee49

Please sign in to comment.