Skip to content

Commit

Permalink
fix: send task ui updates in own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
MSchmoecker committed Jan 10, 2025
1 parent 774d18a commit ded55d7
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions controller/thymis_controller/task/subscribe_ui.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import asyncio.queues
import logging
import threading

from fastapi import WebSocket
from fastapi.websockets import WebSocketState
from thymis_controller import crud, db_models

logger = logging.getLogger(__name__)
Expand All @@ -11,45 +14,56 @@ class TaskUISubscriptionManager:
# manages websocket subscriptions
def __init__(self):
self.subscribers: set[WebSocket] = set()
self.subscribers_lock = threading.Lock()
self.task_queue = asyncio.Queue()
self.send_thread: threading.Thread | None = None
self.loop = None

async def connect(self, websocket: WebSocket):
await websocket.accept()
self.subscribers.add(websocket)
with self.subscribers_lock:
self.subscribers.add(websocket)
try:
while True:
await websocket.receive_text()
except:
self.subscribers.remove(websocket)
except Exception:
with self.subscribers_lock:
self.subscribers.remove(websocket)
logger.info("Websocket disconnected")

async def start(self):
pass
self.send_thread = threading.Thread(
target=asyncio.run, args=(self.send_loop(),)
)
self.send_thread.start()

def stop(self):
pass
self.loop.call_soon_threadsafe(self.task_queue.shutdown)
self.send_thread.join()

async def send_loop(self):
self.loop = asyncio.get_event_loop()
while True:
try:
task = await self.task_queue.get()

with self.subscribers_lock:
for subscriber in self.subscribers:
if subscriber.application_state == WebSocketState.CONNECTED:
await subscriber.send_json(task)
except asyncio.QueueShutDown:
break

def notify_new_task(self, task: db_models.Task):
short_task = crud.task.TaskShort.from_orm_task(task)
for subscriber in self.subscribers:
# TODO remove event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
subscriber.send_json(
{"type": "new_task", "task": short_task.model_dump(mode="json")}
)
)
loop.close()
self.loop.call_soon_threadsafe(
self.task_queue.put_nowait,
{"type": "new_task", "task": short_task.model_dump(mode="json")},
)

def notify_task_update(self, task: db_models.Task):
short_task = crud.task.TaskShort.from_orm_task(task)
for subscriber in self.subscribers:
# TODO remove event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
subscriber.send_json(
{"type": "task_update", "task": short_task.model_dump(mode="json")}
)
)
loop.close()
self.loop.call_soon_threadsafe(
self.task_queue.put_nowait,
{"type": "task_update", "task": short_task.model_dump(mode="json")},
)

0 comments on commit ded55d7

Please sign in to comment.