Skip to content

Commit

Permalink
Merge pull request #76 from warhammerkid/lazy-bluetooth-connect
Browse files Browse the repository at this point in the history
Lazy Bluetooth Connect
  • Loading branch information
warhammerkid authored Apr 29, 2023
2 parents 2eb5546 + 71c7e59 commit 4dc245f
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 220 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* BREAKING: The internal API has changed to better reflect standard MODBUS terminology
* Out-of-range internal_current_three values are no longer reported for AC300
* No longer checks if it can connect to all listed devices when logging

## 0.13.0

Expand Down
37 changes: 19 additions & 18 deletions bluetti_mqtt/bluetooth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ async def scan_devices():
print(f'Found {d.name}: address {d.address}')


def build_device(address: str, name: str):
match = DEVICE_NAME_RE.match(name)
if match[1] == 'AC200M':
return AC200M(address, match[2])
if match[1] == 'AC300':
return AC300(address, match[2])
if match[1] == 'AC500':
return AC500(address, match[2])
if match[1] == 'EP500':
return EP500(address, match[2])
if match[1] == 'EP500P':
return EP500P(address, match[2])
if match[1] == 'EP600':
return EP600(address, match[2])
if match[1] == 'EB3A':
return EB3A(address, match[2])


async def check_addresses(addresses: Set[str]):
logging.debug(f'Checking we can connect: {addresses}')
devices = await BleakScanner.discover()
Expand All @@ -32,21 +50,4 @@ async def check_addresses(addresses: Set[str]):
if len(filtered) != len(addresses):
return []

def build_device(device: BLEDevice) -> BluettiDevice:
match = DEVICE_NAME_RE.match(device.name)
if match[1] == 'AC200M':
return AC200M(device.address, match[2])
if match[1] == 'AC300':
return AC300(device.address, match[2])
if match[1] == 'AC500':
return AC500(device.address, match[2])
if match[1] == 'EP500':
return EP500(device.address, match[2])
if match[1] == 'EP500P':
return EP500P(device.address, match[2])
if match[1] == 'EP600':
return EP600(device.address, match[2])
if match[1] == 'EB3A':
return EB3A(device.address, match[2])

return [build_device(d) for d in filtered]
return [build_device(d.address, d.name) for d in filtered]
207 changes: 130 additions & 77 deletions bluetti_mqtt/bluetooth/client.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,46 @@
import asyncio
from enum import Enum, auto, unique
import logging
from typing import Union
from bleak import BleakClient, BleakError
from bleak.exc import BleakDeviceNotFoundError
from bluetti_mqtt.core import DeviceCommand
from .exc import BadConnectionError, ModbusError, ParseError


@unique
class ClientState(Enum):
NOT_CONNECTED = auto()
CONNECTED = auto()
READY = auto()
PERFORMING_COMMAND = auto()
COMMAND_ERROR_WAIT = auto()
DISCONNECTING = auto()


class BluetoothClient:
RESPONSE_TIMEOUT = 5
WRITE_UUID = '0000ff02-0000-1000-8000-00805f9b34fb'
NOTIFY_UUID = '0000ff01-0000-1000-8000-00805f9b34fb'
DEVICE_NAME_UUID = '00002a00-0000-1000-8000-00805f9b34fb'

name: Union[str, None]
current_command: DeviceCommand
notify_future: asyncio.Future
notify_response: bytearray

def __init__(self, address: str):
self.address = address
self.client = BleakClient(address)
self.state = ClientState.NOT_CONNECTED
self.name = None
self.client = BleakClient(self.address)
self.command_queue = asyncio.Queue()
self.notify_future = None
self.loop = asyncio.get_running_loop()

@property
def is_connected(self):
return self.client.is_connected
def is_ready(self):
return self.state == ClientState.READY or self.state == ClientState.PERFORMING_COMMAND

