Skip to content

Commit

Permalink
Bidirectional streaming for pubsub (#735)
Browse files Browse the repository at this point in the history
* works

Signed-off-by: Elena Kolevska <[email protected]>

* works

Signed-off-by: Elena Kolevska <[email protected]>

* Sync bidi streaming and tests

Signed-off-by: Elena Kolevska <[email protected]>

* example fix

Signed-off-by: Elena Kolevska <[email protected]>

fixes typing

Signed-off-by: Elena Kolevska <[email protected]>

more readable example

Signed-off-by: Elena Kolevska <[email protected]>

linter

Signed-off-by: Elena Kolevska <[email protected]>

* examples fix

Signed-off-by: Elena Kolevska <[email protected]>

* Adds support for api token

Signed-off-by: Elena Kolevska <[email protected]>

* clean up

Signed-off-by: Elena Kolevska <[email protected]>

* Adds docs

Signed-off-by: Elena Kolevska <[email protected]>

* more small tweaks

Signed-off-by: Elena Kolevska <[email protected]>

* cleanups and tests

Signed-off-by: Elena Kolevska <[email protected]>

* Removes receive queue

Signed-off-by: Elena Kolevska <[email protected]>

* Adds `subscribe_with_handler`

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes linter

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes linter

Signed-off-by: Elena Kolevska <[email protected]>

* Adds async

Signed-off-by: Elena Kolevska <[email protected]>

* Adds tests for async streaming subscription

Signed-off-by: Elena Kolevska <[email protected]>

* Linter

Signed-off-by: Elena Kolevska <[email protected]>

* Split sync and async examples

Signed-off-by: Elena Kolevska <[email protected]>

* linter

Signed-off-by: Elena Kolevska <[email protected]>

* Adds interceptors to the async client for bidirectional streaming

Signed-off-by: Elena Kolevska <[email protected]>

* Removes unneeded class

Signed-off-by: Elena Kolevska <[email protected]>

* Removes async client

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes missing docker-compose in examples (#736)

Signed-off-by: Elena Kolevska <[email protected]>

* Removes async examples test

Signed-off-by: Elena Kolevska <[email protected]>

* Small cleanup

Signed-off-by: Elena Kolevska <[email protected]>

* Split up topic names between tests

Signed-off-by: Elena Kolevska <[email protected]>

* lint

Signed-off-by: Elena Kolevska <[email protected]>

* Revert "Removes async client"

This reverts commit cb4b65b.

Signed-off-by: Elena Kolevska <[email protected]>

* Split up topic names between tests

Signed-off-by: Elena Kolevska <[email protected]>

* updates fake server to wait for confirmation message before sending new message

Signed-off-by: Elena Kolevska <[email protected]>

* Updates protos

Signed-off-by: Elena Kolevska <[email protected]>

* Adds stream cancelled error

Signed-off-by: Elena Kolevska <[email protected]>

* linter

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska authored Oct 25, 2024
1 parent 0cd0482 commit 6e90e84
Show file tree
Hide file tree
Showing 31 changed files with 2,927 additions and 1,797 deletions.
71 changes: 70 additions & 1 deletion dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
Expand All @@ -39,12 +39,14 @@
AioRpcError,
)

from dapr.aio.clients.grpc.subscription import Subscription
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamInactiveError
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
Expand Down Expand Up @@ -94,6 +96,7 @@
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -482,6 +485,72 @@ async def publish_event(

return DaprResponse(await call.initial_metadata())

async def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
await subscription.start()
return subscription

async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Callable[[], Awaitable[None]]: An async function to close the subscription.
"""
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

async def stream_messages(sub: Subscription):
while True:
try:
message = await sub.next_message()
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
else:
continue
except StreamInactiveError:
break

async def close_subscription():
await subscription.close()

asyncio.create_task(stream_messages(subscription))

return close_subscription

async def get_state(
self,
store_name: str,
Expand Down
22 changes: 19 additions & 3 deletions dapr/aio/clients/grpc/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import namedtuple
from typing import List, Tuple

from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore
from grpc.aio import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, ClientCallDetails # type: ignore

from dapr.conf import settings

Expand Down Expand Up @@ -51,7 +51,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
return continuation(client_call_details, request)


class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor):
class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor, StreamStreamClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.
Expand Down Expand Up @@ -115,8 +115,24 @@ async def intercept_unary_unary(self, continuation, client_call_details, request
Returns:
A response object after invoking the continuation callable
"""
new_call_details = await self._intercept_call(client_call_details)
# Call continuation
response = await continuation(new_call_details, request)
return response

async def intercept_stream_stream(self, continuation, client_call_details, request):
"""This method intercepts a stream-stream gRPC call. This is the implementation of the
abstract method defined in StreamStreamClientInterceptor defined in grpc. This is invoked
automatically by grpc based on the order in which interceptors are added to the channel.
Args:
continuation: a callable to be invoked to continue with the RPC or next interceptor
client_call_details: a ClientCallDetails object describing the outgoing RPC
request: the request value for the RPC
# Pre-process or intercept call
Returns:
A response object after invoking the continuation callable
"""
new_call_details = await self._intercept_call(client_call_details)
# Call continuation
response = await continuation(new_call_details, request)
Expand Down
116 changes: 116 additions & 0 deletions dapr/aio/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import asyncio
from grpc import StatusCode
from grpc.aio import AioRpcError

from dapr.clients.grpc._response import TopicEventResponse
from dapr.clients.health import DaprHealth
from dapr.common.pubsub.subscription import (
StreamInactiveError,
SubscriptionMessage,
StreamCancelledError,
)
from dapr.proto import api_v1, appcallback_v1


class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
self._stub = stub
self._pubsub_name = pubsub_name
self._topic = topic
self._metadata = metadata or {}
self._dead_letter_topic = dead_letter_topic or ''
self._stream = None
self._send_queue = asyncio.Queue()
self._stream_active = asyncio.Event()

async def start(self):
async def outgoing_request_iterator():
try:
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
pubsub_name=self._pubsub_name,
topic=self._topic,
metadata=self._metadata,
dead_letter_topic=self._dead_letter_topic,
)
)
yield initial_request

