Cannot send stream from LiveKit agent #1295

Open
sam-goldman opened this issue Dec 25, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@sam-goldman

When the LiveKit agent streams an HTTP request body to a FastAPI endpoint, the endpoint receives only an empty chunk instead of the streamed content. This issue occurs in the latest version of livekit-agents, 0.12.3.

The issue can be reproduced with a basic LiveKit frontend, a voice pipeline agent, and a FastAPI server. The agent's entrypoint function initiates the stream to the FastAPI server.

Steps to reproduce

  1. Create a frontend:
lk app create --template voice-assistant-frontend
  1. Install dependencies:
cd <frontend_dir>
pnpm install
  1. Set these env vars for the frontend in .env.local:
LIVEKIT_URL=ws://127.0.0.1:7880
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
  1. Start the frontend:
pnpm dev
  1. In a new terminal and directory, create an agent:
lk app create --template voice-pipeline-agent-python
  1. Create an environment for the agent:
cd <agent_dir>
python3 -m venv venv
source venv/bin/activate
  1. Open the agent's requirements.txt and update the livekit-agents dependency to:
livekit-agents==0.12.3
  1. Install dependencies and FastAPI:
python3 -m pip install -r requirements.txt
python3 -m pip install fastapi uvicorn && pip freeze > requirements.txt
  1. Update the agent's .env.local file to contain the following, filling in your OPENAI_API_KEY and DEEPGRAM_API_KEY:
LIVEKIT_URL=ws://127.0.0.1:7880
LIVEKIT_API_KEY=devkey
LIVEKIT_API_SECRET=secret
OPENAI_API_KEY=<YOUR_OPENAI_KEY>
DEEPGRAM_API_KEY=<YOUR_DEEPGRAM_KEY>
  1. Create a file called server.py in the agent's directory and paste the following code into it:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/my-stream")
async def before_tts_stream(request: Request):
    print("[FASTAPI] /my-stream called")
    async def read_chunks():
        async for chunk in request.stream():
            print("[FASTAPI] Got chunk:", chunk)
            yield chunk

    return StreamingResponse(read_chunks(), media_type="text/plain")
  1. Open agent.py and replace its contents with the following code:
from dotenv import load_dotenv
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import openai, deepgram, silero
import asyncio
from typing import AsyncIterable
import httpx
import logging

load_dotenv(dotenv_path=".env.local")
logger = logging.getLogger("voice-agent")

async def sample_text_generator() -> AsyncIterable[str]:
    for i in range(3):
        yield f"chunk {i}\n"
        await asyncio.sleep(0.5)

async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoid using unpronounceable punctuation. "
            "You were created as a demo to showcase the capabilities of LiveKit's agents framework."
        ),
    )

    url = "http://localhost:8000/my-stream"

    async def test_production_logic():
        async def input_stream() -> AsyncIterable[bytes]:
            async for chunk in sample_text_generator():
                logger.info(f"[WORKER] Sending chunk: {chunk.strip()}")
                yield chunk.encode("utf-8")

        async def output_stream() -> AsyncIterable[str]:
            async with httpx.AsyncClient() as client:
                async with client.stream(
                    "POST",
                    url,
                    content=input_stream(),
                    headers={"Content-Type": "text/plain"},
                ) as resp:
                    async for out_chunk in resp.aiter_text():
                        yield out_chunk

        logger.info("[WORKER] Starting streaming POST...")
        async for response_chunk in output_stream():
            logger.info(f"[WORKER] Got from server: {response_chunk}")

    await test_production_logic()
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    logger.info(f"connecting to room {ctx.room.name}")

    participant = await ctx.wait_for_participant()
    logger.info(f"starting voice assistant for participant {participant.identity}")

    agent = VoicePipelineAgent(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    agent.start(ctx.room, participant)

    await agent.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
        ),
    )
  1. Run the FastAPI server:
uvicorn server:app --host 0.0.0.0 --port 8000
  1. In a different terminal, run the LiveKit server:
livekit-server --dev
  1. In a different terminal, run the LiveKit agent:
python3 agent.py dev
  1. In a browser, go to http://localhost:3000/ and click "Start a Conversation".

Expected outcome

The FastAPI server should receive the streamed content, causing it to print the following logs:

[FASTAPI] /my-stream called
INFO:     127.0.0.1:63587 - "POST /my-stream HTTP/1.1" 200 OK
[FASTAPI] Got chunk: b'chunk 0\n'
[FASTAPI] Got chunk: b'chunk 1\n'
[FASTAPI] Got chunk: b'chunk 2\n'
[FASTAPI] Got chunk: b''
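For reference, when `content=` is an async generator, httpx does not know the body length in advance and sends the request with `Transfer-Encoding: chunked`. The following sketch uses a hypothetical helper, `encode_chunked` (not part of httpx, FastAPI, or LiveKit), to show the HTTP/1.1 chunked framing that the three chunks from `sample_text_generator()` should produce on the wire: each chunk is its size in hex, CRLF, the data, CRLF, and a zero-length chunk ends the body.

```python
def encode_chunked(chunks: list[bytes]) -> bytes:
    """Frame body chunks using HTTP/1.1 chunked transfer coding (RFC 9112, section 7.1).

    Hypothetical illustration only; httpx delegates this framing to its HTTP/1.1 layer.
    """
    out = bytearray()
    for chunk in chunks:
        if not chunk:
            # A zero-length chunk would signal end-of-body, so empty pieces are skipped.
            continue
        # Chunk size in hexadecimal, CRLF, the data itself, CRLF.
        out += f"{len(chunk):X}\r\n".encode("ascii") + chunk + b"\r\n"
    # Last-chunk marker followed by an empty trailer section.
    out += b"0\r\n\r\n"
    return bytes(out)


body = encode_chunked([f"chunk {i}\n".encode("utf-8") for i in range(3)])
print(body)
# b'8\r\nchunk 0\n\r\n8\r\nchunk 1\n\r\n8\r\nchunk 2\n\r\n0\r\n\r\n'
```

Starlette's `request.stream()` yields a final `b''` once the body is complete, which is why the expected log ends with an empty chunk; the actual outcome below appears to contain only that terminator.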

Actual outcome

The FastAPI server's logs display:

[FASTAPI] /my-stream called
INFO:     127.0.0.1:63155 - "POST /my-stream HTTP/1.1" 200 OK
[FASTAPI] Got chunk: b''

Notice that the server receives only an empty byte string instead of the streamed content (e.g. "Got chunk: b'chunk 0\n'").
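One way to narrow down whether the chunks are lost before they leave the agent process or after they reach uvicorn/FastAPI is to temporarily point `url` at a raw TCP listener that logs the request bytes exactly as they arrive. The sketch below is a stdlib-only diagnostic under that assumption; `capture_request`, `read_chunked_body`, `main`, and port 8001 are hypothetical choices, not part of LiveKit or FastAPI, and it handles a single request per connection.

```python
import asyncio


async def read_chunked_body(reader: asyncio.StreamReader) -> list[bytes]:
    """Decode an HTTP/1.1 chunked request body, logging each chunk as it arrives."""
    chunks: list[bytes] = []
    while True:
        size_line = await reader.readuntil(b"\r\n")
        # Chunk size is hexadecimal; ignore any ";name=value" chunk extensions.
        size = int(size_line.split(b";", 1)[0].strip(), 16)
        if size == 0:
            # Skip optional trailer fields up to the terminating blank line.
            while (await reader.readuntil(b"\r\n")) != b"\r\n":
                pass
            return chunks
        data = await reader.readexactly(size)
        await reader.readexactly(2)  # CRLF that follows the chunk data
        print(f"[RAW] chunk ({size} bytes): {data!r}")
        chunks.append(data)


async def capture_request(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> list[bytes]:
    """Log one raw HTTP request, reply 200 with an empty body, and return the body chunks."""
    head = (await reader.readuntil(b"\r\n\r\n")).decode("latin-1")
    print("[RAW] headers:\n" + head)
    lowered = head.lower()
    if "transfer-encoding: chunked" in lowered:
        chunks = await read_chunked_body(reader)
    else:
        # Fall back to a fixed-length body if the client sent Content-Length instead.
        length = 0
        for line in lowered.split("\r\n"):
            if line.startswith("content-length:"):
                length = int(line.split(":", 1)[1])
        chunks = [await reader.readexactly(length)] if length else []
        print(f"[RAW] fixed-length body: {chunks!r}")
    writer.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()
    return chunks


async def main() -> None:
    # Hypothetical port chosen so it does not clash with uvicorn on 8000.
    server = await asyncio.start_server(capture_request, "127.0.0.1", 8001)
    async with server:
        await server.serve_forever()
```

To try it, save the block as a script, append `asyncio.run(main())`, run it, and temporarily set `url` in the agent's entrypoint to http://127.0.0.1:8001/my-stream. If the [RAW] lines show all three chunks, the body is leaving the agent intact, which would point toward the receiving side; if only the zero-length terminator arrives, the problem is more likely on the sending side.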

@sam-goldman sam-goldman added the bug Something isn't working label Dec 25, 2024
@davidzhao
Member

@sam-goldman it's not clear that your example has anything to do with agents. If I'm reading it correctly, it spins up a new task that attempts to post to your API endpoint.

@sam-goldman
Author

Hey @davidzhao, let me clarify what the issue is.

If I POST to my API endpoint from a standalone script (script.py, below), I get the expected outcome: the FastAPI server receives the streamed content and logs the following in its console:

[FASTAPI] Got chunk: b'chunk 0\n'
[FASTAPI] Got chunk: b'chunk 1\n'
[FASTAPI] Got chunk: b'chunk 2\n'
[FASTAPI] Got chunk: b''

Here’s the standalone script, script.py, which successfully streams this content:

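import asyncio
import logging
from typing import AsyncIterable

import httpx

# Show INFO-level logs in the console; asyncio's internal logger is not meant for application logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("script")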

async def sample_text_generator() -> AsyncIterable[str]:
    for i in range(3):
        yield f"chunk {i}\n"
        await asyncio.sleep(0.5)

async def main():
    url = "http://localhost:8000/my-stream"

    async def test_production_logic():
        async def input_stream() -> AsyncIterable[bytes]:
            async for chunk in sample_text_generator():
                logger.info(f"[WORKER] Sending chunk: {chunk.strip()}")
                yield chunk.encode("utf-8")

        async def output_stream() -> AsyncIterable[str]:
            async with httpx.AsyncClient() as client:
                async with client.stream(
                    "POST",
                    url,
                    content=input_stream(),
                    headers={"Content-Type": "text/plain"},
                ) as resp:
                    async for out_chunk in resp.aiter_text():
                        yield out_chunk

        logger.info("[WORKER] Starting streaming POST...")
        async for response_chunk in output_stream():
            logger.info(f"[WORKER] Got from server: {response_chunk}")

    await test_production_logic()

asyncio.run(main())

(You can run the script with python script.py.)

However, if you copy the exact logic from script.py into the LiveKit agent’s entrypoint function, the FastAPI server does not receive the streamed content. Instead, it receives only an empty byte string, and the FastAPI logs display:

[FASTAPI] /my-stream called
INFO:     127.0.0.1:63155 - "POST /my-stream HTTP/1.1" 200 OK
[FASTAPI] Got chunk: b''

Notice that the server receives only an empty byte string instead of the streamed content (e.g. "Got chunk: b'chunk 0\n'").

This suggests to me that something in the way the LiveKit agent runs the entrypoint is preventing the content from streaming to the server.

Does that make sense?

@sam-goldman
Author

Hey @davidzhao, just wanted to follow up on this. Let me know if you need any additional info.
