Skip to content

Commit

Permalink
Removes async client
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Oct 21, 2024
1 parent be4cc40 commit 46f2923
Show file tree
Hide file tree
Showing 8 changed files with 5 additions and 569 deletions.
71 changes: 1 addition & 70 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
Expand All @@ -39,14 +39,12 @@
AioRpcError,
)

from dapr.aio.clients.grpc.subscription import Subscription
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamInactiveError
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
Expand Down Expand Up @@ -96,7 +94,6 @@
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -485,72 +482,6 @@ async def publish_event(

return DaprResponse(await call.initial_metadata())

async def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
await subscription.start()
return subscription

async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Callable[[], Awaitable[None]]: An async function to close the subscription.
"""
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

async def stream_messages(sub: Subscription):
while True:
try:
message = await sub.next_message()
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
else:
continue
except StreamInactiveError:
break

async def close_subscription():
await subscription.close()

asyncio.create_task(stream_messages(subscription))

return close_subscription

async def get_state(
self,
store_name: str,
Expand Down
110 changes: 0 additions & 110 deletions dapr/aio/clients/grpc/subscription.py

This file was deleted.

8 changes: 4 additions & 4 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,10 @@ You can create a streaming subscription to a PubSub topic using either the `subs
or `subscribe_handler` methods.

The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the
stream by
calling the `next_message` method. This will block on the main thread while waiting for messages.
When done, you should call the close method to terminate the
subscription and stop receiving messages.
stream by calling the `next_message` method. This will block on the main thread while waiting for
messages.
When done, you should call the close method to terminate the subscription and stop receiving
messages.

The `subscribe_with_handler` method accepts a callback function that is executed for each message
received from the stream.
Expand Down
122 changes: 0 additions & 122 deletions examples/pubsub-streaming-async/README.md

This file was deleted.

45 changes: 0 additions & 45 deletions examples/pubsub-streaming-async/publisher.py

This file was deleted.

Loading

0 comments on commit 46f2923

Please sign in to comment.