while self._stream_active.is_set():
try:
response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0)
yield response
except asyncio.TimeoutError:
continue
except Exception as e:
raise Exception(f'Error while writing to stream: {e}')

self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
self._stream_active.set()
await self._stream.read() # discard the initial message

async def reconnect_stream(self):
await self.close()
DaprHealth.wait_until_ready()
print('Attempting to reconnect...')
await self.start()

async def next_message(self):
if not self._stream_active.is_set():
raise StreamInactiveError('Stream is not active')

try:
if self._stream is not None:
message = await self._stream.read()
if message is None:
return None
return SubscriptionMessage(message.event_message)
except AioRpcError as e:
if e.code() == StatusCode.UNAVAILABLE:
print(
f'gRPC error while reading from stream: {e.details()}, '
f'Status Code: {e.code()}. '
f'Attempting to reconnect...'
)
await self.reconnect_stream()
elif e.code() == StatusCode.CANCELLED:
raise StreamCancelledError('Stream has been cancelled')
else:
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
except Exception as e:
raise Exception(f'Error while fetching message: {e}')

return None

async def respond(self, message, status):
try:
status = appcallback_v1.TopicEventResponse(status=status.value)
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
id=message.id(), status=status
)
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
if not self._stream_active.is_set():
raise StreamInactiveError('Stream is not active')
await self._send_queue.put(msg)
except Exception as e:
print(f"Can't send message: {e}")

async def respond_success(self, message):
await self.respond(message, TopicEventResponse('success').status)

async def respond_retry(self, message):
await self.respond(message, TopicEventResponse('retry').status)

async def respond_drop(self, message):
await self.respond(message, TopicEventResponse('drop').status)

async def close(self):
if self._stream:
try:
self._stream.cancel()
self._stream_active.clear()
except AioRpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')
76 changes: 75 additions & 1 deletion dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import threading
import time
import socket
import json
Expand Down Expand Up @@ -41,6 +41,7 @@
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc.subscription import Subscription, StreamInactiveError
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
Expand Down Expand Up @@ -85,6 +86,7 @@
StartWorkflowResponse,
EncryptResponse,
DecryptResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -481,6 +483,78 @@ def publish_event(

return DaprResponse(call.initial_metadata())

def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[MetadataTuple] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
timeout (Optional[int]): The time in seconds to wait for a message before returning None
If not set, the `next_message` method will block indefinitely
until a message is received.
Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
subscription.start()
return subscription

def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[MetadataTuple] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable:
"""
Subscribe to a topic with a bidirectional stream and a message handler function
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
timeout (Optional[int]): The time in seconds to wait for a message before returning None
If not set, the `next_message` method will block indefinitely
until a message is received.
"""
subscription = self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

def stream_messages(sub):
while True:
try:
message = sub.next_message()
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue
except StreamInactiveError:
break

def close_subscription():
subscription.close()

streaming_thread = threading.Thread(target=stream_messages, args=(subscription,))
streaming_thread.start()

return close_subscription

def get_state(
self,
store_name: str,
Expand Down
Loading

0 comments on commit 6e90e84

Please sign in to comment.