From 0ac878efcc58f55876140bfd9b3b1557b189ee00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20B=C3=B6hm?= Date: Tue, 26 Dec 2023 17:36:41 +0100 Subject: [PATCH] docs: Add section about sorting into multiple queues --- CONTRIBUTING.md | 11 ++-- README.md | 20 ++++---- docs/alongside-fastapi-and-co.md | 4 ++ docs/introduction.md | 8 +-- docs/subscribing-to-a-topic.md | 88 ++++++++++++++++++++++++++------ scripts/README.md | 2 +- scripts/setup | 2 +- 7 files changed, 98 insertions(+), 37 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2773024..26dc856 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,4 +1,4 @@ -# Contributing +# How to contribute We're very happy about contributions to aiomqtt! ✨ @@ -6,11 +6,10 @@ We're very happy about contributions to aiomqtt! ✨ - Clone the aiomqtt repository - Install the Python version noted in `.python-version` via `pyenv` -- [Install poetry](https://python-poetry.org/docs/#installation) -- Install the dependencies with `./scripts/setup` +- Install poetry; Then run `./scripts/setup` to install the dependencies and aiomqtt itself - Run black, ruff, and mypy with `./scripts/check` - Run the tests with `./scripts/test` -- Spin up a local mosquitto broker via Docker with `./scripts/develop` +- Spin up a local mosquitto broker via Docker with `./scripts/develop`; You can connect to this broker via `aiomqtt.Client("localhost", port=1883)` ## The documentation @@ -20,8 +19,8 @@ The Markdown source files are located in the `docs` folder. The reference sectio ## Making a pull request -The branch to contribute to is `main`. You can create a draft pull request if your contribution is not yet ready to merge. Please check if your changes call for updates to the documentation and don't forget to add your name and contribution to the `CHANGELOG.md`! +Please check if your changes call for updates to the documentation and don't forget to add your name and contribution to the `CHANGELOG.md`! You can create a draft pull request if your contribution is not yet ready to merge. ### Visual Studio Code -If you're using VSCode, you can find workspace settings and recommended extensions in the `.vscode` folder. +You can find workspace settings and recommended extensions in the `.vscode` folder. diff --git a/README.md b/README.md index c974cab..7408184 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ async with Client("test.mosquitto.org") as client: print(message.payload) ``` -aiomqtt combines the stability of the time-proven [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) library with an intuitive, idiomatic asyncio interface: +aiomqtt combines the stability of the time-proven [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) library with an idiomatic asyncio interface: - No more callbacks! 👍 - No more return codes (welcome to the `MqttError`) @@ -55,9 +55,9 @@ aiomqtt combines the stability of the time-proven [paho-mqtt](https://github.com ## Installation -aiomqtt can be installed via `pip install aiomqtt`. It requires Python `3.8+` to run. The only dependency is [paho-mqtt](https://github.com/eclipse/paho.mqtt.python). +aiomqtt can be installed via `pip install aiomqtt`. The only dependency is [paho-mqtt](https://github.com/eclipse/paho.mqtt.python). -If you can't wait for the latest version and want to install directly from GitHub, use: +If you can't wait for the latest version, you can install aiomqtt directly from GitHub with: `pip install git+https://github.com/sbtinstruments/aiomqtt` @@ -85,7 +85,7 @@ Note that the underlying paho-mqtt library is dual-licensed. One of the licenses ## Contributing -We're very happy about contributions to the project! You can get started by reading [CONTRIBUTING.md](https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md). +We're very happy about contributions to aiomqtt! ✨ You can get started by reading [CONTRIBUTING.md](https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md). ## Versioning @@ -99,9 +99,9 @@ The changelog lives in [CHANGELOG.md](https://github.com/sbtinstruments/aiomqtt/ Is aiomqtt not what you're looking for? There are a few other clients you can try: -- [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) — Own protocol implementation. Synchronous.
![GitHub stars](https://img.shields.io/github/stars/eclipse/paho.mqtt.python) ![license](https://img.shields.io/github/license/eclipse/paho.mqtt.python) -- [gmqtt](https://github.com/wialon/gmqtt) — Own protocol implementation. Asynchronous.
![GitHub stars](https://img.shields.io/github/stars/wialon/gmqtt) ![license](https://img.shields.io/github/license/wialon/gmqtt) -- [fastapi-mqtt](https://github.com/sabuhish/fastapi-mqtt) — Asynchronous wrapper around gmqtt. Simplifies integration in your FastAPI application.
![GitHub stars](https://img.shields.io/github/stars/sabuhish/fastapi-mqtt) ![license](https://img.shields.io/github/license/sabuhish/fastapi-mqtt) -- [amqtt](https://github.com/Yakifo/amqtt) — Own protocol implementation. Asynchronous. Includes a broker.
![GitHub stars](https://img.shields.io/github/stars/Yakifo/amqtt) ![license](https://img.shields.io/github/license/Yakifo/amqtt) -- [mqttools](https://github.com/eerimoq/mqttools) — Own protocol implementation. Asynchronous.
![GitHub stars](https://img.shields.io/github/stars/eerimoq/mqttools) ![license](https://img.shields.io/github/license/eerimoq/mqttools) -- [trio-paho-mqtt](https://github.com/bkanuka/trio-paho-mqtt) — Asynchronous wrapper around paho-mqtt (similar to aiomqtt). Based on trio instead of asyncio.
![GitHub stars](https://img.shields.io/github/stars/bkanuka/trio-paho-mqtt) ![license](https://img.shields.io/github/license/bkanuka/trio-paho-mqtt) +- [paho-mqtt](https://github.com/eclipse/paho.mqtt.python): Synchronous. +- [micropython-mqtt](https://github.com/peterhinch/micropython-mqtt): Asynchronous client for microcontrollers in MicroPython. +- [gmqtt](https://github.com/wialon/gmqtt): Asynchronous. +- [fastapi-mqtt](https://github.com/sabuhish/fastapi-mqtt): Asynchronous wrapper around gmqtt; Simplifies integration with FastAPI. +- [amqtt](https://github.com/Yakifo/amqtt): Asynchronous; Includes a broker. +- [trio-paho-mqtt](https://github.com/bkanuka/trio-paho-mqtt): Asynchronous wrapper around paho-mqtt; Based on trio instead of asyncio. diff --git a/docs/alongside-fastapi-and-co.md b/docs/alongside-fastapi-and-co.md index e52329c..c390c1a 100644 --- a/docs/alongside-fastapi-and-co.md +++ b/docs/alongside-fastapi-and-co.md @@ -51,3 +51,7 @@ async def publish(): ```{note} This is a combination of some concepts addressed in more detail in other sections: The connection is shared between the listener task and the routes, as explained in [](connecting-to-the-broker.md#sharing-the-connection). We don't immediately await the listener task in order to avoid blocking other code, as explained in [](subscribing-to-a-topic.md#listening-without-blocking). ``` + +```{tip} +With Starlette you can yield the initialized client to [the lifespan's state](https://www.starlette.io/lifespan/) instead of using global variables. +``` diff --git a/docs/introduction.md b/docs/introduction.md index 5f16b8f..79cb7a5 100644 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -1,9 +1,9 @@ # Introduction -This documentation aims to cover everything you need to know to use aiomqtt in your projects. We expect some knowledge of MQTT and asyncio, but we'll try to explain things as clearly as possible. +This documentation aims to cover everything you need to know to use aiomqtt in your projects. We expect some knowledge of MQTT and asyncio, but we try to explain things as clearly as possible. If you get stuck, don't hesitate to start a new discussion on GitHub. We're happy to help! -If you're new to MQTT, we recommend reading the [HiveMQ MQTT essentials](https://www.hivemq.com/mqtt-essentials/) guide as an introduction. Afterward, the [OASIS specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) is the best reference. For asyncio, we recommend the [RealPython asyncio walkthrough](https://realpython.com/async-io-python/) as an introduction and the [asyncio docs](https://docs.python.org/3/library/asyncio.html) as a reference. +If you're new to MQTT, we recommend reading the [HiveMQ MQTT essentials](https://www.hivemq.com/mqtt-essentials/) guide as an introduction. Afterward, the [OASIS specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) is a great reference. For asyncio, we recommend the [RealPython asyncio walkthrough](https://realpython.com/async-io-python/) as an introduction and the [asyncio docs](https://docs.python.org/3/library/asyncio.html) as a reference. -Especially first-time readers have a very valuable view of the documentation. You can spot ambiguities and unclear explanations that simply stay hidden from us maintainers! If you find an error somewhere or if you feel there's anything that can improve these docs, you're a big help if you open an issue or a pull request on GitHub. +You're a big help if you open an issue or a pull request when you find an error or when you have an idea of how to improve these docs. It's easy to miss things that are unclear to newcomers when you're already familiar with the project. We need your fresh eyes here! -Let's dive in! 🤿 +That said, let's dive in! 🤿 diff --git a/docs/subscribing-to-a-topic.md b/docs/subscribing-to-a-topic.md index 8cd1dfb..bee45c1 100644 --- a/docs/subscribing-to-a-topic.md +++ b/docs/subscribing-to-a-topic.md @@ -21,12 +21,12 @@ asyncio.run(main()) Now you can use the [minimal publisher example](publishing-a-message.md) to publish a message to `temperature/outside` and see it appear in the console. ```{tip} -You can set the [Quality of Service](publishing-a-message.md#quality-of-service-qos) of the subscription by passing the `qos` parameter to `subscribe()`. +You can set the [Quality of Service](publishing-a-message.md#quality-of-service-qos) of the subscription by passing the `qos` parameter to `Client.subscribe()`. ``` ## Filtering messages -Imagine that we measure temperature and humidity on the outside and inside, and our topics have the structure `temperature/outside`. We want to receive all types of measurements but handle them differently. +Imagine that we measure temperature and humidity on the outside and inside. We want to receive all types of measurements but handle them differently. aiomqtt provides `Topic.matches()` to make this easy: @@ -62,13 +62,13 @@ For details on the `+` and `#` wildcards and what topics they match, see the [OA ## The message queue -Messages are buffered in a queue internally and handled one after another. +Messages are queued and returned sequentially from `Client.messages()`. -The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`. +The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other types of asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `Client.messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`. -If you want to queue based on priority, you can subclass `asyncio.PriorityQueue`. This queue returns messages in priority order (lowest priority first). In case of ties, messages with lower message identifiers are returned first. +You can subclass `asyncio.PriorityQueue` to queue based on priority. Messages are returned ascendingly by their priority values. In the case of ties, messages with lower message identifiers are returned first. -Let's say we measure temperature and humidity again, but we want to prioritize humidity messages: +Let's say we measure temperature and humidity again, but we want to prioritize humidity: ```python import asyncio @@ -99,11 +99,14 @@ asyncio.run(main()) ``` ```{tip} -By default, the size of the queue is unlimited. You can limit it by passing the `queue_maxsize` parameter to `messages()`. +By default, the size of the queue is unlimited. You can set a limit by passing the `queue_maxsize` parameter to `Client.messages()`. ``` -````{important} -If a message takes a long time to handle, it blocks the handling of other messages. You can handle messages in parallel by using an `asyncio.TaskGroup` like so: +## Processing concurrently + +Messages are queued and returned sequentially from `Client.messages()`. If a message takes a long time to handle, it blocks the handling of other messages. + +You can handle messages concurrently by using an `asyncio.TaskGroup` like so: ```python import asyncio @@ -121,18 +124,72 @@ async def main(): await client.subscribe("temperature/#") async with asyncio.TaskGroup() as tg: async for message in messages: - tg.create_task(handle(message)) + tg.create_task(handle(message)) # Spawn new coroutine asyncio.run(main()) ``` -Note that this only makes sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead. -```` +```{important} +Coroutines only make sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead. +``` + +The code snippet above handles each message in a new coroutine. Sometimes, we want to handle messages from different topics concurrently, but sequentially inside a single topic. + +The idea here is to implement a "distributor" that sorts incoming messages into multiple asyncio queues. Each queue is then processed by a different coroutine. Let's see how this works for our temperature and humidity messages: + +```python +import asyncio +import aiomqtt + + +async def temperature_consumer(): + while True: + message = await temperature_queue.get() + print(f"[temperature/#] {message.payload}") + + +async def humidity_consumer(): + while True: + message = await humidity_queue.get() + print(f"[humidity/#] {message.payload}") + + +temperature_queue = asyncio.Queue() +humidity_queue = asyncio.Queue() + + +async def distributor(client): + async with client.messages() as messages: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + # Sort messages into the appropriate queues + async for message in messages: + if message.topic.matches("temperature/#"): + temperature_queue.put_nowait(message) + elif message.topic.matches("humidity/#"): + humidity_queue.put_nowait(message) + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + # Use a task group to manage and await all tasks + async with asyncio.TaskGroup() as tg: + tg.create_task(distributor(client)) + tg.create_task(temperature_consumer()) + tg.create_task(humidity_consumer()) + + +asyncio.run(main()) +``` + +```{tip} +You can combine this idea with the one [from the previous section](#the-message-queue) to e.g. handle temperature in FIFO and humidity in LIFO order. +``` ## Listening without blocking -When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `messages()` generator blocks the execution of everything that comes afterward. +When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `Client.messages()` generator blocks the execution of everything that comes afterward. In case you want to run other code after starting your listener, this is not very practical. @@ -144,6 +201,7 @@ import aiomqtt async def sleep(seconds): + # Some other task that needs to run concurrently await asyncio.sleep(seconds) print(f"Slept for {seconds} seconds!") @@ -172,7 +230,7 @@ In case task groups are not an option (e.g. because you run aiomqtt [alongside a ```{caution} You need to be a bit careful with this approach. Exceptions raised in asyncio tasks are propagated only when we `await` the task. In this case, we explicitly don't. -This means that you need to handle all possible exceptions _inside_ the fire-and-forget task. Any unhandled exceptions will be silently ignored until the program exits. +You need to handle all possible exceptions _inside_ the fire-and-forget task. Unhandled exceptions will be silently ignored until the program exits. ``` ```python @@ -213,7 +271,7 @@ You [need to keep a reference to the task](https://docs.python.org/3/library/asy ## Stop listening -You might want to have a listener task running alongside other code, and then stop it when you're done. You can use [`Task.cancel()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel) for this: +You might want to have a listener task running alongside other code, and then stop it when you're done. You can use [`asyncio.Task.cancel()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel) for this: ```python import asyncio diff --git a/scripts/README.md b/scripts/README.md index 8eef704..08cd38f 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -3,7 +3,7 @@ - `check`: Format and lint the code with `black` and `ruff`; Use `--dry` option for dry run - `develop`: Spin up a local mosquitto broker via Docker - `docs`: Build the documentation; Use `--reload` option to serve locally with live reload -- `setup`: Setup or update the dependencies after a `git clone` or `git pull` +- `setup`: Install or update the dependencies after a `git clone` or `git pull`; Also (re-)installs the local version of aiomqtt - `test`: Run the tests Styled after GitHub's ["Scripts to Rule Them All"](https://github.com/github/scripts-to-rule-them-all). diff --git a/scripts/setup b/scripts/setup index 01da257..3b95e9d 100755 --- a/scripts/setup +++ b/scripts/setup @@ -5,5 +5,5 @@ set -o errexit -o pipefail -o nounset # Change into the project's directory cd "$(dirname "$0")/.." -# Install the dependencies +# Install the dependencies and the local version of aiomqtt poetry install --with dev --sync