Skip to content

Commit

Permalink
fix: resolve unawaited task errors on connect/disconnect (#103)
Browse files Browse the repository at this point in the history
* fix: resolve unawaited task errors on connect/disconnect

* chore: make lint happy
  • Loading branch information
the-ress authored Aug 5, 2023
1 parent 58452b0 commit 1ad03be
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions roborock/cloud_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import logging
import threading
import uuid
from asyncio import Lock
from typing import Optional
from asyncio import Lock, Task
from typing import Any, Optional
from urllib.parse import urlparse

import paho.mqtt.client as mqtt
Expand Down Expand Up @@ -112,40 +112,52 @@ def sync_start_loop(self) -> None:
self._logger.info("Starting mqtt loop")
super().loop_start()

def sync_disconnect(self) -> bool:
rc = mqtt.MQTT_ERR_AGAIN
def sync_disconnect(self) -> tuple[bool, Task[tuple[Any, VacuumError | None]] | None]:
if not self.is_connected():
return False, None

self._logger.info("Disconnecting from mqtt")
disconnected_future = asyncio.ensure_future(self._async_response(DISCONNECT_REQUEST_ID))
rc = super().disconnect()

if rc == mqtt.MQTT_ERR_NO_CONN:
disconnected_future.cancel()
return False, None

if rc != mqtt.MQTT_ERR_SUCCESS:
disconnected_future.cancel()
raise RoborockException(f"Failed to disconnect ({mqtt.error_string(rc)})")

return True, disconnected_future

def sync_connect(self) -> tuple[bool, Task[tuple[Any, VacuumError | None]] | None]:
if self.is_connected():
self._logger.info("Disconnecting from mqtt")
rc = super().disconnect()
if rc not in [mqtt.MQTT_ERR_SUCCESS, mqtt.MQTT_ERR_NO_CONN]:
raise RoborockException(f"Failed to disconnect ({mqtt.error_string(rc)})")
return rc == mqtt.MQTT_ERR_SUCCESS

def sync_connect(self) -> bool:
should_connect = not self.is_connected()
if should_connect:
if self._mqtt_port is None or self._mqtt_host is None:
raise RoborockException("Mqtt information was not entered. Cannot connect.")
self._logger.info("Connecting to mqtt")
super().connect(host=self._mqtt_host, port=self._mqtt_port, keepalive=KEEPALIVE)
self.sync_start_loop()
return False, None

if self._mqtt_port is None or self._mqtt_host is None:
raise RoborockException("Mqtt information was not entered. Cannot connect.")

self._logger.info("Connecting to mqtt")
connected_future = asyncio.ensure_future(self._async_response(CONNECT_REQUEST_ID))
super().connect(host=self._mqtt_host, port=self._mqtt_port, keepalive=KEEPALIVE)

self.sync_start_loop()
return should_connect
return True, connected_future

async def async_disconnect(self) -> None:
async with self._mutex:
async_response = asyncio.ensure_future(self._async_response(DISCONNECT_REQUEST_ID))
disconnecting = self.sync_disconnect()
if disconnecting:
(_, err) = await async_response
(disconnecting, disconnected_future) = self.sync_disconnect()
if disconnecting and disconnected_future:
(_, err) = await disconnected_future
if err:
raise RoborockException(err) from err

async def async_connect(self) -> None:
async with self._mutex:
async_response = asyncio.ensure_future(self._async_response(CONNECT_REQUEST_ID))
connecting = self.sync_connect()
if connecting:
(_, err) = await async_response
(connecting, connected_future) = self.sync_connect()
if connecting and connected_future:
(_, err) = await connected_future
if err:
raise RoborockException(err) from err

Expand Down

0 comments on commit 1ad03be

Please sign in to comment.