Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Sep 23, 2024
1 parent 9d67b78 commit 2a4a99b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
12 changes: 12 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ def publish_event(
return DaprResponse(call.initial_metadata())

def subscribe(self, pubsub_name, topic, metadata=None, dead_letter_topic=None):
"""
Subscribe to a topic with a bidirectional stream
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[Dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
subscription.start()
return subscription
Expand Down
2 changes: 0 additions & 2 deletions tests/clients/fake_dapr_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context):
spec_version='1.0',
)
yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg3)
# Simulate the stream being closed with an error
context.abort(grpc.StatusCode.CANCELLED, 'Stream closed by server')

def SaveState(self, request, context):
self.check_for_exception(context)
Expand Down
4 changes: 2 additions & 2 deletions tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def test_subscribe_topic(self):
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
subscription = dapr.subscribe(pubsub_name='pubsub', topic='example')

# First message
# First message - text
message1 = subscription.next_message()
subscription.respond_success(message1)

Expand All @@ -281,7 +281,7 @@ def test_subscribe_topic(self):
self.assertEqual('text/plain', message1.data_content_type())
self.assertEqual('hello2', message1.data())

# Second message
# Second message - json
message2 = subscription.next_message()
subscription.respond_success(message2)

Expand Down

0 comments on commit 2a4a99b

Please sign in to comment.