Skip to content
/ xdc Public
forked from adamkewley/xdc

Use an XSens DOT from pure python code

License

Notifications You must be signed in to change notification settings

Lance-Tin/xdc

 
 

Repository files navigation

xdc (XSens DOT Connector)

Use an XSens DOT from pure python code, using bleak

⚠️EXPERIMENTAL ⚠️: this is just something I'm hacking together to move a project forward. It is not a full-fat library, nor robust.

The Python code in here is a low-level Bluetooth Low-Energy client implementation that can pull useful information from an XSens DOT. The implementation is pure Python that is only dependent on the Python standard library and bleak.

This implements an extremely basic wrapper over XSens's "raw" BLE specification, rather than relying on any XSens library code. The motivation for this is that other solutions out there involve using different platforms (e.g. the Android SDK /w Kotlin or Java, NodeJS) or involve installing third-party applications. It's much cleaner to have a system that is written in one language with minimal dependencies, which is why I wrote these bindings.

Requirements

  • python>=3.9: may work on earlier pythons. Haven't tested
  • pip: to install bleak. Not a hard requirement, if you know how to manually install packages

Instal Dependencies

pip3 install -r requirements.txt

Example Usage

import asyncio
import xdc

# xdc code here, e.g.:

## DEVICE/CONNECTION LAYER: ##

# scan for all BLE devices the computer can see
#
# returns a list of `bleak.backends.device.BLEDevice`
xdc.scan_all()

# scan for all DOT devices the computer can see
#
# returns a list of `bleak.backends.device.BLEDevice`
xdc.scan()

# take an element from the scan list (`bleak.backends.device.BLEDevice`)
device = xdc.scan()[0]

# take the address (a string) of the device
#
# handy, because you can save the string to a config file etc. and use it
# to reconnect to the device after reboots etc.
address = device.address

# finds a BLE device by an address string
#
# returns `bleak.backends.device.BLEDevice`
xdc.find_by_address(address)

# same as above, but ensures the address points to a DOT by checking
# whether the device is a DOT after establishing the connection
#
# returns `bleak.backends.device.BLEDevice`
dot = xdc.find_dot_by_address(address)

## EXAMPLE CHARACTERISTIC USE (see source code for more examples) ##

# read the "Device Info Characteristic" for the given DOT
#
# returns `xdc.DeviceInfoCharacteristic`
xdc.device_info_read(dot)

# read the "Device Control Characteristic" for the given DOT
#
# returns `xdc.DeviceControlCharacteristic`
control_chr = xdc.device_control_read(dot)

control_chr.output_rate = 4  # modify it

# write a (potentially, modified) `xdc.DeviceControlCharacteristic` to
# the DOT. This enables controlling the device
xdc.device_control_write(dot, control_chr)


## HIGH-LEVEL CONVENIENCE API (see source code for implementation details) ##

# make the DOT flash its LED light a little bit, so that you can identify it
xdc.identify(dot)

# turn the DOT off (requires maybe pressing the button or shaking it afterwards to turn it back on)
xdc.power_off(dot)

# enable powering the DOT on whenever the micro-USB charger is plugged in
xdc.enable_power_on_by_usb_plug_in(dot)

# (opposite of the above): disable powering the DOT on whenever the micro-USB charger is plugged in
xdc.disable_power_on_by_usb_plug_in(dot)

# set the output rate of the DOT
#
# this is the frequency at which the reporting characteristic (i.e. the thing that is emitted whenever
# the DOT reports telemetry) reports
#
# must be 1, 4, 10, 12, 15, 20, 30, 60, 120 (see official XSens spec: Device Control Characteristic)
xdc.set_output_rate(dot, 10)

# reset the output rate to the default rate
xdc.reset_output_rate(dot)


## READING DATA FROM THE DOT ##
#
# Once you enable reporting, the DOT will asynchronously send telemetry data to the computer.
#
# Robust downstream code should assume that notifications sometimes go missing (e.g. due to
# connection issues)

# a callback function that is called whenever the DOT sends a device report notification
#
# after using `device_report_start_notify`, this will be called by the backend - the caller
# should handle the message bytes as appropriate (e.g. by pumping them into a parser)
def on_device_report(message_id, message_bytes):
    parsed = xdc.DeviceReportCharacteristic.from_bytes(message_bytes)
    print(parsed)

# a callback function that is called whenever the DOT sends a long payload report notifcation
#
# after using `device
def on_long_payload_report(message_id, message_bytes):
    print(message_bytes)

def on_medium_payload_report(message_id, message_bytes):
    print(message_bytes)

def on_short_payload_report(message_id, message_bytes):
    print(message_bytes)

def on_battery_report(message_id, message_bytes):
    print(message_bytes)

## SYNCHRONOUS API (simpler, but not exactly how the communication actually works)

