Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework data stream API #352

Merged
merged 12 commits into from
Feb 1, 2025
9 changes: 5 additions & 4 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,9 @@ async def set_attributes(self, attributes: dict[str, str]) -> None:
async def stream_text(
self,
*,
destination_identities: List[str] = [],
destination_identities: Optional[List[str]] = None,
topic: str = "",
extensions: Dict[str, str] = {},
extensions: Optional[Dict[str, str]] = None,
reply_to_id: str | None = None,
total_size: int | None = None,
) -> TextStreamWriter:
Expand All @@ -579,9 +579,9 @@ async def send_text(
self,
text: str,
*,
destination_identities: List[str] = [],
destination_identities: Optional[List[str]] = None,
topic: str = "",
extensions: Dict[str, str] = {},
extensions: Optional[Dict[str, str]] = None,
reply_to_id: str | None = None,
):
total_size = len(text.encode())
Expand Down Expand Up @@ -632,6 +632,7 @@ async def stream_bytes(
async def send_file(
self,
file_path: str,
*,
topic: str = "",
destination_identities: Optional[List[str]] = None,
attributes: Optional[Dict[str, str]] = None,
Expand Down
10 changes: 6 additions & 4 deletions livekit-rtc/livekit/rtc/room.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uff, thanks, that was a debug left over, removed

Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def set_byte_stream_handler(self, handler: ByteStreamHandler, topic: str = ""):
if existing_handler is None:
self._byte_stream_handlers[topic] = handler
else:
raise TypeError("byte stream handler for topic '%s' already set" % topic)
raise ValueError("byte stream handler for topic '%s' already set" % topic)

def remove_byte_stream_handler(self, topic: str = ""):
if self._byte_stream_handlers.get(topic):
Expand All @@ -442,7 +442,7 @@ def set_text_stream_handler(self, handler: TextStreamHandler, topic: str = ""):
if existing_handler is None:
self._text_stream_handlers[topic] = handler
else:
raise TypeError("text stream handler for topic '%s' already set" % topic)
raise ValueError("text stream handler for topic '%s' already set" % topic)

def remove_text_stream_handler(self, topic: str = ""):
if self._text_stream_handlers.get(topic):
Expand Down Expand Up @@ -747,9 +747,11 @@ def _on_room_event(self, event: proto_room.RoomEvent):
event.stream_header_received.participant_identity,
)
elif which == "stream_chunk_received":
asyncio.gather(self._handle_stream_chunk(event.stream_chunk_received.chunk))
asyncio.create_task(
lukasIO marked this conversation as resolved.
Show resolved Hide resolved
self._handle_stream_chunk(event.stream_chunk_received.chunk)
)
elif which == "stream_trailer_received":
asyncio.gather(
asyncio.create_task(
self._handle_stream_trailer(event.stream_trailer_received.trailer)
)

Expand Down
Loading