-
Notifications
You must be signed in to change notification settings - Fork 130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bidirectional streaming for pubsub #735
Conversation
f64334f
to
fbd12a7
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #735 +/- ##
==========================================
- Coverage 86.63% 86.04% -0.59%
==========================================
Files 84 87 +3
Lines 4473 4772 +299
==========================================
+ Hits 3875 4106 +231
- Misses 598 666 +68 ☔ View full report in Codecov by Sentry. |
|
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]> fixes typing Signed-off-by: Elena Kolevska <[email protected]> more readable example Signed-off-by: Elena Kolevska <[email protected]> linter Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
This reverts commit cb4b65b. Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
…ew message Signed-off-by: Elena Kolevska <[email protected]>
7ef5a2f
to
8c9ce85
Compare
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
I am really grateful that somebody is finally working on this! I like how we can choose between subscribe and subscribe_with_handler, even though it's a bit unusual. It's a good approach imo. But from what I understand this doesn't use/support async iterator, which I think would be pretty useful if it did. I am not an expert, but adding an async iteration support should not be such a problem. Here are some code changes that were suggested to me by an AI, but it looks pretty legit:
class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
# ... existing init code ...
self._iterator_active = True
async def __aiter__(self):
"""Make the subscription async iterable."""
return self
async def __anext__(self):
"""Get the next message from the subscription stream."""
if not self._iterator_active:
raise StopAsyncIteration
try:
message = await self.next_message()
if message is None:
# If no message is received, continue iteration
return await self.__anext__()
return message
except StreamInactiveError:
self._iterator_active = False
raise StopAsyncIteration
except StreamCancelledError:
self._iterator_active = False
raise StopAsyncIteration
async def close(self):
"""Close the subscription and stop iteration."""
self._iterator_active = False
if self._stream:
try:
self._stream.cancel()
self._stream_active.clear()
except AioRpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')
# ... other methods stay unchanged ...
async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., Awaitable[TopicEventResponse]],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., Awaitable[TopicEventResponse]]): The function to call when a message is received.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Callable[[], Awaitable[None]]: An async function to close the subscription.
"""
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)
async def stream_messages():
try:
async for message in subscription:
try:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
except Exception as e:
# Log error and continue processing messages
print(f"Error processing message: {e}")
continue
except Exception as e:
print(f"Stream error: {e}")
# Start processing messages in background task
task = asyncio.create_task(stream_messages())
async def close_subscription():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await subscription.close()
return close_subscription
# examples/pubsub-streaming-async/subscriber-iterator.py
import argparse
import asyncio
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
parser = argparse.ArgumentParser()
parser.add_argument('--topic', required=True)
args = parser.parse_args()
async def main():
async with DaprClient() as client:
subscription = await client.subscribe(
pubsub_name='pubsub',
topic=args.topic,
dead_letter_topic=f'{args.topic}_DEAD'
)
try:
# Process messages using async for
async for message in subscription:
print(f'Processing message: {message.data()} from {message.topic()}...')
# Process message here
await subscription.respond_success(message)
except Exception as e:
print(f"Error: {e}")
finally:
await subscription.close()
if __name__ == '__main__':
asyncio.run(main())
# tests/clients/test_dapr_grpc_client_async.py
async def test_subscribe_topic_iterator(self):
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
subscription = await dapr.subscribe(pubsub_name='pubsub', topic='example')
messages = []
async for message in subscription:
messages.append(message)
await subscription.respond_success(message)
if len(messages) >= 2: # Get first two messages
break
self.assertEqual(2, len(messages))
self.assertEqual('111', messages[0].id())
self.assertEqual('222', messages[1].id())
await subscription.close() After that, hopefully the previous methods should still work the same way as before. So the user would have more options. |
Thanks for the review @Aldraz! I like the iterator approach, it's much more pythonic. Let's do it! Since you already did the work here, would you like to send a PR to my branch so that you can get credit for it, and show up in the list of contributors for this release? :) |
@elena-kolevska @Aldraz I will merge this PR so we can iteratively refactor things to use the async iterator instead. This PR has always lingered on for a long while :) Let's consider this PR to be Part 1 /2 of this feature. |
Thanks @berndverst! :) |
Description
Adds bidirectional streaming support for pub-sub, extending the API with the
subscribe
andsubscribe_handler methods
.The
subscribe
method returns aSubscription
object, which allows users to pull messages from thestream by calling the
next_message
method. This will block on the main thread while waiting for messages.When done, the
close
method should be called to terminate the subscription and stop receiving messages.The
subscribe_with_handler
method accepts a user callback function that is executed for each messagereceived from the stream.
It runs in a separate thread, so it doesn't block the main thread. The callback function should return a
TopicEventResponseStatus
, indicating whether the message was processed successfully, should beretried, or discarded. Users can return these statuses using the
Subscription.SUCCESS
,Subscription.RETRY
, andSubscription.DROP
class properties. The method will automatically managemessage acknowledgements based on the returned status.
The call to
subscribe_with_handler
method returns a close function, which should be called to terminate the subscription when done.Issue reference
#730
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: