Skip to content

Commit

Permalink
Merge branch 'main' into feature/visualization
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Jan 21, 2025
2 parents d1be608 + f728c7d commit 3cecf89
Show file tree
Hide file tree
Showing 35 changed files with 621 additions and 534 deletions.
14 changes: 8 additions & 6 deletions src/broker/operandi_broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
__all__ = [
"cli",
"ServiceBroker",
"JobStatusWorker",
"Worker"
"cli",
"JobWorkerDownload",
"JobWorkerStatus",
"JobWorkerSubmit",
"ServiceBroker",
]

from .cli import cli
from .broker import ServiceBroker
from .job_status_worker import JobStatusWorker
from .worker import Worker
from .job_worker_download import JobWorkerDownload
from .job_worker_status import JobWorkerStatus
from .job_worker_submit import JobWorkerSubmit
87 changes: 14 additions & 73 deletions src/broker/operandi_broker/broker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from logging import getLogger
from os import environ, fork
import psutil
import signal
from os import environ
from time import sleep

from operandi_utils import (
get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri, verify_and_parse_mq_uri)
from operandi_utils.constants import LOG_LEVEL_BROKER
from operandi_utils.rabbitmq.constants import (
RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)
from .worker import Worker
from .job_status_worker import JobStatusWorker
RABBITMQ_QUEUE_HPC_DOWNLOADS, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)

from .broker_utils import create_child_process, kill_workers


class ServiceBroker:
Expand Down Expand Up @@ -48,14 +46,15 @@ def run_broker(self):
# A list of queues for which a worker process should be created
queues = [RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS]
status_queue = RABBITMQ_QUEUE_JOB_STATUSES
hpc_download_queue = RABBITMQ_QUEUE_HPC_DOWNLOADS
try:
for queue_name in queues:
self.log.info(f"Creating a worker process to consume from queue: {queue_name}")
self.create_worker_process(
queue_name=queue_name, status_checker=False, tunnel_port_executor=22, tunnel_port_transfer=22)
self.create_worker_process(queue_name, "submit_worker")
self.log.info(f"Creating a status worker process to consume from queue: {status_queue}")
self.create_worker_process(
queue_name=status_queue, status_checker=True, tunnel_port_executor=22, tunnel_port_transfer=22)
self.create_worker_process(status_queue, "status_worker")
self.log.info(f"Creating a download worker process to consume from queue: {hpc_download_queue}")
self.create_worker_process(hpc_download_queue, "download_worker")
except Exception as error:
self.log.error(f"Error while creating worker processes: {error}")

Expand All @@ -72,85 +71,27 @@ def run_broker(self):
except KeyboardInterrupt:
self.log.info(f"SIGINT signal received. Sending SIGINT to worker processes.")
# Sends SIGINT to workers
self.kill_workers()
kill_workers(self.log, self.queues_and_workers)
self.log.info(f"Closing gracefully in 3 seconds!")
exit(0)
except Exception as error:
# This is for logging any other errors
self.log.error(f"Unexpected error: {error}")

# Creates a separate worker process and append its pid if successful
def create_worker_process(
self, queue_name, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22, status_checker=False
) -> None:
def create_worker_process(self, queue_name, worker_type: str) -> None:
# If the entry for queue_name does not exist, create id
if queue_name not in self.queues_and_workers:
self.log.info(f"Initializing workers list for queue: {queue_name}")
# Initialize the worker pids list for the queue
self.queues_and_workers[queue_name] = []
child_pid = self.__create_child_process(
queue_name=queue_name, status_checker=status_checker, tunnel_port_executor=tunnel_port_executor,
tunnel_port_transfer=tunnel_port_transfer)
child_pid = create_child_process(
self.log, self.db_url, self.rabbitmq_url, queue_name, worker_type, self.test_sbatch)
# If creation of the child process was successful
if child_pid:
self.log.info(f"Assigning a new worker process with pid: {child_pid}, to queue: {queue_name}")
# append the pid to the workers list of the queue_name
(self.queues_and_workers[queue_name]).append(child_pid)

