Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking send message #4

Merged
merged 5 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/nalgonda/agency_config_lock_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import threading
from collections import defaultdict


class AgencyConfigLockManager:
"""Lock manager for agency config files"""

Expand Down
2 changes: 1 addition & 1 deletion src/nalgonda/data/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"id": null,
"role": "CEO",
"description": "Responsible for client communication, task planning and management.",
"instructions": "# Instructions for CEO Agent\n\n- Ensure that proposal is send to the user before proceeding with task execution.\n- Delegate tasks to appropriate agents, ensuring they align with their expertise and capabilities.\n- Clearly define the objectives and expected outcomes for each task.\n- Provide necessary context and background information for successful task completion.\n- Maintain ongoing communication with agents until complete task execution.\n- Review completed tasks to ensure they meet the set objectives.\n- Report the results back to the user.",
"instructions": "# Instructions for CEO Agent\n\n- Send the proposal to the user before beginning task execution.\n- Assign tasks to agents based on their expertise and capabilities.\n- Clearly outline the goals and expected outcomes for each task.\n- Provide essential context and background for successful task completion.\n- Keep in constant communication with agents throughout task execution.\n- Review completed tasks to ensure they meet the objectives.\n- Report the outcomes to the user.\n- Pass on any user feedback to the agents. Note: All conversations with agents are private. Information must be relayed directly by you, as cross-referencing or referencing 'above' is not possible in these separate, private conversations.",
"files_folder": null,
"tools": []
},
Expand Down
38 changes: 28 additions & 10 deletions src/nalgonda/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid

from agency_manager import AgencyManager
from agency_swarm import Agency
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from nalgonda.constants import DATA_DIR

Expand Down Expand Up @@ -69,24 +70,41 @@ async def websocket_endpoint(websocket: WebSocket, agency_id: str):

try:
while True:
user_message = await websocket.receive_text()
try:
user_message = await websocket.receive_text()

if not user_message.strip():
await ws_manager.send_message("message not provided", websocket)
await ws_manager.disconnect(websocket)
await websocket.close(code=1003)
return
if not user_message.strip():
await ws_manager.send_message("message not provided", websocket)
continue

gen = await asyncio.to_thread(agency.get_completion, message=user_message, yield_messages=True)
for response in gen:
response_text = response.get_formatted_content()
await ws_manager.send_message(response_text, websocket)
await process_message(user_message, agency, websocket)

except Exception as e:
logger.exception(e)
await ws_manager.send_message(f"Error: {e}\nPlease try again.", websocket)
continue

except WebSocketDisconnect:
await ws_manager.disconnect(websocket)
logger.info(f"WebSocket disconnected for agency_id: {agency_id}")


async def process_message(user_message: str, agency: Agency, websocket: WebSocket):
"""Process the user message and send the response to the websocket."""
gen = agency.get_completion(message=user_message, yield_messages=True)

async for response in async_gen(gen):
response_text = response.get_formatted_content()
await ws_manager.send_message(response_text, websocket)


async def async_gen(gen):
"""Asynchronous wrapper for a synchronous generator."""
for value in gen:
# Offload the blocking operation to a separate thread
yield await asyncio.to_thread(lambda v=value: v)


if __name__ == "__main__":
import uvicorn

Expand Down