Skip to content

Commit

Permalink
add send_file method
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Jan 16, 2025
1 parent 61eb7fd commit 318b8d4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
13 changes: 11 additions & 2 deletions livekit-rtc/livekit/rtc/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def __init__(
stream_id: str | None = None,
total_size: int | None = None,
mime_type: str = "",
destination_identities: List[str] = [],
):
self._local_participant = local_participant
if stream_id is None:
Expand All @@ -157,13 +158,15 @@ def __init__(
total_length=total_size,
)
self._next_chunk_index: int = 0
self._destination_identities = destination_identities

async def _send_header(self, destination_identities: List[str] = []):
async def _send_header(self):
req = proto_ffi.FfiRequest(
send_stream_header=proto_room.SendStreamHeaderRequest(
header=self._header,
local_participant_handle=self._local_participant._ffi_handle.handle,
destination_identities=destination_identities,
destination_identities=self._destination_identities,
sender_identity=self._local_participant.identity,
)
)

Expand All @@ -185,6 +188,8 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk):
send_stream_chunk=proto_room.SendStreamChunkRequest(
chunk=chunk,
local_participant_handle=self._local_participant._ffi_handle.handle,
sender_identity=self._local_participant.identity,
destination_identities=self._destination_identities,
)
)

Expand Down Expand Up @@ -235,6 +240,7 @@ def __init__(
stream_id: str | None = None,
total_size: int | None = None,
reply_to_id: str | None = None,
destination_identities: List[str] = [],
) -> None:
super().__init__(
local_participant,
Expand All @@ -243,6 +249,7 @@ def __init__(
stream_id,
total_size,
mime_type="text/plain",
destination_identities=destination_identities,
)
if reply_to_id:
self._header.text_header.reply_to_stream_id = reply_to_id
Expand Down Expand Up @@ -285,6 +292,7 @@ def __init__(
stream_id: str | None = None,
total_size: int | None = None,
mime_type: str = "",
destination_identities: List[str] = [],
) -> None:
super().__init__(
local_participant,
Expand All @@ -293,6 +301,7 @@ def __init__(
stream_id,
total_size,
mime_type=mime_type,
destination_identities=destination_identities,
)
self._header.file_header.file_name = file_name
self._info = FileStreamInfo(
Expand Down
46 changes: 42 additions & 4 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
from __future__ import annotations

import ctypes
import asyncio
import os
import mimetypes
import aiofiles
from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast
from abc import abstractmethod, ABC


from ._ffi_client import FfiClient, FfiHandle
from ._proto import ffi_pb2 as proto_ffi
from ._proto import participant_pb2 as proto_participant
Expand All @@ -38,10 +43,9 @@
from .rpc import RpcError
from ._proto.rpc_pb2 import RpcMethodInvocationResponseRequest
from .log import logger
import asyncio

from .rpc import RpcInvocationData
from .data_stream import TextStreamWriter, FileStreamWriter
from .data_stream import TextStreamWriter, FileStreamWriter, STREAM_CHUNK_SIZE


class PublishTrackError(Exception):
Expand Down Expand Up @@ -552,19 +556,21 @@ async def stream_text(
topic=topic,
extensions=extensions,
reply_to_id=reply_to_id,
destination_identities=destination_identities,
)

await writer._send_header(destination_identities=destination_identities)
await writer._send_header()

return writer

async def stream_file(
self,
file_name: str,
file_size: int | None = None,
mime_type: str = "",
mime_type: str = "application/octet-stream",
extensions: Dict[str, str] = {},
stream_id: str | None = None,
destination_identities: List[str] = [],
):
writer = FileStreamWriter(
self,
Expand All @@ -573,10 +579,42 @@ async def stream_file(
total_size=file_size,
stream_id=stream_id,
mime_type=mime_type,
destination_identities=destination_identities,
)

await writer._send_header()

return writer

async def send_file(
self,
file_path: str,
destination_identities: List[str] = [],
extensions: Dict[str, str] = {},
stream_id: str | None = None,
):
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = (
"application/octet-stream" # Fallback MIME type for unknown files
)

writer: FileStreamWriter = await self.stream_file(
file_name=file_name,
file_size=file_size,
mime_type=mime_type,
extensions=extensions,
stream_id=stream_id,
destination_identities=destination_identities,
)

async with aiofiles.open(file_path, "rb") as f:
while bytes := await f.read(STREAM_CHUNK_SIZE):
await writer.write(bytes)
await writer.close()

async def publish_track(
self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
) -> LocalTrackPublication:
Expand Down

0 comments on commit 318b8d4

Please sign in to comment.