Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Added batch polling and refactored worker parallelization #130

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ jobs:
run: python3 -m pip install .
env:
CONDUCTOR_PYTHON_VERSION: v0.0.0+test.unit
- name: Install pytest
run: python3 -m pip install pytest==7.1.2
- name: Run Unit Tests
run: python3 -m unittest discover --verbose --start-directory=./tests/unit
run: python3 -m pytest tests/unit/
integration-tests:
runs-on: ubuntu-latest
steps:
Expand Down
97 changes: 46 additions & 51 deletions src/conductor/client/automator/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from conductor.client.worker.worker_interface import WorkerInterface
from multiprocessing import Process
from typing import List
from typing_extensions import Self
import logging
import threading

logger = logging.getLogger(
Configuration.get_logging_formatted_name(
Expand All @@ -16,19 +18,22 @@

class TaskHandler:
def __init__(
self,
workers: List[WorkerInterface],
configuration: Configuration = None,
metrics_settings: MetricsSettings = None
):
self,
workers: List[WorkerInterface],
configuration: Configuration = None,
metrics_settings: MetricsSettings = None
) -> Self:
if not isinstance(workers, list):
workers = [workers]
self.__create_task_runner_processes(
workers, configuration, metrics_settings
)
self.__create_metrics_provider_process(
metrics_settings
)
self.configuration = configuration
self.metrics_settings = metrics_settings

self._task_runner = {}
self._task_runner_thread = {}
self._task_runner_mutex = threading.Lock()

self.start_worker(*workers)
self.__create_metrics_provider_process()
logger.info('Created all processes')

def __enter__(self):
Expand All @@ -38,83 +43,73 @@ def __exit__(self, exc_type, exc_value, traceback):
self.stop_processes()

def stop_processes(self) -> None:
self.__stop_task_runner_processes()
self.__stop_metrics_provider_process()

def start_processes(self) -> None:
self.__start_task_runner_processes()
self.__start_metrics_provider_process()
logger.info('Started all processes')

def join_processes(self) -> None:
self.__join_task_runner_processes()
self.__join_workers()
self.__join_metrics_provider_process()
logger.info('Joined all processes')

def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -> None:
if metrics_settings == None:
def __create_metrics_provider_process(self) -> None:
if self.metrics_settings == None:
self.metrics_provider_process = None
return
self.metrics_provider_process = Process(
target=MetricsCollector.provide_metrics,
args=(metrics_settings,)
args=(self.metrics_settings,)
)
logger.info('Created MetricsProvider process')

def __create_task_runner_processes(
self,
workers: List[WorkerInterface],
configuration: Configuration,
metrics_settings: MetricsSettings
) -> None:
self.task_runner_processes = []
def start_worker(self, *workers: WorkerInterface) -> None:
for worker in workers:
self.__create_task_runner_process(
worker, configuration, metrics_settings
)
self.__start_worker(worker)
logger.info('Created TaskRunner processes')

def __create_task_runner_process(
self,
worker: WorkerInterface,
configuration: Configuration,
metrics_settings: MetricsSettings
) -> None:
task_runner = TaskRunner(worker, configuration, metrics_settings)
process = Process(
target=task_runner.run
)
self.task_runner_processes.append(process)
def __start_worker(self, worker: WorkerInterface):
task_name = worker.get_task_definition_name()
with self._task_runner_mutex:
if task_name in self._task_runner:
raise Exception(f'worker already started for {task_name}')
task_runner = TaskRunner(
configuration=self.configuration,
task_definition_name=worker.task_definition_name,
batch_size=worker.batch_size,
polling_interval=worker.polling_interval,
worker_execution_function=worker.execute,
worker_id=worker.get_identity(),
domain=worker.get_domain(),
metrics_settings=self.metrics_settings
)
self._task_runner[task_name] = task_runner
task_runner_thread = threading.Thread(target=task_runner.run)
self._task_runner_thread[task_name] = task_runner_thread
task_runner_thread.start()

def __start_metrics_provider_process(self):
    """Start the background metrics provider process.

    No-op when metrics were not configured (the process attribute is
    None in that case).
    """
    # `is None` is the idiomatic identity check (PEP 8); `== None` can
    # be defeated by objects with a custom __eq__.
    if self.metrics_provider_process is None:
        return
    self.metrics_provider_process.start()
    logger.info('Started MetricsProvider process')

def __start_task_runner_processes(self):
    """Start every TaskRunner worker process owned by this handler."""
    for worker_process in self.task_runner_processes:
        worker_process.start()
    logger.info('Started TaskRunner processes')

def __join_metrics_provider_process(self):
    """Block until the metrics provider process exits.

    No-op when metrics were not configured (the process attribute is
    None in that case).
    """
    # Idiomatic identity check instead of `== None` (PEP 8).
    if self.metrics_provider_process is None:
        return
    self.metrics_provider_process.join()
    # Singular: there is at most one metrics provider process.
    logger.info('Joined MetricsProvider process')

def __join_task_runner_processes(self):
    """Wait for every TaskRunner worker process to finish."""
    for runner_process in self.task_runner_processes:
        runner_process.join()
    logger.info('Joined TaskRunner processes')
def __join_workers(self):
    """Block until every task-runner worker thread has finished.

    Bug fix: the original iterated `self._task_runner_thread` directly,
    which yields the task-name keys (strings), so `thread.join()` raised
    AttributeError. Join the Thread objects via `.values()` instead.
    """
    # Snapshot the threads under the lock, then join outside it so a
    # long-running worker cannot block other operations (e.g.
    # start_worker) that need the mutex while we wait.
    with self._task_runner_mutex:
        worker_threads = list(self._task_runner_thread.values())
    for worker_thread in worker_threads:
        worker_thread.join()
    logger.info('Joined all workers')

def __stop_metrics_provider_process(self):
    """Terminate the metrics provider process (safe when metrics are disabled)."""
    self.__stop_process(self.metrics_provider_process)

def __stop_task_runner_processes(self):
    """Stop each TaskRunner worker process in turn."""
    for runner_process in self.task_runner_processes:
        self.__stop_process(runner_process)

def __stop_process(self, process: Process):
if process == None:
return
Expand Down
Loading