From d25bac1426361fe940e836705652e8fd9514a2e9 Mon Sep 17 00:00:00 2001
From: Maksim Naumov
Date: Tue, 28 Mar 2023 18:45:40 +0200
Subject: [PATCH] Use a separate thread to watch for dead processes

---
Review notes (text between the "---" line and the diffstat is dropped by
`git am`, so this is not part of the commit message):

* `watcher` now annotates action_queue as "Queue[ProcessActionBase]" (quoted).
  multiprocessing.Queue is a factory callable rather than a generic class, so
  subscripting it while the def statement runs raises TypeError unless the
  module enables postponed annotations (not visible in this patch).
* The watcher start-up message goes through the module `logger`, like the
  other messages in the same function, instead of the root `logging` module.
  The carried-over "Got event" debug line is left exactly as it was.
* Both changes are in-place edits of added lines, so hunk sizes and the
  diffstat are unchanged; the post-image blob id on the `index` line was not
  recomputed.
* Line breaks and indentation were restored from a whitespace-collapsed copy
  of this patch and checked against the hunk line counts; confirm the context
  lines against the target file before applying.
* Open question: the old loop skipped a second ReloadOneAction for a worker
  it had just restarted; the new loop handles every queued action. Verify
  that the watcher cannot queue a redundant reload during a ReloadAllAction.

 taskiq/cli/worker/process_manager.py | 68 +++++++++++++++++-----------
 1 file changed, 41 insertions(+), 27 deletions(-)

diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py
index 0bb319b..0961a74 100644
--- a/taskiq/cli/worker/process_manager.py
+++ b/taskiq/cli/worker/process_manager.py
@@ -2,8 +2,9 @@
 import signal
 from dataclasses import dataclass
 from multiprocessing import Process, Queue
+from threading import Thread
 from time import sleep
-from typing import Any, Callable, List
+from typing import Any, Callable, List, NoReturn
 
 from watchdog.observers import Observer
 
@@ -172,7 +173,32 @@ def prepare_workers(self) -> None:
             )
             self.workers.append(work_proc)
 
-    def start(self) -> None:  # noqa: C901, WPS213
+    def start_worker_watcher(self) -> None:
+        """
+        Start worker watcher thread.
+
+        The worker watcher thread periodically checks
+        that the worker processes are alive and schedules
+        a reload for ones that failed.
+        """
+
+        def watcher(
+            workers: List[Process],
+            action_queue: "Queue[ProcessActionBase]",
+        ) -> NoReturn:
+            logger.info("Started the worker watcher thread.")
+            while True:  # noqa: WPS457
+                sleep(1.0)
+                for worker_num, worker in enumerate(workers):
+                    if not worker.is_alive():
+                        logger.info(f"{worker.name} is dead. Scheduling reload.")
+                        action_queue.put(ReloadOneAction(worker_num=worker_num))
+
+        self._watcher = Thread(target=watcher, args=(self.workers, self.action_queue))
+        self._watcher.daemon = True
+        self._watcher.start()
+
+    def start(self) -> None:
         """
         Start managing child processes.
 
@@ -199,29 +225,17 @@ def start(self) -> None:  # noqa: C901, WPS213
         some reason, it schedules a restart for dead process.
         """
         self.prepare_workers()
+        self.start_worker_watcher()
         while True:
-            sleep(1)
-            reloaded_workers = set()
-            # We bulk_process all pending events.
-            while not self.action_queue.empty():
-                action = self.action_queue.get()
-                logging.debug(f"Got event: {action}")
-                if isinstance(action, ReloadAllAction):
-                    action.handle(
-                        workers_num=len(self.workers),
-                        action_queue=self.action_queue,
-                    )
-                elif isinstance(action, ReloadOneAction):
-                    # If we just reloaded this worker, skip handling.
-                    if action.worker_num in reloaded_workers:
-                        continue
-                    action.handle(self.workers, self.args, self.worker_function)
-                    reloaded_workers.add(action.worker_num)
-                elif isinstance(action, ShutdownAction):
-                    logger.debug("Process manager closed.")
-                    return
-
-            for worker_num, worker in enumerate(self.workers):
-                if not worker.is_alive():
-                    logger.info(f"{worker.name} is dead. Scheduling reload.")
-                    self.action_queue.put(ReloadOneAction(worker_num=worker_num))
+            action = self.action_queue.get()
+            logging.debug(f"Got event: {action}")
+            if isinstance(action, ReloadAllAction):
+                action.handle(
+                    workers_num=len(self.workers),
+                    action_queue=self.action_queue,
+                )
+            elif isinstance(action, ReloadOneAction):
+                action.handle(self.workers, self.args, self.worker_function)
+            elif isinstance(action, ShutdownAction):
+                logger.debug("Process manager closed.")
+                return