# Forks a child process
def __create_child_process(
self, queue_name, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22, status_checker=False
) -> int:
self.log.info(f"Trying to create a new worker process for queue: {queue_name}")
try:
# TODO: Try to utilize Popen() instead of fork()
created_pid = fork()
except Exception as os_error:
self.log.error(f"Failed to create a child process, reason: {os_error}")
return 0
if created_pid != 0:
return created_pid
try:
# Clean unnecessary data
# self.queues_and_workers = None
if status_checker:
child_worker = JobStatusWorker(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
test_sbatch=self.test_sbatch)
else:
child_worker = Worker(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
test_sbatch=self.test_sbatch)
child_worker.run()
exit(0)
except Exception as e:
self.log.error(f"Worker process failed for queue: {queue_name}, reason: {e}")
exit(-1)

def _send_signal_to_worker(self, worker_pid: int, signal_type: signal):
try:
process = psutil.Process(pid=worker_pid)
process.send_signal(signal_type)
except psutil.ZombieProcess as error:
self.log.info(f"Worker process has become a zombie: {worker_pid}, {error}")
except psutil.NoSuchProcess as error:
self.log.error(f"No such worker process with pid: {worker_pid}, {error}")
except psutil.AccessDenied as error:
self.log.error(f"Access denied to the worker process with pid: {worker_pid}, {error}")

def kill_workers(self):
interrupted_pids = []
self.log.info(f"Starting to send SIGINT to all workers")
# Send SIGINT to all workers
for queue_name in self.queues_and_workers:
self.log.info(f"Sending SIGINT to workers of queue: {queue_name}")
for worker_pid in self.queues_and_workers[queue_name]:
self._send_signal_to_worker(worker_pid=worker_pid, signal_type=signal.SIGINT)
interrupted_pids.append(worker_pid)
sleep(3)
self.log.info(f"Sending SIGKILL (if needed) to previously interrupted workers")
# Check whether workers exited properly
for pid in interrupted_pids:
self._send_signal_to_worker(worker_pid=pid, signal_type=signal.SIGKILL)
kill_workers(self.log, self.queues_and_workers)
67 changes: 67 additions & 0 deletions src/broker/operandi_broker/broker_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from logging import Logger
from os import fork
import psutil
import signal
from time import sleep
from typing import Dict

from .job_worker_download import JobWorkerDownload
from .job_worker_status import JobWorkerStatus
from .job_worker_submit import JobWorkerSubmit


# Forks a child process
def create_child_process(
logger: Logger, db_url: str, rabbitmq_url: str, queue_name: str, worker_type: str, test_batch: bool
) -> int:
logger.info(f"Trying to create a new worker process for queue: {queue_name}")
try:
created_pid = fork()
except Exception as os_error:
logger.error(f"Failed to create a child process, reason: {os_error}")
return 0

if created_pid != 0:
return created_pid
try:
if worker_type == "status_worker":
child_worker = JobWorkerStatus(db_url, rabbitmq_url, queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=True)
elif worker_type == "download_worker":
child_worker = JobWorkerDownload(db_url, rabbitmq_url, queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
else: # worker_type == "submit_worker"
child_worker = JobWorkerSubmit(db_url, rabbitmq_url, queue_name, test_batch)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
exit(0)
except Exception as e:
logger.error(f"Worker process failed for queue: {queue_name}, reason: {e}")
exit(-1)


def send_signal_to_worker(logger: Logger, worker_pid: int, signal_type: signal):
try:
process = psutil.Process(pid=worker_pid)
process.send_signal(signal_type)
except psutil.ZombieProcess as error:
logger.info(f"Worker process has become a zombie: {worker_pid}, {error}")
except psutil.NoSuchProcess as error:
logger.error(f"No such worker process with pid: {worker_pid}, {error}")
except psutil.AccessDenied as error:
logger.error(f"Access denied to the worker process with pid: {worker_pid}, {error}")


def kill_workers(logger: Logger, queues_and_workers: Dict):
interrupted_pids = []
logger.info(f"Starting to send SIGINT to all workers")
# Send SIGINT to all workers
for queue_name in queues_and_workers:
logger.info(f"Sending SIGINT to workers of queue: {queue_name}")
for worker_pid in queues_and_workers[queue_name]:
send_signal_to_worker(logger, worker_pid=worker_pid, signal_type=signal.SIGINT)
interrupted_pids.append(worker_pid)
sleep(3)
logger.info(f"Sending SIGKILL (if needed) to previously interrupted workers")
# Check whether workers exited properly
for pid in interrupted_pids:
send_signal_to_worker(logger, worker_pid=pid, signal_type=signal.SIGKILL)
Loading

0 comments on commit 3cecf89

Please sign in to comment.