async def perform(self, cmd: DeviceCommand):
future = self.loop.create_future()
Expand All @@ -34,84 +51,120 @@ async def perform_nowait(self, cmd: DeviceCommand):
await self.command_queue.put((cmd, None))

async def run(self):
while True:
# Try to connect
try:
await self.client.connect()
logging.info(f'Connected to device: {self.address}')
except BaseException:
logging.exception(f'Error connecting to device {self.address}:')
await asyncio.sleep(1)
logging.info(f'Retrying connection to {self.address}')
continue

# Register for notifications and run command loop
try:
await self.client.start_notify(
self.NOTIFY_UUID,
self._notification_handler)
await self._perform_commands(self.client)
except (BleakError, asyncio.TimeoutError):
logging.exception(f'Reconnecting to {self.address} after error:')
continue
except BadConnectionError:
logging.exception(f'Delayed reconnect to {self.address} after error:')
await asyncio.sleep(1)
finally:
try:
while True:
if self.state == ClientState.NOT_CONNECTED:
await self._connect()
elif self.state == ClientState.CONNECTED:
if not self.name:
await self._get_name()
else:
await self._start_listening()
elif self.state == ClientState.READY:
await self._perform_command()
elif self.state == ClientState.DISCONNECTING:
await self._disconnect()
else:
logging.warn(f'Unexpected current state {self.state}')
self.state = ClientState.NOT_CONNECTED
finally:
# Ensure that we disconnect
if self.client:
await self.client.disconnect()

async def _perform_commands(self, client):
while client.is_connected:
cmd, cmd_future = await self.command_queue.get()
retries = 0
while retries < 5:
try:
# Prepare to make request
self.current_command = cmd
self.notify_future = self.loop.create_future()
self.notify_response = bytearray()

# Make request
await client.write_gatt_char(
self.WRITE_UUID,
bytes(self.current_command))

# Wait for response
res = await asyncio.wait_for(
self.notify_future,
timeout=self.RESPONSE_TIMEOUT)
if cmd_future:
cmd_future.set_result(res)

# Success!
break
except ParseError:
# For safety, wait the full timeout before retrying again
retries += 1
await asyncio.sleep(self.RESPONSE_TIMEOUT)
except asyncio.TimeoutError:
retries += 1
except (ModbusError, BleakError) as err:
if cmd_future:
cmd_future.set_exception(err)

# Don't retry
break
except BadConnectionError as err:
# Exit command loop
if cmd_future:
cmd_future.set_exception(err)
self.command_queue.task_done()
raise

if retries == 5:
err = BadConnectionError('too many retries')
async def _connect(self):
"""Establish connection to the bluetooth device"""
try:
await self.client.connect()
self.state = ClientState.CONNECTED
logging.info(f'Connected to device: {self.address}')
except BleakDeviceNotFoundError:
logging.debug(f'Error connecting to device {self.address}: Not found')
except (BleakError, EOFError, asyncio.TimeoutError):
logging.exception(f'Error connecting to device {self.address}:')
await asyncio.sleep(1)

async def _get_name(self):
"""Get device name, which can be parsed for type"""
try:
name = await self.client.read_gatt_char(self.DEVICE_NAME_UUID)
self.name = name.decode('ascii')
logging.info(f'Device {self.address} has name: {self.name}')
except BleakError:
logging.exception(f'Error retrieving device name {self.address}:')
self.state = ClientState.DISCONNECTING

async def _start_listening(self):
"""Register for command response notifications"""
try:
await self.client.start_notify(
self.NOTIFY_UUID,
self._notification_handler)
self.state = ClientState.READY
except BleakError:
self.state = ClientState.DISCONNECTING

async def _perform_command(self):
cmd, cmd_future = await self.command_queue.get()
retries = 0
while retries < 5:
try:
# Prepare to make request
self.state = ClientState.PERFORMING_COMMAND
self.current_command = cmd
self.notify_future = self.loop.create_future()
self.notify_response = bytearray()

# Make request
await self.client.write_gatt_char(
self.WRITE_UUID,
bytes(self.current_command))

