Skip to content

Commit

Permalink
Fix blocking send message (#4)
Browse files Browse the repository at this point in the history
* Ensure thread-safely
Update README.md
* Error handling
* Update CEO's prompt
* Unblock the main event loop
  • Loading branch information
bonk1t authored Dec 11, 2023
1 parent 3ccf4d2 commit 0cda6f4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/nalgonda/agency_config_lock_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import threading
from collections import defaultdict


class AgencyConfigLockManager:
"""Lock manager for agency config files"""

Expand Down
2 changes: 1 addition & 1 deletion src/nalgonda/data/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"id": null,
"role": "CEO",
"description": "Responsible for client communication, task planning and management.",
"instructions": "# Instructions for CEO Agent\n\n- Ensure that proposal is send to the user before proceeding with task execution.\n- Delegate tasks to appropriate agents, ensuring they align with their expertise and capabilities.\n- Clearly define the objectives and expected outcomes for each task.\n- Provide necessary context and background information for successful task completion.\n- Maintain ongoing communication with agents until complete task execution.\n- Review completed tasks to ensure they meet the set objectives.\n- Report the results back to the user.",
"instructions": "# Instructions for CEO Agent\n\n- Send the proposal to the user before beginning task execution.\n- Assign tasks to agents based on their expertise and capabilities.\n- Clearly outline the goals and expected outcomes for each task.\n- Provide essential context and background for successful task completion.\n- Keep in constant communication with agents throughout task execution.\n- Review completed tasks to ensure they meet the objectives.\n- Report the outcomes to the user.\n- Pass on any user feedback to the agents. Note: All conversations with agents are private. Information must be relayed directly by you, as cross-referencing or referencing 'above' is not possible in these separate, private conversations.",
"files_folder": null,
"tools": []
},
Expand Down
38 changes: 28 additions & 10 deletions src/nalgonda/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid

from agency_manager import AgencyManager
from agency_swarm import Agency
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from nalgonda.constants import DATA_DIR

Expand Down Expand Up @@ -69,24 +70,41 @@ async def websocket_endpoint(websocket: WebSocket, agency_id: str):

try:
while True:
user_message = await websocket.receive_text()
try:
user_message = await websocket.receive_text()

if not user_message.strip():
await ws_manager.send_message("message not provided", websocket)
await ws_manager.disconnect(websocket)
await websocket.close(code=1003)
return
if not user_message.strip():
await ws_manager.send_message("message not provided", websocket)
continue

gen = await asyncio.to_thread(agency.get_completion, message=user_message, yield_messages=True)
for response in gen:
response_text = response.get_formatted_content()
await ws_manager.send_message(response_text, websocket)
await process_message(user_message, agency, websocket)

except Exception as e:
logger.exception(e)
await ws_manager.send_message(f"Error: {e}\nPlease try again.", websocket)
continue

except WebSocketDisconnect:
await ws_manager.disconnect(websocket)
logger.info(f"WebSocket disconnected for agency_id: {agency_id}")


async def process_message(user_message: str, agency: Agency, websocket: WebSocket):
"""Process the user message and send the response to the websocket."""
gen = agency.get_completion(message=user_message, yield_messages=True)

async for response in async_gen(gen):
response_text = response.get_formatted_content()
await ws_manager.send_message(response_text, websocket)


async def async_gen(gen):
"""Asynchronous wrapper for a synchronous generator."""
for value in gen:
# Offload the blocking operation to a separate thread
yield await asyncio.to_thread(lambda v=value: v)


if __name__ == "__main__":
import uvicorn

Expand Down

0 comments on commit 0cda6f4

Please sign in to comment.