Skip to content

Commit

Permalink
docs: Add section about sorting into multiple queues
Browse files Browse the repository at this point in the history
  • Loading branch information
empicano committed Dec 26, 2023
1 parent 7c48e3d commit 0ac878e
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 37 deletions.
11 changes: 5 additions & 6 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# Contributing
# How to contribute

We're very happy about contributions to aiomqtt! ✨

## Development setup

- Clone the aiomqtt repository
- Install the Python version noted in `.python-version` via `pyenv`
- [Install poetry](https://python-poetry.org/docs/#installation)
- Install the dependencies with `./scripts/setup`
- Install poetry; Then run `./scripts/setup` to install the dependencies and aiomqtt itself
- Run black, ruff, and mypy with `./scripts/check`
- Run the tests with `./scripts/test`
- Spin up a local mosquitto broker via Docker with `./scripts/develop`
- Spin up a local mosquitto broker via Docker with `./scripts/develop`; You can connect to this broker via `aiomqtt.Client("localhost", port=1883)`

## The documentation

Expand All @@ -20,8 +19,8 @@ The Markdown source files are located in the `docs` folder. The reference sectio

## Making a pull request

The branch to contribute to is `main`. You can create a draft pull request if your contribution is not yet ready to merge. Please check if your changes call for updates to the documentation and don't forget to add your name and contribution to the `CHANGELOG.md`!
Please check if your changes call for updates to the documentation and don't forget to add your name and contribution to the `CHANGELOG.md`! You can create a draft pull request if your contribution is not yet ready to merge.

### Visual Studio Code

If you're using VSCode, you can find workspace settings and recommended extensions in the `.vscode` folder.
You can find workspace settings and recommended extensions in the `.vscode` folder.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async with Client("test.mosquitto.org") as client:
print(message.payload)
```

aiomqtt combines the stability of the time-proven [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) library with an intuitive, idiomatic asyncio interface:
aiomqtt combines the stability of the time-proven [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) library with an idiomatic asyncio interface:

- No more callbacks! 👍
- No more return codes (welcome to the `MqttError`)
Expand All @@ -55,9 +55,9 @@ aiomqtt combines the stability of the time-proven [paho-mqtt](https://github.com

## Installation

aiomqtt can be installed via `pip install aiomqtt`. It requires Python `3.8+` to run. The only dependency is [paho-mqtt](https://github.com/eclipse/paho.mqtt.python).
aiomqtt can be installed via `pip install aiomqtt`. The only dependency is [paho-mqtt](https://github.com/eclipse/paho.mqtt.python).

If you can't wait for the latest version and want to install directly from GitHub, use:
If you can't wait for the latest version, you can install aiomqtt directly from GitHub with:

`pip install git+https://github.com/sbtinstruments/aiomqtt`

Expand Down Expand Up @@ -85,7 +85,7 @@ Note that the underlying paho-mqtt library is dual-licensed. One of the licenses

## Contributing

We're very happy about contributions to the project! You can get started by reading [CONTRIBUTING.md](https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md).
We're very happy about contributions to aiomqtt! ✨ You can get started by reading [CONTRIBUTING.md](https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md).

## Versioning

Expand All @@ -99,9 +99,9 @@ The changelog lives in [CHANGELOG.md](https://github.com/sbtinstruments/aiomqtt/

Is aiomqtt not what you're looking for? There are a few other clients you can try:

- [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) — Own protocol implementation. Synchronous.<br>![GitHub stars](https://img.shields.io/github/stars/eclipse/paho.mqtt.python) ![license](https://img.shields.io/github/license/eclipse/paho.mqtt.python)
- [gmqtt](https://github.com/wialon/gmqtt) — Own protocol implementation. Asynchronous.<br>![GitHub stars](https://img.shields.io/github/stars/wialon/gmqtt) ![license](https://img.shields.io/github/license/wialon/gmqtt)
- [fastapi-mqtt](https://github.com/sabuhish/fastapi-mqtt)Asynchronous wrapper around gmqtt. Simplifies integration in your FastAPI application.<br>![GitHub stars](https://img.shields.io/github/stars/sabuhish/fastapi-mqtt) ![license](https://img.shields.io/github/license/sabuhish/fastapi-mqtt)
- [amqtt](https://github.com/Yakifo/amqtt) — Own protocol implementation. Asynchronous. Includes a broker.<br>![GitHub stars](https://img.shields.io/github/stars/Yakifo/amqtt) ![license](https://img.shields.io/github/license/Yakifo/amqtt)
- [mqttools](https://github.com/eerimoq/mqttools) — Own protocol implementation. Asynchronous.<br>![GitHub stars](https://img.shields.io/github/stars/eerimoq/mqttools) ![license](https://img.shields.io/github/license/eerimoq/mqttools)
- [trio-paho-mqtt](https://github.com/bkanuka/trio-paho-mqtt)Asynchronous wrapper around paho-mqtt (similar to aiomqtt). Based on trio instead of asyncio.<br>![GitHub stars](https://img.shields.io/github/stars/bkanuka/trio-paho-mqtt) ![license](https://img.shields.io/github/license/bkanuka/trio-paho-mqtt)
- [paho-mqtt](https://github.com/eclipse/paho.mqtt.python): Synchronous.
- [micropython-mqtt](https://github.com/peterhinch/micropython-mqtt): Asynchronous client for microcontrollers in MicroPython.
- [gmqtt](https://github.com/wialon/gmqtt): Asynchronous.
- [fastapi-mqtt](https://github.com/sabuhish/fastapi-mqtt): Asynchronous wrapper around gmqtt; Simplifies integration with FastAPI.
- [amqtt](https://github.com/Yakifo/amqtt): Asynchronous; Includes a broker.
- [trio-paho-mqtt](https://github.com/bkanuka/trio-paho-mqtt): Asynchronous wrapper around paho-mqtt; Based on trio instead of asyncio.
4 changes: 4 additions & 0 deletions docs/alongside-fastapi-and-co.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ async def publish():
```{note}
This is a combination of some concepts addressed in more detail in other sections: The connection is shared between the listener task and the routes, as explained in [](connecting-to-the-broker.md#sharing-the-connection). We don't immediately await the listener task in order to avoid blocking other code, as explained in [](subscribing-to-a-topic.md#listening-without-blocking).
```

```{tip}
With Starlette you can yield the initialized client to [the lifespan's state](https://www.starlette.io/lifespan/) instead of using global variables.
```
8 changes: 4 additions & 4 deletions docs/introduction.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Introduction

This documentation aims to cover everything you need to know to use aiomqtt in your projects. We expect some knowledge of MQTT and asyncio, but we'll try to explain things as clearly as possible.
This documentation aims to cover everything you need to know to use aiomqtt in your projects. We expect some knowledge of MQTT and asyncio, but we try to explain things as clearly as possible. If you get stuck, don't hesitate to start a new discussion on GitHub. We're happy to help!

If you're new to MQTT, we recommend reading the [HiveMQ MQTT essentials](https://www.hivemq.com/mqtt-essentials/) guide as an introduction. Afterward, the [OASIS specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) is the best reference. For asyncio, we recommend the [RealPython asyncio walkthrough](https://realpython.com/async-io-python/) as an introduction and the [asyncio docs](https://docs.python.org/3/library/asyncio.html) as a reference.
If you're new to MQTT, we recommend reading the [HiveMQ MQTT essentials](https://www.hivemq.com/mqtt-essentials/) guide as an introduction. Afterward, the [OASIS specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) is a great reference. For asyncio, we recommend the [RealPython asyncio walkthrough](https://realpython.com/async-io-python/) as an introduction and the [asyncio docs](https://docs.python.org/3/library/asyncio.html) as a reference.

Especially first-time readers have a very valuable view of the documentation. You can spot ambiguities and unclear explanations that simply stay hidden from us maintainers! If you find an error somewhere or if you feel there's anything that can improve these docs, you're a big help if you open an issue or a pull request on GitHub.
You're a big help if you open an issue or a pull request when you find an error or when you have an idea of how to improve these docs. It's easy to miss things that are unclear to newcomers when you're already familiar with the project. We need your fresh eyes here!

Let's dive in! 🤿
That said, let's dive in! 🤿
88 changes: 73 additions & 15 deletions docs/subscribing-to-a-topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ asyncio.run(main())
Now you can use the [minimal publisher example](publishing-a-message.md) to publish a message to `temperature/outside` and see it appear in the console.

```{tip}
You can set the [Quality of Service](publishing-a-message.md#quality-of-service-qos) of the subscription by passing the `qos` parameter to `subscribe()`.
You can set the [Quality of Service](publishing-a-message.md#quality-of-service-qos) of the subscription by passing the `qos` parameter to `Client.subscribe()`.
```

## Filtering messages

Imagine that we measure temperature and humidity on the outside and inside, and our topics have the structure `temperature/outside`. We want to receive all types of measurements but handle them differently.
Imagine that we measure temperature and humidity on the outside and inside. We want to receive all types of measurements but handle them differently.

aiomqtt provides `Topic.matches()` to make this easy:

Expand Down Expand Up @@ -62,13 +62,13 @@ For details on the `+` and `#` wildcards and what topics they match, see the [OA

## The message queue

Messages are buffered in a queue internally and handled one after another.
Messages are queued and returned sequentially from `Client.messages()`.

The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`.
The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other types of asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `Client.messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`.

If you want to queue based on priority, you can subclass `asyncio.PriorityQueue`. This queue returns messages in priority order (lowest priority first). In case of ties, messages with lower message identifiers are returned first.
You can subclass `asyncio.PriorityQueue` to queue based on priority. Messages are returned ascendingly by their priority values. In the case of ties, messages with lower message identifiers are returned first.

Let's say we measure temperature and humidity again, but we want to prioritize humidity messages:
Let's say we measure temperature and humidity again, but we want to prioritize humidity:

```python
import asyncio
Expand Down Expand Up @@ -99,11 +99,14 @@ asyncio.run(main())
```

```{tip}
By default, the size of the queue is unlimited. You can limit it by passing the `queue_maxsize` parameter to `messages()`.
By default, the size of the queue is unlimited. You can set a limit by passing the `queue_maxsize` parameter to `Client.messages()`.
```

````{important}
If a message takes a long time to handle, it blocks the handling of other messages. You can handle messages in parallel by using an `asyncio.TaskGroup` like so:
## Processing concurrently

Messages are queued and returned sequentially from `Client.messages()`. If a message takes a long time to handle, it blocks the handling of other messages.

You can handle messages concurrently by using an `asyncio.TaskGroup` like so:

```python
import asyncio
Expand All @@ -121,18 +124,72 @@ async def main():
await client.subscribe("temperature/#")
async with asyncio.TaskGroup() as tg:
async for message in messages:
tg.create_task(handle(message))
tg.create_task(handle(message)) # Spawn new coroutine


asyncio.run(main())
```

Note that this only makes sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead.
````
```{important}
Coroutines only make sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead.
```

The code snippet above handles each message in a new coroutine. Sometimes, we want to handle messages from different topics concurrently, but sequentially inside a single topic.

The idea here is to implement a "distributor" that sorts incoming messages into multiple asyncio queues. Each queue is then processed by a different coroutine. Let's see how this works for our temperature and humidity messages:

```python
import asyncio
import aiomqtt


async def temperature_consumer():
while True:
message = await temperature_queue.get()
print(f"[temperature/#] {message.payload}")


async def humidity_consumer():
while True:
message = await humidity_queue.get()
print(f"[humidity/#] {message.payload}")


temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()


async def distributor(client):
async with client.messages() as messages:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
# Sort messages into the appropriate queues
async for message in messages:
if message.topic.matches("temperature/#"):
temperature_queue.put_nowait(message)
elif message.topic.matches("humidity/#"):
humidity_queue.put_nowait(message)


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(distributor(client))
tg.create_task(temperature_consumer())
tg.create_task(humidity_consumer())


asyncio.run(main())
```

```{tip}
You can combine this idea with the one [from the previous section](#the-message-queue) to e.g. handle temperature in FIFO and humidity in LIFO order.
```

## Listening without blocking

When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `messages()` generator blocks the execution of everything that comes afterward.
When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `Client.messages()` generator blocks the execution of everything that comes afterward.

In case you want to run other code after starting your listener, this is not very practical.

Expand All @@ -144,6 +201,7 @@ import aiomqtt


async def sleep(seconds):
# Some other task that needs to run concurrently
await asyncio.sleep(seconds)
print(f"Slept for {seconds} seconds!")

Expand Down Expand Up @@ -172,7 +230,7 @@ In case task groups are not an option (e.g. because you run aiomqtt [alongside a
```{caution}
You need to be a bit careful with this approach. Exceptions raised in asyncio tasks are propagated only when we `await` the task. In this case, we explicitly don't.
This means that you need to handle all possible exceptions _inside_ the fire-and-forget task. Any unhandled exceptions will be silently ignored until the program exits.
You need to handle all possible exceptions _inside_ the fire-and-forget task. Unhandled exceptions will be silently ignored until the program exits.
```

```python
Expand Down Expand Up @@ -213,7 +271,7 @@ You [need to keep a reference to the task](https://docs.python.org/3/library/asy

## Stop listening

You might want to have a listener task running alongside other code, and then stop it when you're done. You can use [`Task.cancel()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel) for this:
You might want to have a listener task running alongside other code, and then stop it when you're done. You can use [`asyncio.Task.cancel()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel) for this:

```python
import asyncio
Expand Down
2 changes: 1 addition & 1 deletion scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
- `check`: Format and lint the code with `black` and `ruff`; Use `--dry` option for dry run
- `develop`: Spin up a local mosquitto broker via Docker
- `docs`: Build the documentation; Use `--reload` option to serve locally with live reload
- `setup`: Setup or update the dependencies after a `git clone` or `git pull`
- `setup`: Install or update the dependencies after a `git clone` or `git pull`; Also (re-)installs the local version of aiomqtt
- `test`: Run the tests

Styled after GitHub's ["Scripts to Rule Them All"](https://github.com/github/scripts-to-rule-them-all).
2 changes: 1 addition & 1 deletion scripts/setup
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ set -o errexit -o pipefail -o nounset
# Change into the project's directory
cd "$(dirname "$0")/.."

# Install the dependencies
# Install the dependencies and the local version of aiomqtt
poetry install --with dev --sync

0 comments on commit 0ac878e

Please sign in to comment.