Skip to content

Commit

Permalink
docs: Update docs on .messages and queues
Browse files Browse the repository at this point in the history
  • Loading branch information
empicano committed Jul 1, 2024
1 parent c17b773 commit 8552522
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 68 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# How to contribute

We're very happy about contributions to aiomqtt!
We're very happy about contributions to aiomqtt! 🎉

## Development setup

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

<!-- documentation start -->

`aiomqtt` is a lightweight and idiomatic MQTT client:
Write code like this:

**Publish**

```python
async with aiomqtt.Client("test.mosquitto.org") as client:
async with Client("test.mosquitto.org") as client:
await client.publish("temperature/outside", payload=28.4)
```

**Subscribe**

```python
async with aiomqtt.Client("test.mosquitto.org") as client:
async with Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
async for message in client.messages:
print(message.payload)
Expand Down Expand Up @@ -73,7 +73,7 @@ Note that the underlying paho-mqtt library is dual-licensed. One of the licenses

## Contributing

We're very happy about contributions to aiomqtt! You can get started by reading [CONTRIBUTING.md](https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md).
We're happy about contributions to aiomqtt! 🎉 You can get started by reading [CONTRIBUTING.md](https://github.com/sbtinstruments/aiomqtt/blob/main/CONTRIBUTING.md).

## Versioning

Expand Down
5 changes: 3 additions & 2 deletions docs/reconnection.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@

Network connections are inherently unstable and can fail at any time. Especially for long-running applications, this can be a challenge. To make our application resilient against connection failures, we can wrap the code in a `try`/`except`-block, listen for `MqttError`s, and reconnect like so:

<!-- The `Client` context is designed to be [reusable (but not reentrant)](https://docs.python.org/3/library/contextlib.html#reusable-context-managers). -->
The `Client` context is designed to be [reusable (but not reentrant)](https://docs.python.org/3/library/contextlib.html#reusable-context-managers).

```python
import asyncio
import aiomqtt


async def main():
client = aiomqtt.Client("test.mosquitto.org")
interval = 5 # Seconds
while True:
try:
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client:
await client.subscribe("humidity/#")
async for message in client.messages:
print(message.payload)
Expand Down
69 changes: 8 additions & 61 deletions docs/subscribing-to-a-topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,26 @@ By default, the size of the queue is unlimited. You can set a limit through the

Messages are queued internally and returned sequentially from `Client.messages`. If a message takes a long time to handle, it blocks the handling of other messages.

You can handle messages concurrently by using an `asyncio.TaskGroup` like so:
You can handle messages concurrently by using multiple worker tasks like so:

```python
import asyncio
import aiomqtt


async def handle(message):
await asyncio.sleep(5) # Simulate some I/O-bound work
print(message.payload)
async def work(client):
async for message in client.messages:
await asyncio.sleep(5) # Simulate some I/O-bound work
print(message.payload)


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
# Use a task group to manage and await all tasks
# Use a task group to manage and await all worker tasks
async with asyncio.TaskGroup() as tg:
async for message in client.messages:
tg.create_task(handle(message)) # Spawn new coroutine
for _ in range(2): # You can use more than two workers here
tg.create_task(work(client))


asyncio.run(main())
Expand All @@ -133,60 +134,6 @@ asyncio.run(main())
Coroutines only make sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead.
```

## Multiple queues

The code snippet above handles each message in a new coroutine. Sometimes, we want to handle messages from different topics concurrently, but sequentially inside a single topic.

The idea here is to implement a "distributor" that sorts incoming messages into multiple asyncio queues. Each queue is then processed by a different coroutine. Let's see how this works for our temperature and humidity messages:

```python
import asyncio
import aiomqtt


async def temperature_consumer():
while True:
message = await temperature_queue.get()
print(f"[temperature/#] {message.payload}")


async def humidity_consumer():
while True:
message = await humidity_queue.get()
print(f"[humidity/#] {message.payload}")


temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()


async def distributor(client):
# Sort messages into the appropriate queues
async for message in client.messages:
if message.topic.matches("temperature/#"):
temperature_queue.put_nowait(message)
elif message.topic.matches("humidity/#"):
humidity_queue.put_nowait(message)


async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(distributor(client))
tg.create_task(temperature_consumer())
tg.create_task(humidity_consumer())


asyncio.run(main())
```

```{tip}
You can use [different queue types](#the-message-queue) for these queues to e.g. handle temperature in FIFO and humidity in LIFO order.
```

## Listening without blocking

When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `Client.messages()` generator blocks the execution of everything that comes afterward.
Expand Down

0 comments on commit 8552522

Please sign in to comment.