with xdc.Dot(dot) as device:
    # subscribe to notifications
    device.device_report_start_notify(on_device_report)
    device.long_payload_start_notify(on_long_payload_report)
    device.medium_payload_start_notify(on_medium_payload_report)
    device.short_payload_start_notify(on_short_payload_report)

    # make the calling (synchronous) pump the asynchronous event queue forever
    #
    # this is required, because the main thread is responsible for pumping the
    # message queue that contains the above notifications. If you don't pump
    # the queue then you won't see the notifications
    xdc.pump_forever()


## ASYNCHRONOUS API (this is actually how communication with the `bleak` backend actually works)

# define an asynchronous function that should be used as the entrypoint for the asynchronous
# event loop (`asyncio.run_until_complete`)
async def arun():
    async with xdc.Dot(dot) as device:
        # asynchronously subscribe to notifications
        await device.adevice_report_start_notify(on_device_report)
        await device.along_payload_start_notify(on_long_payload_report)
        await device.amedium_payload_start_notify(on_medium_payload_report)
        await device.ashort_payload_start_notify(on_short_payload_report)

        # sleep for some amount of time, while pumping the message queue
        #
        # note: this differs from python's `sleep` function, because it doesn't cause the
        #       calling (asynchronous) thread to entirely sleep - it still processes any
        #       notifications that come in, unlike the synchronous API
        await asyncio.sleep(10)

        # (optional): unsubscribe to the notifications
        await device.adevice_report_stop_notify()
        await device.along_payload_stop_notify()
        await device.amedium_payload_stop_notify()
        await device.ashort_payload_stop_notify()

# start running the async task from the calling thread (by making the calling thread fully
# pump the event loop until the task is complete)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

General Tips & Tricks

People have emailed me about using this library. To be clear, xdc is an experimental library. I am far too busy to productionize it right now (with tests, full documentation etc.). This is why it feels a bit hacky.

Just to answer some previous questions I have received about xdc:

  • It's a library I tinker with occasionally in my spare time. My primary area of interest is C++; specifically, OpenSim Creator, which may eventually include in-UI XDC support, if I ever get the time.

  • The entire implementation of xdc is in one file, xdc.py. I have tried to make the code simple. You may find that you can hack around some of xdc's, uh, "quirks" by reading through the source and changing a line or two

  • Synchronous methods use a standard naming convention, e.g. xdc.identify. Asynchronous equivalents typically prefix an a before the method name, e.g. xdc.aidentify. In almost all cases, the synchronous API will call into the asynchronous API because the underlying BLE library being used (bleak) is asynchronous by design (which is a reasonable design decision, given how BLE devices typically work).

  • Almost all DOT "messages" are named [Message]Characteristic in the source code to reflect how they are represented by BLE. Most of the characteristics described in the official XSens DOT documentation are represented by an equivalent python class (e.g. xdc.DeviceControlCharacteristic). Almost all characteristics have UUID, SIZE, from_bytes, and to_bytes properties/methods. The xdc API effectively pumps raw byte-messages into- and out-of these classes

  • xdc is composed of roughly 4 layers of API:

    • Lowest-level byte parsers and characteristic representations (i.e. any class with Characteristic in the name)

    • Low-level asynchronous Dot lifetime wrapper. I.e. the thing that lets you write async with xdc.Dot(dev) as device:. This is effectively what all the higher-level and synchronous APIs defer to eventually

    • Medium-level Dot synchronous lifetime wrapper. I.e. the thing that lets you write with xdc.Dot(dev) as device:. This is the same class as the asynchronous one, but wraps up calling into/out-of the asynchronous event loop. All methods on this class ultimately use the asynchronous API of the Dot lifetime wrapper

    • High-level asynchronous free-functions (e.g. xdc.aidentify(device), xdc.ascan()). These are higher-level free functions that probably use bleak and the Dot lifetime wrapper internally. These are lower-performance than using the Dot lifetime wrapper yourself because they internally need to connect and tear-down a Dot wrapper on every call

    • Highest-level synchronous free-functions (e.g. xdc.idenfity(device), xdc.power_off(device)). These are high-level free functions that internally use the asynchronous event loop, bleak, and the Dot lifetime wrapper. These are the lowest-performance API because they need to hop into the asynchronous event loop, create a Dot connection, tear it down, and exit the event loop.

  • Overall, it's recommended to use the highest-level API to test whether the DOT works etc. (it's the easiest API to use), but you will probably find that your code needs to go deeper and deeper into the lower-levels once you (e.g.) need certain performance guarantees, or need to handle receiving notifications from multiple DOTs concurrently, etc. - there are no silver bullets in unreliable hardware communication protocols

About

Use an XSens DOT from pure python code

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 100.0%