# Wait for response
res = await asyncio.wait_for(
self.notify_future,
timeout=self.RESPONSE_TIMEOUT)
if cmd_future:
cmd_future.set_result(res)

# Success!
self.state = ClientState.READY
break
except ParseError:
# For safety, wait the full timeout before retrying again
self.state = ClientState.COMMAND_ERROR_WAIT
retries += 1
await asyncio.sleep(self.RESPONSE_TIMEOUT)
except asyncio.TimeoutError:
self.state = ClientState.COMMAND_ERROR_WAIT
retries += 1
except ModbusError as err:
if cmd_future:
cmd_future.set_exception(err)
self.command_queue.task_done()
raise err
else:
self.command_queue.task_done()

# Don't retry
self.state = ClientState.READY
break
except (BleakError, EOFError, BadConnectionError) as err:
if cmd_future:
cmd_future.set_exception(err)

self.state = ClientState.DISCONNECTING
break

if retries == 5:
err = BadConnectionError('too many retries')
if cmd_future:
cmd_future.set_exception(err)
self.state = ClientState.DISCONNECTING

self.command_queue.task_done()

async def _disconnect(self):
await self.client.disconnect()
logging.warn(f'Delayed reconnect to {self.address} after error')
await asyncio.sleep(5)
self.state = ClientState.NOT_CONNECTED

def _notification_handler(self, _sender: int, data: bytearray):
# Ignore notifications we don't expect
Expand Down
47 changes: 29 additions & 18 deletions bluetti_mqtt/bluetooth/manager.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
import asyncio
import logging
from typing import Dict, List
from bluetti_mqtt.core import BluettiDevice, DeviceCommand
from bleak import BleakScanner
from bluetti_mqtt.core import DeviceCommand
from .client import BluetoothClient


class MultiDeviceManager:
clients: Dict[BluettiDevice, BluetoothClient]
clients: Dict[str, BluetoothClient]

def __init__(self, devices: List[BluettiDevice]):
self.devices = devices
def __init__(self, addresses: List[str]):
self.addresses = addresses
self.clients = {}

async def run(self):
addresses = [d.address for d in self.devices]
logging.info(f'Connecting to clients: {addresses}')
self.clients = {d: BluetoothClient(d.address) for d in self.devices}
logging.info(f'Connecting to clients: {self.addresses}')

# Perform a blocking scan just to speed up initial connect
await BleakScanner.discover()

# Start client loops
self.clients = {a: BluetoothClient(a) for a in self.addresses}
await asyncio.gather(*[c.run() for c in self.clients.values()])

def is_connected(self, device: BluettiDevice):
if device in self.clients:
return self.clients[device].is_connected
def is_ready(self, address: str):
if address in self.clients:
return self.clients[address].is_ready
else:
return False

async def perform(self, device: BluettiDevice, command: DeviceCommand):
if device in self.clients:
return await self.clients[device].perform(command)
def get_name(self, address: str):
if address in self.clients:
return self.clients[address].name
else:
raise Exception('Unknown address')

async def perform(self, address: str, command: DeviceCommand):
if address in self.clients:
return await self.clients[address].perform(command)
else:
raise Exception('Unknown device')
raise Exception('Unknown address')

async def perform_nowait(self, device: BluettiDevice, command: DeviceCommand):
if device in self.clients:
await self.clients[device].perform_nowait(command)
async def perform_nowait(self, address: str, command: DeviceCommand):
if address in self.clients:
await self.clients[address].perform_nowait(command)
else:
raise Exception('Unknown device')
raise Exception('Unknown address')
1 change: 0 additions & 1 deletion bluetti_mqtt/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from .devices.eb3a import EB3A
from .commands import (
DeviceCommand,
ModbusDeviceCommand,
ReadHoldingRegisters,
WriteSingleRegister,
WriteMultipleRegisters
Expand Down
Loading

0 comments on commit 4dc245f

Please sign in to comment.