From 5e084a89f0d9b59332696a28902f4af48976a2b4 Mon Sep 17 00:00:00 2001 From: jamesan Date: Wed, 27 Apr 2022 15:11:35 -0600 Subject: [PATCH 01/21] Added support for MAX_PYTHON_THREADS environment variable to limit thread pool size --- nornir_pools/multiprocessthreadpool.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nornir_pools/multiprocessthreadpool.py b/nornir_pools/multiprocessthreadpool.py index 45504f2..e88ae0a 100644 --- a/nornir_pools/multiprocessthreadpool.py +++ b/nornir_pools/multiprocessthreadpool.py @@ -217,6 +217,14 @@ def num_active_tasks(self): def __init__(self, num_threads=None, maxtasksperchild=None, *args, **kwargs): self._tasks = None + + if 'MAX_PYTHON_THREADS' in os.environ and num_threads is not None: + environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) + if environ_max_threads > num_threads: + prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) + + num_threads=min(environ_max_threads, num_threads) + self._num_processes = num_threads self._maxtasksperchild = maxtasksperchild self._active_tasks = {} # A list of incomplete AsyncResults From 9990622b14235d5257497e1e2af6fd7c4b0b095b Mon Sep 17 00:00:00 2001 From: James Anderson Date: Fri, 29 Apr 2022 23:44:00 -0700 Subject: [PATCH 02/21] Stop printing wait calls if there are no threads running in a pool --- nornir_pools/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 73a02ad..5415703 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -111,7 +111,7 @@ def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): return dictKnownPools[Poolname] - logging.warn("Creating %s pool of type %s" % (Poolname, poolclass)) + logging.info(f"Creating {Poolname} pool of type {poolclass}") pool = poolclass(num_threads, *args, **kwargs) pool.name = Poolname @@ -129,7 +129,9 @@ def WaitOnAllPools(): pool_items = list(dictKnownPools.items()) for (key, pool) in pool_items: - _sprint("Waiting on pool: " + str(pool)) + if pool.num_active_tasks > 0: + _sprint("Waiting on pool: " + str(pool)) + pool.wait_completion() def _remove_pool(p): @@ -158,7 +160,9 @@ def ClosePools(): while len(pool_items) > 0: for (key, pool) in pool_items: - _sprint("Waiting on pool: {0}".format(str(pool))) + if pool.num_active_tasks > 0: + _sprint("Waiting on pool: {0}".format(str(pool))) + pool = None with __pool_management_lock: if key in dictKnownPools: From 692025383dc30caf1c73324c3fbd67713b473f47 Mon Sep 17 00:00:00 2001 From: James Anderson Date: Fri, 8 Jul 2022 11:11:39 -0700 Subject: [PATCH 03/21] Version bump --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index ed17ebc..4ad7ed8 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ setup(name='nornir_pools', zip_safe=True, classifiers=classifiers, - version='1.4.2', + version='1.5.0', description="A helper library that wraps python threads, multiprocessing, a process pool for shell commands, and parallel python with the same interface", author="James Anderson", author_email="James.R.Anderson@utah.edu", From 60f23ac4546731ac5e7c539e3c447dbc66c5763e Mon Sep 17 00:00:00 2001 From: jamesan Date: Tue, 12 Jul 2022 14:47:57 -0600 Subject: [PATCH 04/21] Brute force fix for systems with more than 63 cores --- nornir_pools/multiprocessthreadpool.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nornir_pools/multiprocessthreadpool.py b/nornir_pools/multiprocessthreadpool.py index e88ae0a..4a40f4a 100644 --- a/nornir_pools/multiprocessthreadpool.py +++ b/nornir_pools/multiprocessthreadpool.py @@ -224,6 +224,14 @@ def __init__(self, num_threads=None, maxtasksperchild=None, *args, **kwargs): prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) num_threads=min(environ_max_threads, num_threads) + + if num_threads is not None and os.name == 'nt': + if num_threads > 63: + num_threads = 63 + #Limit the maximum number of threads to 63 due to Windows limit + #to waitall + #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 + self._num_processes = num_threads self._maxtasksperchild = maxtasksperchild From b61ed8da3436a88532d4fb9145a79c467d6875f4 Mon Sep 17 00:00:00 2001 From: jamesan Date: Wed, 13 Jul 2022 14:28:37 -0600 Subject: [PATCH 05/21] Temporary fix for 63 thread wait handle limit --- nornir_pools/local_machine_pool.py | 16 ++++++++++++++++ nornir_pools/multiprocessthreadpool.py | 4 ++-- nornir_pools/threadpool.py | 17 +++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/nornir_pools/local_machine_pool.py b/nornir_pools/local_machine_pool.py index ffded90..c575813 100644 --- a/nornir_pools/local_machine_pool.py +++ b/nornir_pools/local_machine_pool.py @@ -3,6 +3,7 @@ @author: u0490822 ''' +import os import nornir_pools from . import poolbase @@ -51,6 +52,21 @@ def __init__(self, num_threads, is_global = False, *args, **kwargs): ''' Constructor ''' + + if 'MAX_PYTHON_THREADS' in os.environ and num_threads is not None: + environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) + if environ_max_threads > num_threads: + prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) + + num_threads=min(environ_max_threads, num_threads) + + if num_threads is not None and os.name == 'nt': + if num_threads > 61: + num_threads = 61 + #Limit the maximum number of threads to 63 due to Windows limit + #to waitall + #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 + self._num_threads = num_threads diff --git a/nornir_pools/multiprocessthreadpool.py b/nornir_pools/multiprocessthreadpool.py index 4a40f4a..a96b630 100644 --- a/nornir_pools/multiprocessthreadpool.py +++ b/nornir_pools/multiprocessthreadpool.py @@ -226,8 +226,8 @@ def __init__(self, num_threads=None, maxtasksperchild=None, *args, **kwargs): num_threads=min(environ_max_threads, num_threads) if num_threads is not None and os.name == 'nt': - if num_threads > 63: - num_threads = 63 + if num_threads > 61: + num_threads = 61 #Limit the maximum number of threads to 63 due to Windows limit #to waitall #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 diff --git a/nornir_pools/threadpool.py b/nornir_pools/threadpool.py index 3654dd3..53242b6 100644 --- a/nornir_pools/threadpool.py +++ b/nornir_pools/threadpool.py @@ -9,6 +9,7 @@ import time import traceback import queue +import os #import logging import nornir_pools.task as task @@ -186,6 +187,22 @@ def __init__(self, num_threads=None, WorkerCheckInterval = None, *args, **kwarg :param int num_threads: Maximum number of threads in the pool :param float WorkerCheckInterval: How long worker threads wait for tasks before shutting down ''' + + if 'MAX_PYTHON_THREADS' in os.environ and num_threads is not None: + environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) + if environ_max_threads > num_threads: + prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) + + num_threads=min(environ_max_threads, num_threads) + + if num_threads is not None and os.name == 'nt': + if num_threads > 61: + num_threads = 61 + #Limit the maximum number of threads to 63 due to Windows limit + #to waitall + #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 + + super(Thread_Pool, self).__init__(num_threads=num_threads, WorkerCheckInterval=WorkerCheckInterval, *args, **kwargs) self._next_thread_id = 0 From 8e93ed912c928a8bbe9c23f51d4bfb85408ffe3c Mon Sep 17 00:00:00 2001 From: James Anderson Date: Wed, 13 Jul 2022 13:48:08 -0700 Subject: [PATCH 06/21] Simplified the thread limit fix for pools --- nornir_pools/__init__.py | 48 +++++++++++++++++++++++--- nornir_pools/local_machine_pool.py | 15 +------- nornir_pools/multiprocessthreadpool.py | 15 +------- nornir_pools/poolbase.py | 3 ++ nornir_pools/threadpool.py | 15 +------- 5 files changed, 49 insertions(+), 47 deletions(-) diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 5415703..474ce1a 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -79,11 +79,14 @@ import six import logging -import nornir_pools.processpool -import nornir_pools.threadpool -import nornir_pools.multiprocessthreadpool -import nornir_pools.local_machine_pool -import nornir_pools.serialpool +import nornir_pools.task as task +import nornir_pools.processpool as processpool +import nornir_pools.threadpool as threadpool +import nornir_pools.multiprocessthreadpool as multiprocessthreadpool +import nornir_pools.local_machine_pool as local_machine_pool +import nornir_pools.serialpool as serialpool + +from nornir_shared import prettyoutput __ParallelPythonAvailable = False @@ -95,11 +98,43 @@ dictKnownPools = {} +max_windows_threads = 61 + +def ApplyOSThreadLimit(num_threads): + """ + :return The minimum of the maximum number of threads on the OS, the + MAX_PYTHON_THREADS environment variable, or the requested num_threads + parameter, whichever is less + """ + global max_windows_threads + + if num_threads is None: + return None + + if 'MAX_PYTHON_THREADS' in os.environ: + environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) + if environ_max_threads > num_threads: + prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) + + num_threads=min(environ_max_threads, num_threads) + + if os.name == 'nt': + if num_threads > max_windows_threads: + num_threads = max_windows_threads + prettyoutput.Log("Number of threads in pool limited to windows handle limit of {max_windows_threads}") + #Limit the maximum number of threads to 63 due to Windows limit + #to waitall + #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 + + return num_threads + + __pool_management_lock = threading.RLock() def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): global dictKnownPools + global __pool_management_lock with __pool_management_lock: if Poolname is None: @@ -123,6 +158,7 @@ def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): def WaitOnAllPools(): global dictKnownPools + global __pool_management_lock pool_items = None with __pool_management_lock: @@ -137,6 +173,7 @@ def WaitOnAllPools(): def _remove_pool(p): '''Called from pool shutdown implementations to remove the pool from the map of existing pools''' global dictKnownPools + global __pool_management_lock pname = p if not isinstance(p, str): @@ -154,6 +191,7 @@ def ClosePools(): ''' global dictKnownPools global profiler + global __pool_management_lock with __pool_management_lock: pool_items = list(dictKnownPools.items()) diff --git a/nornir_pools/local_machine_pool.py b/nornir_pools/local_machine_pool.py index c575813..bacd869 100644 --- a/nornir_pools/local_machine_pool.py +++ b/nornir_pools/local_machine_pool.py @@ -53,20 +53,7 @@ def __init__(self, num_threads, is_global = False, *args, **kwargs): Constructor ''' - if 'MAX_PYTHON_THREADS' in os.environ and num_threads is not None: - environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) - if environ_max_threads > num_threads: - prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) - - num_threads=min(environ_max_threads, num_threads) - - if num_threads is not None and os.name == 'nt': - if num_threads > 61: - num_threads = 61 - #Limit the maximum number of threads to 63 due to Windows limit - #to waitall - #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 - + num_threads = nornir_pools.ApplyOSThreadLimit(num_threads) self._num_threads = num_threads diff --git a/nornir_pools/multiprocessthreadpool.py b/nornir_pools/multiprocessthreadpool.py index a96b630..e9820e3 100644 --- a/nornir_pools/multiprocessthreadpool.py +++ b/nornir_pools/multiprocessthreadpool.py @@ -218,20 +218,7 @@ def num_active_tasks(self): def __init__(self, num_threads=None, maxtasksperchild=None, *args, **kwargs): self._tasks = None - if 'MAX_PYTHON_THREADS' in os.environ and num_threads is not None: - environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) - if environ_max_threads > num_threads: - prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) - - num_threads=min(environ_max_threads, num_threads) - - if num_threads is not None and os.name == 'nt': - if num_threads > 61: - num_threads = 61 - #Limit the maximum number of threads to 63 due to Windows limit - #to waitall - #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 - + num_threads = nornir_pools.ApplyOSThreadLimit(num_threads) self._num_processes = num_threads self._maxtasksperchild = maxtasksperchild diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 5e29669..9583aad 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -118,9 +118,12 @@ def __init__(self, *args, **kwargs): self.WorkerCheckInterval = 2 self._max_threads = kwargs.get('num_threads', multiprocessing.cpu_count()) + if self._max_threads is None: self._max_threads = multiprocessing.cpu_count() + self._max_threads = nornir_pools.ApplyOSThreadLimit(self._max_threads) + self.tasks = queue.Queue(maxsize=self._max_threads * 32) #Queue for tasks yet to be completed by a thread #self.task_exceptions = queue.Queue() #Tasks that raise an unhandled exception are added to this queue diff --git a/nornir_pools/threadpool.py b/nornir_pools/threadpool.py index 53242b6..9eb6e54 100644 --- a/nornir_pools/threadpool.py +++ b/nornir_pools/threadpool.py @@ -188,20 +188,7 @@ def __init__(self, num_threads=None, WorkerCheckInterval = None, *args, **kwarg :param float WorkerCheckInterval: How long worker threads wait for tasks before shutting down ''' - if 'MAX_PYTHON_THREADS' in os.environ and num_threads is not None: - environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) - if environ_max_threads > num_threads: - prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) - - num_threads=min(environ_max_threads, num_threads) - - if num_threads is not None and os.name == 'nt': - if num_threads > 61: - num_threads = 61 - #Limit the maximum number of threads to 63 due to Windows limit - #to waitall - #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 - + num_threads = nornir_pools.ApplyOSThreadLimit(num_threads) super(Thread_Pool, self).__init__(num_threads=num_threads, WorkerCheckInterval=WorkerCheckInterval, *args, **kwargs) From ffb01a250c3b1f7360a304b9f2955b5dfb881f0a Mon Sep 17 00:00:00 2001 From: James Anderson Date: Wed, 13 Jul 2022 15:20:46 -0700 Subject: [PATCH 07/21] Fixed string formatting not being used --- nornir_pools/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 474ce1a..1086cad 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -104,7 +104,7 @@ def ApplyOSThreadLimit(num_threads): """ :return The minimum of the maximum number of threads on the OS, the MAX_PYTHON_THREADS environment variable, or the requested num_threads - parameter, whichever is less + parameter """ global max_windows_threads @@ -114,14 +114,14 @@ def ApplyOSThreadLimit(num_threads): if 'MAX_PYTHON_THREADS' in os.environ: environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) if environ_max_threads > num_threads: - prettyoutput.Log("Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={0} threads))".format(num_threads)) + prettyoutput.Log(f"Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={num_threads} threads))") num_threads=min(environ_max_threads, num_threads) if os.name == 'nt': if num_threads > max_windows_threads: num_threads = max_windows_threads - prettyoutput.Log("Number of threads in pool limited to windows handle limit of {max_windows_threads}") + prettyoutput.Log(f"Number of threads in pool limited to windows handle limit of {max_windows_threads}") #Limit the maximum number of threads to 63 due to Windows limit #to waitall #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 From b85b9435dd65d945ae078995d1b31fb878f0e4aa Mon Sep 17 00:00:00 2001 From: James Anderson Date: Tue, 19 Jul 2022 15:14:53 -0700 Subject: [PATCH 08/21] Replaced deprecated isSet function --- nornir_pools/threadpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nornir_pools/threadpool.py b/nornir_pools/threadpool.py index 9eb6e54..7fe0298 100644 --- a/nornir_pools/threadpool.py +++ b/nornir_pools/threadpool.py @@ -104,7 +104,7 @@ def run(self): entry = self.tasks.get(True, self.queue_wait_time) # Wait five seconds for a new entry in the queue and check if we should shutdown if nothing shows up except queue.Empty as e: # Check if we should kill the thread - if(self.shutdown_event.isSet()): + if(self.shutdown_event.is_set()): #nornir_pools._sprint("Queue Empty, exiting worker thread") self.deadthreadqueue.put(self) return From c9ae096e4edfd558b4e67dffd82b8b05562c6082 Mon Sep 17 00:00:00 2001 From: James Anderson Date: Tue, 30 Aug 2022 15:56:59 -0700 Subject: [PATCH 09/21] Replace deprecated functions --- nornir_pools/poolbase.py | 2 +- nornir_pools/threadpool.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 9583aad..5db143d 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -145,7 +145,7 @@ def add_worker_thread(self): def add_threads_if_needed(self): - assert(False == self.shutdown_event.isSet()) + assert(False == self.shutdown_event.is_set()) self.remove_finished_threads() num_active_threads = len(self._threads) diff --git a/nornir_pools/threadpool.py b/nornir_pools/threadpool.py index 7fe0298..128c457 100644 --- a/nornir_pools/threadpool.py +++ b/nornir_pools/threadpool.py @@ -198,7 +198,7 @@ def __init__(self, num_threads=None, WorkerCheckInterval = None, *args, **kwarg return def add_worker_thread(self): - assert(False == self.shutdown_event.isSet()) + assert(False == self.shutdown_event.is_set()) worker_name = "Thread pool #%d" % (self._next_thread_id) w = Worker(self.tasks, self.deadthreadqueue, self.shutdown_event, self.WorkerCheckInterval, name=worker_name) @@ -214,7 +214,7 @@ def add_task(self, name, func, *args, **kwargs): assert(callable(func)) """Add a task to the queue""" - assert(False == self.shutdown_event.isSet()) + assert(False == self.shutdown_event.is_set()) # keep_alive_thread is a non-daemon thread started when the queue is non-empty. # Python will not shut down while non-daemon threads are alive. When the queue empties the thread exits. @@ -223,7 +223,7 @@ def add_task(self, name, func, *args, **kwargs): entry = ThreadTask(name, func, *args, **kwargs) self.tasks.put(entry) - self.add_threads_if_needed() + self.add_threads_if_needed() return entry From 2b70a7ff2de88c3ae195ddf18c95913adeb1b665 Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 1 Sep 2022 16:05:12 -0700 Subject: [PATCH 10/21] Replaced deprecated isSet call --- nornir_pools/poolbase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 5db143d..cc86669 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -128,7 +128,7 @@ def __init__(self, *args, **kwargs): #self.task_exceptions = queue.Queue() #Tasks that raise an unhandled exception are added to this queue def shutdown(self): - if self.shutdown_event.isSet(): + if self.shutdown_event.is_set(): return self.wait_completion() From c5a734ef26f9d339a53eaa078a010cb98969ba3b Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 8 Sep 2022 15:14:19 -0700 Subject: [PATCH 11/21] Fixed not recording timestamp of last report time --- nornir_pools/poolbase.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index cc86669..9e78538 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -50,6 +50,7 @@ def __init__(self, *args, **kwargs): self._name = kwargs.get('name', None) self._last_job_report_time = time.time() self.job_report_interval_in_seconds = kwargs.get("job_report_interval", 10.0) + self._last_num_active_tasks = 0 def add_task(self, name, func, *args, **kwargs): ''' @@ -79,10 +80,14 @@ def TryReportActiveTaskCount(self): ''' Report the current job count if we haven't reported it recently ''' + if self.num_active_tasks < 2 and self._last_num_active_tasks < 2: + return + now = time.time() time_since_last_report = now - self._last_job_report_time if time_since_last_report > self.job_report_interval_in_seconds: - time_since_last_report = now + self._last_job_report_time = now + self._last_num_active_tasks = self.num_active_tasks self.PrintActiveTaskCount() def PrintActiveTaskCount(self): From 473144b0a1d11396b5aff19490cbafffb3ca320f Mon Sep 17 00:00:00 2001 From: James Anderson Date: Tue, 13 Sep 2022 14:28:23 -0700 Subject: [PATCH 12/21] Fixes from the PyCharm IDE --- .idea/nornir-pools.iml | 13 +++++ nornir_pools/poolbase.py | 103 ++++++++++++++++++------------------ nornir_pools/processpool.py | 5 +- nornir_pools/task.py | 2 +- 4 files changed, 67 insertions(+), 56 deletions(-) create mode 100644 .idea/nornir-pools.iml diff --git a/.idea/nornir-pools.iml b/.idea/nornir-pools.iml new file mode 100644 index 0000000..da40603 --- /dev/null +++ b/.idea/nornir-pools.iml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 9e78538..9835fbe 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -5,6 +5,7 @@ import logging import nornir_pools + class PoolBase(object): ''' Pool objects provide the interface to create tasks on the pool. @@ -13,22 +14,22 @@ class PoolBase(object): @property def name(self): return self._name - + @name.setter def name(self, val): self._name = val - + @property def num_active_tasks(self): raise NotImplementedError() - + @property def logger(self): if self._logger is None: self._logger = logging.getLogger(__name__) - + return self._logger - + def __str__(self): return "Pool {0} with {1} active tasks".format(self.name, self.num_active_tasks) @@ -45,7 +46,7 @@ def wait_completion(self): raise NotImplementedError() def __init__(self, *args, **kwargs): - #self.logger = logging.getLogger(__name__) + # self.logger = logging.getLogger(__name__) self._logger = None self._name = kwargs.get('name', None) self._last_job_report_time = time.time() @@ -75,121 +76,121 @@ def add_process(self, name, func, *args, **kwargs): :rtype: task ''' raise NotImplementedError() - + def TryReportActiveTaskCount(self): ''' Report the current job count if we haven't reported it recently ''' if self.num_active_tasks < 2 and self._last_num_active_tasks < 2: - return - + return + now = time.time() time_since_last_report = now - self._last_job_report_time if time_since_last_report > self.job_report_interval_in_seconds: self._last_job_report_time = now self._last_num_active_tasks = self.num_active_tasks self.PrintActiveTaskCount() - + def PrintActiveTaskCount(self): JobQText = "Jobs Queued: " + str(self.num_active_tasks) JobQText = ('\b' * 40) + JobQText + ('.' * (40 - len(JobQText))) - nornir_pools._PrintProgressUpdate (JobQText) + nornir_pools._PrintProgressUpdate(JobQText) return - + + class LocalThreadPoolBase(PoolBase): '''Base class for pools that rely on local threads and a queue to dispatch jobs''' - - WorkerCheckInterval = 0.5 #How often workers check for new jobs in the queue - + + WorkerCheckInterval = 0.5 # How often workers check for new jobs in the queue + @property def num_active_tasks(self): return self.tasks.qsize() - + def __init__(self, *args, **kwargs): ''' :param int num_threads: number of threads, defaults to number of cores installed on system ''' super(LocalThreadPoolBase, self).__init__(*args, **kwargs) - - - self.deadthreadqueue = queue.Queue() #Threads put themselves here when they die + + self.deadthreadqueue = queue.Queue() # Threads put themselves here when they die self.shutdown_event = threading.Event() self.shutdown_event.clear() - #self.keep_alive_thread = None + # self.keep_alive_thread = None self._threads = [] - - self.WorkerCheckInterval=kwargs.get('WorkerCheckInterval', None) + + self.WorkerCheckInterval = kwargs.get('WorkerCheckInterval', None) if self.WorkerCheckInterval is None: self.WorkerCheckInterval = 2 - + self._max_threads = kwargs.get('num_threads', multiprocessing.cpu_count()) - + if self._max_threads is None: self._max_threads = multiprocessing.cpu_count() - + self._max_threads = nornir_pools.ApplyOSThreadLimit(self._max_threads) - - self.tasks = queue.Queue(maxsize=self._max_threads * 32) #Queue for tasks yet to be completed by a thread - #self.task_exceptions = queue.Queue() #Tasks that raise an unhandled exception are added to this queue - + + self.tasks = queue.Queue(maxsize=self._max_threads * 32) # Queue for tasks yet to be completed by a thread + # self.task_exceptions = queue.Queue() #Tasks that raise an unhandled exception are added to this queue + def shutdown(self): if self.shutdown_event.is_set(): return - + self.wait_completion() self.shutdown_event.set() - + nornir_pools._remove_pool(self) # Give threads time to die gracefully time.sleep(self.WorkerCheckInterval + 1) del self._threads -# + + # def add_worker_thread(self): raise NotImplementedError("add_worker_thread must be implemented by derived class and return a thread object") - + def add_threads_if_needed(self): - - assert(False == self.shutdown_event.is_set()) - + + assert (False == self.shutdown_event.is_set()) + self.remove_finished_threads() num_active_threads = len(self._threads) - + if num_active_threads == self._max_threads: - return - - num_threads_needed = min(self._max_threads, self.tasks.qsize()+1) - num_active_threads - + return + + num_threads_needed = min(self._max_threads, self.tasks.qsize() + 1) - num_active_threads + num_threads_created = 0 - #while num_active_threads < min((self._max_threads, self.tasks.qsize()+1)): + # while num_active_threads < min((self._max_threads, self.tasks.qsize()+1)): while num_threads_created < num_threads_needed: if not self.tasks.empty(): t = self.add_worker_thread() - assert(isinstance(t, threading.Thread)) + assert (isinstance(t, threading.Thread)) self._threads.append(t) num_active_threads += 1 num_threads_created += 1 time.sleep(0) - + else: break - + def remove_finished_threads(self): try: while True: t = self.deadthreadqueue.get_nowait() if t is None: break - else: - for i in range(len(self._threads)-1, 0,-1): + else: + for i in range(len(self._threads) - 1, 0, -1): if t == self._threads[i]: del self._threads[i] break except queue.Empty as e: - return - + return + return - def wait_completion(self): """Wait for completion of all the tasks in the queue. @@ -197,7 +198,5 @@ def wait_completion(self): each task to detect exceptions if there were any """ - self.tasks.join() self.remove_finished_threads() - \ No newline at end of file diff --git a/nornir_pools/processpool.py b/nornir_pools/processpool.py index 25ec17b..86d0543 100644 --- a/nornir_pools/processpool.py +++ b/nornir_pools/processpool.py @@ -230,9 +230,8 @@ def add_process(self, name, func, *args, **kwargs): if not 'shell' in kwargs: kwargs['shell'] = True else: - kwargs = {} - kwargs['shell'] = True - + kwargs = {'shell': True} + if func is str: entry = ImmediateProcessTask(name, func, *args, **kwargs) else: diff --git a/nornir_pools/task.py b/nornir_pools/task.py index 5267438..6ac944a 100644 --- a/nornir_pools/task.py +++ b/nornir_pools/task.py @@ -119,7 +119,7 @@ def iscompleted(self): :return: True if the task is completed, otherwise False :rtype: bool ''' - return self.completed.isSet() + return self.completed.is_set() def wait(self): self.completed.wait() From 9a6bfa117c14c8a2ecea0da74a6c8fc6c54ef92e Mon Sep 17 00:00:00 2001 From: James Anderson Date: Tue, 13 Sep 2022 14:30:35 -0700 Subject: [PATCH 13/21] Removing strange file that auto-added itself from PyCharm --- .idea/nornir-pools.iml | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 .idea/nornir-pools.iml diff --git a/.idea/nornir-pools.iml b/.idea/nornir-pools.iml deleted file mode 100644 index da40603..0000000 --- a/.idea/nornir-pools.iml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file From 98bed974237e489b30f81fb6282a9b867a10ccd6 Mon Sep 17 00:00:00 2001 From: James Anderson Date: Fri, 23 Sep 2022 12:11:11 -0700 Subject: [PATCH 14/21] Pycharm IDE suggested fixes. Passes tests --- nornir_pools/__init__.py | 190 ++++++++++++++----------- nornir_pools/multiprocessthreadpool.py | 4 +- nornir_pools/parallelpythonpool.py | 4 +- nornir_pools/processpool.py | 4 +- test/__init__.py | 2 +- test/test_nornir_pools.py | 2 +- 6 files changed, 115 insertions(+), 91 deletions(-) diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 1086cad..0ebe472 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -1,6 +1,7 @@ ''' -nornir_pools aims to provide a consistent interface around four different multi-threading and clustering libraries available to Python. +nornir_pools aims to provide a consistent interface around four different multi-threading and clustering libraries +available to Python. The use pattern for pools is: @@ -9,7 +10,8 @@ 3. save the task object returned 4. call wait or wait_return on the task object to fetch the output or raise exceptions -Steps 3 and 4 can be skipped if output is not required. In this case wait_completion can be called on the pool to delay until all tasks have completed. Note that in this pattern exceptions may be lost. +Steps 3 and 4 can be skipped if output is not required. In this case wait_completion can be called on the pool to +delay until all tasks have completed. Note that in this pattern exceptions may be lost. Pool Creation ------------- @@ -55,14 +57,17 @@ Pool Destruction ---------------- -It is not necessary to perform any cleanup. Functions to delete pools would not be hard to add. ClosePools is called automatically at script termination by atexit +It is not necessary to perform any cleanup. Functions to delete pools would not be hard to add. ClosePools is +called automatically at script termination by atexit .. autofunction:: nornir_pools.ClosePools Optimization ------------ -On windows there is significant overhead to passing parameters to multiprocessing jobs. To address this I added pickle overrides to objects being marshalled. I also removed as many global initializations as I could from modules loaded by the tasks. +On windows there is significant overhead to passing parameters to multiprocessing jobs. To address this I added +pickle overrides to objects being marshalled. I also removed as many global initializations as I could from modules +loaded by the tasks. ''' @@ -72,11 +77,9 @@ import datetime import warnings import threading -import cProfile +#import cProfile import pstats import glob -import shutil -import six import logging import nornir_pools.task as task @@ -100,6 +103,7 @@ max_windows_threads = 61 + def ApplyOSThreadLimit(num_threads): """ :return The minimum of the maximum number of threads on the OS, the @@ -107,50 +111,51 @@ def ApplyOSThreadLimit(num_threads): parameter """ global max_windows_threads - + if num_threads is None: return None - + if 'MAX_PYTHON_THREADS' in os.environ: environ_max_threads = int(os.environ['MAX_PYTHON_THREADS']) if environ_max_threads > num_threads: - prettyoutput.Log(f"Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={num_threads} threads))") - - num_threads=min(environ_max_threads, num_threads) - + prettyoutput.Log( + f"Number of threads in pool limited to MAX_PYTHON_THREADS environment variable, (={num_threads} threads))") + + num_threads = min(environ_max_threads, num_threads) + if os.name == 'nt': if num_threads > max_windows_threads: num_threads = max_windows_threads prettyoutput.Log(f"Number of threads in pool limited to windows handle limit of {max_windows_threads}") - #Limit the maximum number of threads to 63 due to Windows limit - #to waitall - #https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 - + # Limit the maximum number of threads to 63 due to Windows limit + # to waitall + # https://stackoverflow.com/questions/65252807/multiprocessing-pool-pool-on-windows-cpu-limit-of-63 + return num_threads __pool_management_lock = threading.RLock() -def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): +def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): global dictKnownPools global __pool_management_lock - + with __pool_management_lock: if Poolname is None: return GetGlobalLocalMachinePool() - + if Poolname in dictKnownPools: pool = dictKnownPools[Poolname] - assert(pool.__class__ == poolclass) - + assert (pool.__class__ == poolclass) + return dictKnownPools[Poolname] - + logging.info(f"Creating {Poolname} pool of type {poolclass}") - + pool = poolclass(num_threads, *args, **kwargs) pool.name = Poolname - + dictKnownPools[Poolname] = pool return pool @@ -159,30 +164,32 @@ def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): def WaitOnAllPools(): global dictKnownPools global __pool_management_lock - + pool_items = None with __pool_management_lock: pool_items = list(dictKnownPools.items()) - + for (key, pool) in pool_items: if pool.num_active_tasks > 0: _sprint("Waiting on pool: " + str(pool)) - + pool.wait_completion() + def _remove_pool(p): '''Called from pool shutdown implementations to remove the pool from the map of existing pools''' global dictKnownPools global __pool_management_lock - + pname = p if not isinstance(p, str): - pname = p.name - + pname = p.name + with __pool_management_lock: if pname in dictKnownPools: del dictKnownPools[pname] + @atexit.register def ClosePools(): ''' @@ -192,35 +199,35 @@ def ClosePools(): global dictKnownPools global profiler global __pool_management_lock - + with __pool_management_lock: pool_items = list(dictKnownPools.items()) - + while len(pool_items) > 0: for (key, pool) in pool_items: if pool.num_active_tasks > 0: _sprint("Waiting on pool: {0}".format(str(pool))) - + pool = None with __pool_management_lock: if key in dictKnownPools: pool = dictKnownPools[key] else: _sprint("pool {0} no longer in known pool list. Moving on.".format(str(pool))) - + if pool is None: continue - + pool.shutdown() - + with __pool_management_lock: if key in dictKnownPools: del dictKnownPools[key] - + with __pool_management_lock: pool_items = list(dictKnownPools.items()) - + def GetThreadPool(Poolname=None, num_threads=None): ''' Get or create a specific thread pool using vanilla python threads @@ -229,24 +236,26 @@ def GetThreadPool(Poolname=None, num_threads=None): def GetLocalMachinePool(Poolname=None, num_threads=None, is_global=False): - - return __CreatePool(nornir_pools.local_machine_pool.LocalMachinePool, Poolname, num_threads,is_global=is_global) + return __CreatePool(nornir_pools.local_machine_pool.LocalMachinePool, Poolname, num_threads, is_global=is_global) def GetMultithreadingPool(Poolname=None, num_threads=None): - '''Get or create a specific thread pool to execute threads in other processes on the same computer using the multiprocessing library''' + '''Get or create a specific thread pool to execute threads in other processes on the same computer using the + multiprocessing library ''' warnings.warn(DeprecationWarning("GetMultithreadingPool is deprecated. Use GetLocalMachinePool instead")) - return __CreatePool(nornir_pools.multiprocessthreadpool.MultiprocessThread_Pool , Poolname, num_threads) + return __CreatePool(nornir_pools.multiprocessthreadpool.MultiprocessThread_Pool, Poolname, num_threads) def GetProcessPool(Poolname=None, num_threads=None): - '''Get or create a specific pool to invoke shell command processes on the same computer using the subprocess module''' + '''Get or create a specific pool to invoke shell command processes on the same computer using the subprocess + module ''' warnings.warn(DeprecationWarning("GetProcessPool is deprecated. Use GetLocalMachinePool instead")) return __CreatePool(nornir_pools.processpool.Process_Pool, Poolname, num_threads) def GetParallelPythonPool(Poolname=None, num_threads=None): - '''Get or create a specific pool to invoke functions or shell command processes on a cluster using parallel python''' + '''Get or create a specific pool to invoke functions or shell command processes on a cluster using parallel + python ''' return __CreatePool(nornir_pools.parallelpythonpool.ParallelPythonProcess_Pool, Poolname, num_threads) @@ -259,7 +268,6 @@ def GetSerialPool(Poolname=None, num_threads=None): return __CreatePool(nornir_pools.serialpool.SerialPool, Poolname, num_threads) - def GetGlobalSerialPool(): ''' Common pool for processes on the local machine @@ -267,6 +275,7 @@ def GetGlobalSerialPool(): return GetSerialPool(Poolname="Global") # return GetProcessPool("Global local process pool") + def GetGlobalProcessPool(): ''' Common pool for processes on the local machine @@ -274,13 +283,16 @@ def GetGlobalProcessPool(): return GetProcessPool(Poolname="Global process pool") # return GetProcessPool("Global local process pool") + def GetGlobalLocalMachinePool(): ''' - Common pool for launching other processes for threads or executables. Combines multithreading and process pool interface. + Common pool for launching other processes for threads or executables. Combines multithreading and process pool + interface. ''' return GetLocalMachinePool(Poolname="Global local machine pool", is_global=True) + def GetGlobalClusterPool(): ''' Get the common pool for placing tasks on the cluster @@ -301,40 +313,40 @@ def GetGlobalThreadPool(): def GetGlobalMultithreadingPool(): ''' - Common pool for multithreading module tasks, threads run in different python processes to work around the global interpreter lock + Common pool for multithreading module tasks, threads run in different python processes to work around the global + interpreter lock ''' # return GetGlobalLocalMachinePool() return GetMultithreadingPool("Global multithreading pool") + # ToPreventFlooding the output I only write pool size every five seconds when running under ECLIPSE __LastConsoleWrite = datetime.datetime.utcnow() def __CleanOutputForEclipse(s): - s = s.replace('\b', ''); - s = s.replace('.', ''); - s = s.strip(); + s = s.replace('\b', '') + s = s.replace('.', '') + s = s.strip() return s def __EclipseConsoleWrite(s, newline=False): - - es = __CleanOutputForEclipse(s) + es = __CleanOutputForEclipse(s) if newline: es = es + '\n' - + sys.stdout.write(es) - - -def __EclipseConsoleWriteError(s, newline=False): - es = __CleanOutputForEclipse(s) + +def __EclipseConsoleWriteError(s, newline=False): + es = __CleanOutputForEclipse(s) if newline: es = es + '\n' - + sys.stderr.write(es) - + def __PrintProgressUpdateEclipse(s): global __LastConsoleWrite @@ -354,16 +366,17 @@ def __ConsoleWrite(s, newline=False): s = s + '\n' sys.stdout.write(s) - + + def __ConsoleWriteError(s, newline=False): if newline: s = s + '\n' sys.stderr.write(s) - + def _PrintError(s): - if 'ECLIPSE' in os.environ: + if 'ECLIPSE' in os.environ: __EclipseConsoleWrite(s) return @@ -371,7 +384,7 @@ def _PrintError(s): def _PrintWarning(s): - if 'ECLIPSE' in os.environ: + if 'ECLIPSE' in os.environ: __PrintProgressUpdateEclipse(s) return @@ -379,7 +392,7 @@ def _PrintWarning(s): def _PrintProgressUpdate(s): - if 'ECLIPSE' in os.environ: + if 'ECLIPSE' in os.environ: __PrintProgressUpdateEclipse(s) return @@ -390,7 +403,7 @@ def _sprint(s): """ Thread-safe print fucntion """ # Eclipse copies test output to the unit test window and this copy has # problems if the output has non-alphanumeric characters - if 'ECLIPSE' in os.environ: + if 'ECLIPSE' in os.environ: __EclipseConsoleWrite(s, newline=True) else: __ConsoleWrite(s, newline=True) @@ -401,7 +414,7 @@ def _pprint(s): # Eclipse copies test output to the unit test window and this copy has # problems if the output has non-alphanumeric characters - if 'ECLIPSE' in os.environ: + if 'ECLIPSE' in os.environ: __EclipseConsoleWrite(s, newline=False) else: __ConsoleWrite(s, newline=False) @@ -410,25 +423,28 @@ def _pprint(s): profiler = None profile_data_path = None -def GetAndCreateProfileDataPath(): +def GetAndCreateProfileDataPath(): profile_data_path = os.path.join(os.getcwd(), 'pool_profiles') - #profile_data_path = os.path.join("C:\\Temp\\Testoutput\\PoolTestBase\\", 'pool_profiles') + # profile_data_path = os.path.join("C:\\Temp\\Testoutput\\PoolTestBase\\", 'pool_profiles') os.makedirs(profile_data_path, exist_ok=True) - + return profile_data_path -def GetAndCreateProfileDataFileName(): - + +def GetAndCreateProfileDataFileName(): profile_data_path = GetAndCreateProfileDataPath() - - thread = threading.current_thread() + + thread = threading.current_thread() filename = "%d_%d.profile" % (os.getpid(), thread.ident) profile_data_file = os.path.join(profile_data_path, filename) return profile_data_file + def start_profiling(): return + + # global profiler # # if not profiler is None: @@ -441,6 +457,8 @@ def start_profiling(): def end_profiling(): return + + # global profiler # if not profiler is None: # profile_data_path = GetAndCreateProfileDataFileName() @@ -448,15 +466,18 @@ def end_profiling(): # profiler = None def invoke_with_profiler(func, *args, **kwargs): -# '''Launch a profiler for our function + # '''Launch a profiler for our function func_args = args start_profiling() func(*func_args, **kwargs) + def aggregate_profiler_data(output_path): return + + # profile_data_path = GetAndCreateProfileDataPath() # files = glob.glob(os.path.join(profile_data_path, "*.profile")) # @@ -490,22 +511,23 @@ def aggregate_profiler_data(output_path): # -def MergeProfilerStats(root_output_dir, profile_dir, pool_name): +def MergeProfilerStats(root_output_dir, profile_dir, pool_name): '''Called by atexit. Merges all *.profile files in the profile_dir into a single .profile file''' - profile_files = glob.glob(os.path.join(profile_dir, "**","*.pstats"), recursive=True) - + profile_files = glob.glob(os.path.join(profile_dir, "**", "*.pstats"), recursive=True) + if len(profile_files) == 0: return - + agg = pstats.Stats() agg.add(*profile_files) - + output_full_path = os.path.join(root_output_dir, pool_name + '_aggregate.pstats') agg.dump_stats(output_full_path) - - #Remove the individual .profile files + + # Remove the individual .profile files for f in profile_files: - os.remove(f) + os.remove(f) + if __name__ == '__main__': - start_profiling() \ No newline at end of file + start_profiling() diff --git a/nornir_pools/multiprocessthreadpool.py b/nornir_pools/multiprocessthreadpool.py index e9820e3..3f3f11d 100644 --- a/nornir_pools/multiprocessthreadpool.py +++ b/nornir_pools/multiprocessthreadpool.py @@ -15,12 +15,12 @@ import cProfile import os -import time +#import time from nornir_shared import prettyoutput #from threading import Lock -_profiler = None +_profiler = None # type: None | cProfile.Profile def _poolinit(profile_dir=None): diff --git a/nornir_pools/parallelpythonpool.py b/nornir_pools/parallelpythonpool.py index 71271a6..a7278e8 100644 --- a/nornir_pools/parallelpythonpool.py +++ b/nornir_pools/parallelpythonpool.py @@ -149,7 +149,7 @@ def RemoteWorkerProcess(cmd, fargs): entry['returncode'] = -1 entry['node'] = socket.gethostname() - error_message = "*** {0}\n{1}\n".format(traceback.format_exc()) + error_message = "*** {0}".format(traceback.format_exc()) server_message = "\n*** Cluster node %s raised exception: ***\n" % socket.gethostname() entry['error_message'] = server_message + error_message # sys.stderr.write(error_message) @@ -188,7 +188,7 @@ def RemoteFunction(func, fargs): # inform operator of the name of the task throwing the exception # also, intercept the traceback and send to stderr.write() to avoid interweaving of traceback lines from parallel threads - error_message = "*** {0}\n{1}\n".format(traceback.format_exc()) + error_message = "*** {0}".format(traceback.format_exc()) server_message = "\n*** Cluster node %s raised exception: ***\n" % socket.gethostname() entry['error_message'] = server_message + error_message # sys.stderr.write(error_message) diff --git a/nornir_pools/processpool.py b/nornir_pools/processpool.py index 86d0543..1c4b7a3 100644 --- a/nornir_pools/processpool.py +++ b/nornir_pools/processpool.py @@ -23,8 +23,10 @@ class ImmediateProcessTask(task.TaskWithEvent): '''Launches processes without threads''' def __init__(self, name, func, *args, **kwargs): - super(ProcessTask, self).__init__(name, *args, **kwargs) + super(ImmediateProcessTask, self).__init__(name, *args, **kwargs) + self.proc = None self.cmd = func + self.returned_value = None self.Run() def Run(self): diff --git a/test/__init__.py b/test/__init__.py index cdf25dc..ba808b1 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -1 +1 @@ -__all__ = ['test_nornir_pools'] \ No newline at end of file +__all__ = ['test_nornir_pools'] diff --git a/test/test_nornir_pools.py b/test/test_nornir_pools.py index 8c0e830..d82ee83 100644 --- a/test/test_nornir_pools.py +++ b/test/test_nornir_pools.py @@ -95,7 +95,7 @@ def VerifyExceptionBehaviour(test, pool): task = pool.add_task(exceptText, RaiseException, exceptText) pool.wait_completion() - task.wait(); + task.wait() except IntentionalPoolException as e: print("Correctly found exception in thread\n" + str(e)) ExceptionFound = True From a90437d9a08a43506f7c2407059cd5d0023f9bc8 Mon Sep 17 00:00:00 2001 From: James Anderson Date: Wed, 28 Sep 2022 14:32:06 -0700 Subject: [PATCH 15/21] Clarified class was an abstract base class --- nornir_pools/poolbase.py | 3 ++- nornir_pools/processpool.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 9835fbe..47a62bd 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -1,4 +1,5 @@ from six.moves import queue +import abc import threading import time import multiprocessing @@ -98,7 +99,7 @@ def PrintActiveTaskCount(self): return -class LocalThreadPoolBase(PoolBase): +class LocalThreadPoolBase(abc.ABC, PoolBase): '''Base class for pools that rely on local threads and a queue to dispatch jobs''' WorkerCheckInterval = 0.5 # How often workers check for new jobs in the queue diff --git a/nornir_pools/processpool.py b/nornir_pools/processpool.py index 1c4b7a3..8db8d32 100644 --- a/nornir_pools/processpool.py +++ b/nornir_pools/processpool.py @@ -115,7 +115,7 @@ def run(self): entry = self.tasks.get(True, self.queue_wait_time) # Wait five seconds for a new entry in the queue and check if we should shutdown if nothing shows up except: # Check if we should kill the thread - if(self.shutdown_event.isSet()): + if self.shutdown_event.is_set(): # _sprint ("Queue Empty, exiting worker thread") self.deadthreadqueue.put(self) return From 9930676809d3bb85f648ded54be182acb23d26cc Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 29 Sep 2022 17:06:48 -0700 Subject: [PATCH 16/21] Additional typing information. Passes tests. --- nornir_pools/__init__.py | 7 +- nornir_pools/multiprocessthreadpool.py | 305 +++++++++++++------------ nornir_pools/poolbase.py | 29 ++- nornir_pools/processpool.py | 72 +++--- nornir_pools/serialpool.py | 5 +- nornir_pools/task.py | 33 +-- nornir_pools/threadpool.py | 139 +++++------ 7 files changed, 320 insertions(+), 270 deletions(-) diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 0ebe472..6591484 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -82,6 +82,7 @@ import glob import logging +import nornir_pools.poolbase as poolbase import nornir_pools.task as task import nornir_pools.processpool as processpool import nornir_pools.threadpool as threadpool @@ -232,7 +233,7 @@ def GetThreadPool(Poolname=None, num_threads=None): ''' Get or create a specific thread pool using vanilla python threads ''' - return __CreatePool(nornir_pools.threadpool.Thread_Pool, Poolname, num_threads) + return __CreatePool(nornir_pools.threadpool.ThreadPool, Poolname, num_threads) def GetLocalMachinePool(Poolname=None, num_threads=None, is_global=False): @@ -243,14 +244,14 @@ def GetMultithreadingPool(Poolname=None, num_threads=None): '''Get or create a specific thread pool to execute threads in other processes on the same computer using the multiprocessing library ''' warnings.warn(DeprecationWarning("GetMultithreadingPool is deprecated. Use GetLocalMachinePool instead")) - return __CreatePool(nornir_pools.multiprocessthreadpool.MultiprocessThread_Pool, Poolname, num_threads) + return __CreatePool(nornir_pools.multiprocessthreadpool.MultiprocessThreadPool, Poolname, num_threads) def GetProcessPool(Poolname=None, num_threads=None): '''Get or create a specific pool to invoke shell command processes on the same computer using the subprocess module ''' warnings.warn(DeprecationWarning("GetProcessPool is deprecated. Use GetLocalMachinePool instead")) - return __CreatePool(nornir_pools.processpool.Process_Pool, Poolname, num_threads) + return __CreatePool(nornir_pools.processpool.ProcessPool, Poolname, num_threads) def GetParallelPythonPool(Poolname=None, num_threads=None): diff --git a/nornir_pools/multiprocessthreadpool.py b/nornir_pools/multiprocessthreadpool.py index 3f3f11d..231c57c 100644 --- a/nornir_pools/multiprocessthreadpool.py +++ b/nornir_pools/multiprocessthreadpool.py @@ -3,10 +3,11 @@ # Initially patterned from http://code.activestate.com/recipes/577187-python-thread-pool/ # Made awesomer by James Anderson # Made prettier by James Tucker - + import atexit import multiprocessing import tempfile +from typing import * import multiprocessing.pool import logging import nornir_pools.task @@ -14,30 +15,28 @@ import nornir_pools import cProfile -import os -#import time +import os +# import time from nornir_shared import prettyoutput -#from threading import Lock +# from threading import Lock _profiler = None # type: None | cProfile.Profile - + def _poolinit(profile_dir=None): - global _profiler _profiler = None - - + if profile_dir is not None: - assert(isinstance(profile_dir, str)) + assert (isinstance(profile_dir, str)) _profiler = cProfile.Profile() _profiler.enable() - + atexit.register(_processfinalizer, profile_dir) - + + def _processfinalizer(profile_dir): - global _profiler if _profiler is not None: _profiler.disable() @@ -45,7 +44,8 @@ def _processfinalizer(profile_dir): profile_fullpath = os.path.join(profile_dir, profile_filename) _profiler.dump_stats(profile_fullpath) _profiler = None - + + # # def _pickle_method(method): # func_name = method.__func__.__name__ @@ -78,7 +78,9 @@ def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) -# + + +# # def run(self, *args, **kwargs): # ''' # Method to be run in sub-process; can be overridden in sub-class @@ -101,85 +103,87 @@ def _set_daemon(self, value): # # ''' # nornir_pools.end_profiling() # return super(NoDaemonProcess, self).terminate() - + class NonDaemonPool(multiprocessing.pool.Pool): - _root_profile_output_dir = None _instance_id = 0 - + @classmethod def _get_root_profile_output_path(cls): if cls._root_profile_output_dir is None: if os.path.isdir(os.environ['PROFILE']): - cls._root_profile_output_dir = os.environ['PROFILE'] + cls._root_profile_output_dir = os.environ['PROFILE'] else: cls._root_profile_output_dir = tempfile.mkdtemp() - + return cls._root_profile_output_dir - - def __init__(self,*args, **kwargs): - self.profile_dir = None - self.pool_name = str.format("pool-pid_{0}_instance_{1}", multiprocessing.current_process().pid, NonDaemonPool._instance_id) - - NonDaemonPool._instance_id = NonDaemonPool._instance_id + 1 - - #Create a directory to store profile data for each subprocess - if 'PROFILE' in os.environ: + + def __init__(self, *args, **kwargs): + self.profile_dir = None # type: str | None + self.pool_name = str.format("pool-pid_{0}_instance_{1}", multiprocessing.current_process().pid, + NonDaemonPool._instance_id) + + NonDaemonPool._instance_id = NonDaemonPool._instance_id + 1 + + # Create a directory to store profile data for each subprocess + if 'PROFILE' in os.environ: root_output_dir = NonDaemonPool._get_root_profile_output_path() self.profile_dir = os.path.join(root_output_dir, self.pool_name) - + os.makedirs(self.profile_dir, exist_ok=True) - + atexit.register(nornir_pools.MergeProfilerStats, root_output_dir, self.profile_dir, self.pool_name) - assert('initializer' not in kwargs) - + assert ('initializer' not in kwargs) + kwargs['initializer'] = _poolinit kwargs['initargs'] = [self.profile_dir] - - super(NonDaemonPool, self).__init__(*args, **kwargs) - - #def Process(self, *args, **kwds): + + super(NonDaemonPool, self).__init__(*args, **kwargs) + + # def Process(self, *args, **kwds): # return NoDaemonProcess(*args, **kwds) class MultiprocessThreadTask(nornir_pools.task.Task): - + @property def logger(self): return logging.getLogger(__name__) - + def callback(self, result): pass - #DecrementActiveJobCount() - #PrintJobsCount() + # DecrementActiveJobCount() + # PrintJobsCount() self.set_completion_time() - #self.logger.info("%s" % str(self.__str__())) - #nornir_pools._sprint("%s" % str(self.__str__())) - + # self.logger.info("%s" % str(self.__str__())) + # nornir_pools._sprint("%s" % str(self.__str__())) + def callbackontaskfail(self, result): - '''This is manually invoked by the task when a thread fails to complete''' - #DecrementActiveJobCount() - #PrintJobsCount() + """This is manually invoked by the task when a thread fails to complete""" + # DecrementActiveJobCount() + # PrintJobsCount() self.set_completion_time() def __init__(self, name, asyncresult, *args, **kwargs): super(MultiprocessThreadTask, self).__init__(name, *args, **kwargs) - #self.args = args - #self.kwargs = kwargs - self.asyncresult = asyncresult + # self.args = args + # self.kwargs = kwargs + self.asyncresult = asyncresult def wait_return(self): """Waits until the function has completed execution and returns the value returned by the function pointer""" retval = self.asyncresult.get() if self.asyncresult.successful(): - #self.logger.info("Multiprocess successful: " + self.name + '\nargs: ' + str(self.args) + "\nkwargs: " + str(self.kwargs) + # self.logger.info("Multiprocess successful: " + self.name + '\nargs: ' + str(self.args) + "\nkwargs: " + str(self.kwargs) return retval else: - self.logger.error("Multiprocess call not successful: " + self.name + '\nargs: ' + str(self.args) + "\nkwargs: " + str(self.kwargs)) - #self.callbackontaskfail(self) This is called by the get() function above + self.logger.error( + "Multiprocess call not successful: " + self.name + '\nargs: ' + str(self.args) + "\nkwargs: " + str( + self.kwargs)) + # self.callbackontaskfail(self) This is called by the get() function above return None def wait(self): @@ -190,156 +194,165 @@ def wait(self): if self.asyncresult.successful(): return else: - self.logger.error("Multiprocess call not successful: " + self.name + '\nargs: ' + str(self.args) + "\nkwargs: " + str(self.kwargs)) - #self.callbackontaskfail(self) + self.logger.error( + "Multiprocess call not successful: " + self.name + '\nargs: ' + str(self.args) + "\nkwargs: " + str( + self.kwargs)) + # self.callbackontaskfail(self) self.asyncresult.get() # This should cause the original exception to be raised according to multiprocess documentation and trigger the error callback as well return None - + @property - def iscompleted(self): + def iscompleted(self) -> bool: return self.asyncresult.ready() -class MultiprocessThread_Pool(nornir_pools.poolbase.PoolBase): - +class MultiprocessThreadPool(nornir_pools.poolbase.PoolBase): """Pool of threads consuming tasks from a queue""" + def add_process(self, name, func, *args, **kwargs): + raise NotImplemented() + @property def tasks(self): if self._tasks is None: self._tasks = NonDaemonPool(maxtasksperchild=self._maxtasksperchild, processes=self._num_processes) return self._tasks - + @property def num_active_tasks(self): return len(self._active_tasks) - def __init__(self, num_threads=None, maxtasksperchild=None, *args, **kwargs): + def __init__(self, num_threads: int | None = None, maxtasksperchild: int | None = None, *args, **kwargs): self._tasks = None - + num_threads = nornir_pools.ApplyOSThreadLimit(num_threads) - - self._num_processes = num_threads + + self._num_processes = num_threads self._maxtasksperchild = maxtasksperchild - self._active_tasks = {} # A list of incomplete AsyncResults - - super(MultiprocessThread_Pool, self).__init__(*args, **kwargs) - + # A list of incomplete AsyncResults + self._active_tasks = {} # type : Dict[int, MultiprocessThreadTask] + + super(MultiprocessThreadPool, self).__init__(*args, **kwargs) def shutdown(self): if hasattr(self, 'tasks'): self.tasks.close() self.tasks.join() self.wait_completion() - - assert(len(self._active_tasks) == 0) + + assert (len(self._active_tasks) == 0) self._tasks = None nornir_pools._remove_pool(self) - - def callback_wrapper(self, task_id, callback_func): + + def callback_wrapper(self, task_id: int, callback_func: Callable): def wrapper_function(result): - #if isinstance(retval_task, multiprocessing.pool.AsyncResult): + # if isinstance(retval_task, multiprocessing.pool.AsyncResult): # task_id = result._nornir_task_id_ # if not result._nornir_task_id_ in self._active_tasks: # raise ValueError("Unexpected result received") - + # del self._active_tasks[task_id] if not task_id in self._active_tasks: - raise ValueError("Task {0} not listed in active tasks, but a result was received in pool {1}...".format(task_id, str(self))) - + raise ValueError( + "Task {0} not listed in active tasks, but a result was received in pool {1}...".format(task_id, + str(self))) + del self._active_tasks[task_id] - #print("Delete task {0}".format(task_id)) - - - #else: Errors return an exception, which we can't easily trace back to a task + # print("Delete task {0}".format(task_id)) + + # else: Errors return an exception, which we can't easily trace back to a task self.TryReportActiveTaskCount() return callback_func(result) - + return wrapper_function - - def add_task(self, name, func, *args, **kwargs): + + def add_task(self, name: str, func: Callable, *args, **kwargs): """Add a task to the queue""" if func is None: prettyoutput.LogErr("Multiprocess pool add task {0} called with 'None' as function".format(name)) - if callable(func) == False: - prettyoutput.LogErr("Multiprocess pool add task {0} parameter was non-callable value {1} when it should be passed a function".format(name, func)) - - assert(callable(func)) - - # I've seen an issue here were apply_async prints an exception about not being able to import a module. It then swallows the exception. + if not callable(func): + prettyoutput.LogErr( + "Multiprocess pool add task {0} parameter was non-callable value {1} when it should be passed a function".format( + name, func)) + + assert (callable(func)) + + # I've seen an issue here were apply_async prints an exception about not being able to import a module. It then swallows the exception. # The returned task seems valid and not complete, but the MultiprocessThreadTask's event is never set because the callback isn't used. # This hangs the caller if they wait on the task. - + retval_task = MultiprocessThreadTask(name, None, args, kwargs) - retval_task.asyncresult = self.tasks.apply_async(func, args, kwargs, - callback=self.callback_wrapper(retval_task.task_id, retval_task.callback), - error_callback=self.callback_wrapper(retval_task.task_id, retval_task.callbackontaskfail)) + retval_task.asyncresult = self.tasks.apply_async(func, args, kwargs, + callback=self.callback_wrapper(retval_task.task_id, + retval_task.callback), + error_callback=self.callback_wrapper(retval_task.task_id, + retval_task.callbackontaskfail)) if retval_task.asyncresult is None: raise ValueError("apply_async returned None instead of an asyncresult object") - + retval_task.asyncresult._nornir_task_id_ = retval_task.task_id self._active_tasks[retval_task.task_id] = retval_task - #print("Added task #{0}".format(retval_task.task_id)) - + # print("Added task #{0}".format(retval_task.task_id)) + self.TryReportActiveTaskCount() - + return retval_task -# -# def starmap(self, name, func, iterable, chunksize=None): -# -# """Add a task to the queue""" -# -# -# # I've seen an issue here were apply_async prints an exception about not being able to import a module. It then swallows the exception. -# # The returned task seems valid and not complete, but the MultiprocessThreadTask's event is never set because the callback isn't used. -# # This hangs the caller if they wait on the task. -# -# retval_task = MultiprocessThreadTask(name, None) -# retval_task.asyncresult = self.tasks.starmap(func, iterable, chunksize=chunksize, -# callback=self.callback_wrapper(retval_task.task_id, retval_task.callback), -# error_callback=self.callback_wrapper(retval_task.task_id, retval_task.callbackontaskfail)) -# if retval_task.asyncresult is None: -# raise ValueError("starmap_async returned None instead of an asyncresult object") -# -# retval_task.asyncresult._nornir_task_id_ = retval_task.task_id -# self._active_tasks[retval_task.task_id] = retval_task -# #print("Added task #{0}".format(retval_task.task_id)) -# -# return retval_task -# -# -# def starmap_async(self, name, func, iterable, chunksize=None): -# -# """Add a task to the queue""" -# -# -# # I've seen an issue here were apply_async prints an exception about not being able to import a module. It then swallows the exception. -# # The returned task seems valid and not complete, but the MultiprocessThreadTask's event is never set because the callback isn't used. -# # This hangs the caller if they wait on the task. -# -# retval_task = MultiprocessThreadTask(name, None) -# retval_task.asyncresult = self.tasks.starmap_async(func, iterable, chunksize=chunksize, -# callback=self.callback_wrapper(retval_task.task_id, retval_task.callback), -# error_callback=self.callback_wrapper(retval_task.task_id, retval_task.callbackontaskfail)) -# if retval_task.asyncresult is None: -# raise ValueError("starmap_async returned None instead of an asyncresult object") -# -# retval_task.asyncresult._nornir_task_id_ = retval_task.task_id -# self._active_tasks[retval_task.task_id] = retval_task -# #print("Added task #{0}".format(retval_task.task_id)) -# -# return retval_task -# + + # + # def starmap(self, name, func, iterable, chunksize=None): + # + # """Add a task to the queue""" + # + # + # # I've seen an issue here were apply_async prints an exception about not being able to import a module. It then swallows the exception. + # # The returned task seems valid and not complete, but the MultiprocessThreadTask's event is never set because the callback isn't used. + # # This hangs the caller if they wait on the task. + # + # retval_task = MultiprocessThreadTask(name, None) + # retval_task.asyncresult = self.tasks.starmap(func, iterable, chunksize=chunksize, + # callback=self.callback_wrapper(retval_task.task_id, retval_task.callback), + # error_callback=self.callback_wrapper(retval_task.task_id, retval_task.callbackontaskfail)) + # if retval_task.asyncresult is None: + # raise ValueError("starmap_async returned None instead of an asyncresult object") + # + # retval_task.asyncresult._nornir_task_id_ = retval_task.task_id + # self._active_tasks[retval_task.task_id] = retval_task + # #print("Added task #{0}".format(retval_task.task_id)) + # + # return retval_task + # + # + # def starmap_async(self, name, func, iterable, chunksize=None): + # + # """Add a task to the queue""" + # + # + # # I've seen an issue here were apply_async prints an exception about not being able to import a module. It then swallows the exception. + # # The returned task seems valid and not complete, but the MultiprocessThreadTask's event is never set because the callback isn't used. + # # This hangs the caller if they wait on the task. + # + # retval_task = MultiprocessThreadTask(name, None) + # retval_task.asyncresult = self.tasks.starmap_async(func, iterable, chunksize=chunksize, + # callback=self.callback_wrapper(retval_task.task_id, retval_task.callback), + # error_callback=self.callback_wrapper(retval_task.task_id, retval_task.callbackontaskfail)) + # if retval_task.asyncresult is None: + # raise ValueError("starmap_async returned None instead of an asyncresult object") + # + # retval_task.asyncresult._nornir_task_id_ = retval_task.task_id + # self._active_tasks[retval_task.task_id] = retval_task + # #print("Added task #{0}".format(retval_task.task_id)) + # + # return retval_task + # def wait_completion(self): """Wait for completion of all the tasks in the queue""" while len(self._active_tasks) > 0: (task_id, task) = self._active_tasks.popitem() - self._active_tasks[task_id] = task #Re-add the item to the _active_tasks so the callback can find it and remove it as expected - task.wait() #use wait to ensure any exceptions are thrown - - + self._active_tasks[ + task_id] = task # Re-add the item to the _active_tasks so the callback can find it and remove it as expected + task.wait() # use wait to ensure any exceptions are thrown diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 47a62bd..9c31b53 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -1,5 +1,5 @@ -from six.moves import queue -import abc +import queue +from abc import * import threading import time import multiprocessing @@ -7,21 +7,22 @@ import nornir_pools -class PoolBase(object): +class PoolBase(ABC): ''' Pool objects provide the interface to create tasks on the pool. ''' @property - def name(self): + def name(self) -> str: return self._name @name.setter - def name(self, val): + def name(self, val: str): self._name = val @property - def num_active_tasks(self): + @abstractmethod + def num_active_tasks(self) -> int: raise NotImplementedError() @property @@ -34,12 +35,14 @@ def logger(self): def __str__(self): return "Pool {0} with {1} active tasks".format(self.name, self.num_active_tasks) + @abstractmethod def shutdown(self): ''' The pool waits for all tasks to complete and frees any resources such as threads in a thread pool ''' raise NotImplementedError() + @abstractmethod def wait_completion(self): ''' Blocks until all tasks have completed @@ -54,6 +57,7 @@ def __init__(self, *args, **kwargs): self.job_report_interval_in_seconds = kwargs.get("job_report_interval", 10.0) self._last_num_active_tasks = 0 + @abstractmethod def add_task(self, name, func, *args, **kwargs): ''' Call a python function on the pool @@ -66,6 +70,7 @@ def add_task(self, name, func, *args, **kwargs): ''' raise NotImplementedError() + @abstractmethod def add_process(self, name, func, *args, **kwargs): ''' Invoke a process on the pool. This function creates a task using name and then invokes pythons subprocess @@ -99,13 +104,13 @@ def PrintActiveTaskCount(self): return -class LocalThreadPoolBase(abc.ABC, PoolBase): +class LocalThreadPoolBase(PoolBase, ABC): '''Base class for pools that rely on local threads and a queue to dispatch jobs''' WorkerCheckInterval = 0.5 # How often workers check for new jobs in the queue @property - def num_active_tasks(self): + def num_active_tasks(self) -> int: return self.tasks.qsize() def __init__(self, *args, **kwargs): @@ -147,13 +152,13 @@ def shutdown(self): time.sleep(self.WorkerCheckInterval + 1) del self._threads - # + @abstractmethod def add_worker_thread(self): raise NotImplementedError("add_worker_thread must be implemented by derived class and return a thread object") def add_threads_if_needed(self): - assert (False == self.shutdown_event.is_set()) + assert (self.shutdown_event.is_set() is False) self.remove_finished_threads() num_active_threads = len(self._threads) @@ -188,8 +193,8 @@ def remove_finished_threads(self): if t == self._threads[i]: del self._threads[i] break - except queue.Empty as e: - return + except queue.Empty: + pass return diff --git a/nornir_pools/processpool.py b/nornir_pools/processpool.py index 8db8d32..7a308d0 100644 --- a/nornir_pools/processpool.py +++ b/nornir_pools/processpool.py @@ -1,10 +1,11 @@ # threadpool.py - +import queue # Initially patterned from http://code.activestate.com/recipes/577187-python-thread-pool/ # Made awesomer by James Anderson # Made prettier by James Tucker -import math +from typing import * +import math import sys import threading import time @@ -13,7 +14,7 @@ #import logging import nornir_pools -from . import poolbase +from nornir_pools import poolbase from . import task from nornir_shared import prettyoutput @@ -22,11 +23,13 @@ class ImmediateProcessTask(task.TaskWithEvent): '''Launches processes without threads''' - def __init__(self, name, func, *args, **kwargs): + def __init__(self, name: str, func: str, *args, **kwargs): super(ImmediateProcessTask, self).__init__(name, *args, **kwargs) - self.proc = None - self.cmd = func - self.returned_value = None + self.proc = None # type: subprocess.Popen | None + self.cmd = func # type: str + self.returned_value = None # type: Any + self.stdoutdata: str + self.stderrdata: str self.Run() def Run(self): @@ -64,13 +67,14 @@ def _handle_proc_completion(self): self.set_completion_time() self.completed.set() - def wait_return(self): + def wait_return(self) -> str: self.wait() return self.stdoutdata + class ProcessTask(task.TaskWithEvent): - def __init__(self, name, func, *args, **kwargs): + def __init__(self, name: str, func: Callable, *args, **kwargs): super(ProcessTask, self).__init__(name, *args, **kwargs) self.cmd = func @@ -92,7 +96,12 @@ class Worker(threading.Thread): """Thread executing tasks from a given tasks queue""" - def __init__(self, tasks, deadthreadqueue, shutdown_event, queue_wait_time, **kwargs): + def __init__(self, + tasks: queue.Queue, + deadthreadqueue: queue.Queue, + shutdown_event: threading.Event, + queue_wait_time: float, + **kwargs): threading.Thread.__init__(self, **kwargs) self.tasks = tasks @@ -191,17 +200,19 @@ def run(self): self.tasks.task_done() -class Process_Pool(poolbase.LocalThreadPoolBase): +class ProcessPool(poolbase.LocalThreadPoolBase): """Pool of threads consuming tasks from a queue""" - + + def add_task(self, name:str, func: Callable, *args, **kwargs): + self.add_process(name, func, *args, **kwargs) def __init__(self, num_threads=None, WorkerCheckInterval = 0.5): ''' :param int num_threads: Maximum number of threads in the pool :param float WorkerCheckInterval: How long worker threads wait for tasks before shutting down ''' - super(Process_Pool, self).__init__(num_threads=num_threads, WorkerCheckInterval=WorkerCheckInterval) + super(ProcessPool, self).__init__(num_threads=num_threads, WorkerCheckInterval=WorkerCheckInterval) self._next_thread_id = 0 #self.logger.warn("Creating Process Pool") @@ -214,33 +225,38 @@ def add_worker_thread(self): return w - def add_process(self, name, func, *args, **kwargs): - """Add a task to the queue, args are passed directly to subprocess.Popen""" - if isinstance(func, str): - pass - elif func is None: - prettyoutput.LogErr("Process pool add task {0} called with 'None' as function".format(name)) - elif callable(func) == False: - prettyoutput.LogErr("Process pool add task {0} parameter was non-callable value {1} when it should be passed a function".format(name, func)) - - assert(isinstance(func, str) or callable(func)) + def add_process(self, name: str, func: Callable | str, *args, **kwargs): + """ + Add a task to the queue, args are passed directly to subprocess.Popen + :param name: The name of the task + :param func: If a string is passed a + process is started and the string is executed as a console command. If a callable is passed the multiprocess + is used to invoke the function in another process. + """ + # keep_alive_thread is a non-daemon thread started when the queue is non-empty. # Python will not shut down while non-daemon threads are alive. When the queue empties the thread exits. # When items are added to the queue we create a new keep_alive_thread as needed if isinstance(kwargs, dict): - if not 'shell' in kwargs: + if 'shell' not in kwargs: kwargs['shell'] = True else: kwargs = {'shell': True} - if func is str: + if isinstance(func, str): entry = ImmediateProcessTask(name, func, *args, **kwargs) - else: + elif callable(func): entry = ProcessTask(name, func, *args, **kwargs) + elif func is None: + info = f"Process pool add task {name} called with 'None' as function" + prettyoutput.LogErr(info) + raise ValueError(info) + else: + info = f"Process pool add task {name} parameter was Non-callable and non-string value {func}" + prettyoutput.LogErr(info) + raise ValueError(info) self.tasks.put(entry) self.add_threads_if_needed() - - return entry diff --git a/nornir_pools/serialpool.py b/nornir_pools/serialpool.py index 1086aa3..876f6ee 100644 --- a/nornir_pools/serialpool.py +++ b/nornir_pools/serialpool.py @@ -4,6 +4,7 @@ @author: u0490822 ''' +from typing import Callable import subprocess import nornir_pools from . import poolbase @@ -40,12 +41,12 @@ def __init__(self, num_threads, *args, **kwargs): super(SerialPool, self).__init__(*args, **kwargs) - def add_task(self, name, func, *args, **kwargs): + def add_task(self, name: str, func: Callable, *args, **kwargs): retval = func(*args, **kwargs) return nornir_pools.task.SerialTask(name, retval, *args, **kwargs) - def add_process(self, name, func, *args, **kwargs): + def add_process(self, name: str, func: Callable, *args, **kwargs): SingleParameterProc = subprocess.Popen(func + " && exit", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) entry = nornir_pools.processpool.ProcessTask(name, func, *args, **kwargs) diff --git a/nornir_pools/task.py b/nornir_pools/task.py index 6ac944a..c32dc0a 100644 --- a/nornir_pools/task.py +++ b/nornir_pools/task.py @@ -1,30 +1,33 @@ +from abc import * +import datetime import threading import time import math +from typing import Any -class Task(object): +class Task(ABC): ''' Represents a task assigned to a pool. Responsible for allowing the caller to wait for task completion, raising any exceptions, and returning data from the call. Task objects are created by adding tasks or processes to the pools. They are not intended to be created directly by callers. ''' - _NextID = 0 + _NextID = 0 # type: int _IDLock = threading.Lock() @property - def task_id(self): + def task_id(self) -> int: return self.__task_id__ @classmethod - def GenerateID(cls): + def GenerateID(cls) -> int: with cls._IDLock: _id = cls._NextID cls._NextID = cls._NextID + 1 return _id @property - def elapsed_time(self): + def elapsed_time(self) -> datetime.datetime: endtime = self.task_end_time if endtime is None: endtime = time.time() @@ -32,7 +35,7 @@ def elapsed_time(self): return endtime - self.task_start_time @property - def elapsed_time_str(self): + def elapsed_time_str(self) -> str: t_delta = self.elapsed_time seconds = math.fmod(t_delta, 60) @@ -62,6 +65,7 @@ def __str__(self): out_string += time_str return out_string + @abstractmethod def wait(self): ''' Wait for task to complete, does not return a value @@ -71,7 +75,8 @@ def wait(self): raise NotImplementedError() - def wait_return(self): + @abstractmethod + def wait_return(self) -> Any: ''' Wait for task to complete and return the value @@ -81,8 +86,8 @@ def wait_return(self): ''' raise Exception("Not implemented") - @property - def iscompleted(self): + @abstractmethod + def iscompleted(self) -> bool: ''' Non-blocking test to determine if task has completed. No exception is raised if the task raised an exception during execution until wait or wait_return is called. @@ -101,7 +106,7 @@ def __hash__(self): return self.__task_id__ -class TaskWithEvent(Task): +class TaskWithEvent(Task, ABC): ''' Task object with built-in event for completion ''' @@ -112,7 +117,7 @@ def __init__(self, name, *args, **kwargs): self.returncode = 0 @property - def iscompleted(self): + def iscompleted(self) -> bool: ''' Non-blocking test to determine if task has completed. No exception is raised if the task raised an exception during execution until wait or wait_return is called. @@ -128,13 +133,13 @@ def wait(self): class SerialTask(Task): '''Used for debugging and profiling. Returns a task object but the function has been run serially.''' - def __init__(self, name, retval, *args, **kwargs): + def __init__(self, name: str, retval: Any, *args, **kwargs): super(SerialTask, self).__init__(name, *args, **kwargs) self._retval = retval - self.returncode = 0 + self.returncode = 0 # type: int @property - def iscompleted(self): + def iscompleted(self) -> bool: return True def wait(self): diff --git a/nornir_pools/threadpool.py b/nornir_pools/threadpool.py index 128c457..11df8ea 100644 --- a/nornir_pools/threadpool.py +++ b/nornir_pools/threadpool.py @@ -4,13 +4,14 @@ # Made awesomer by James Anderson # Made prettier by James Tucker -import math +from typing import * +import math import threading import time -import traceback +#import traceback import queue -import os -#import logging +# import os +# import logging import nornir_pools.task as task @@ -22,23 +23,22 @@ class ThreadTask(task.TaskWithEvent): @property - def exception(self): + def exception(self) -> Exception: return self._exception @exception.setter - def exception(self, val): + def exception(self, val: Exception): self._exception = val - - def __init__(self, name, func, *args, **kwargs): + def __init__(self, name: str, func: Callable, *args, **kwargs): self.func = func # Function to be called when Task is removed from queue self.returned_value = None # The value returned by the executing function - self._exception = None + self._exception = None # type: Exception | None super(ThreadTask, self).__init__(name, *args, **kwargs) - def wait_return(self): + def wait_return(self) -> Any: """Waits until the function has completed execution and returns the value returned by the function pointer""" self.wait() @@ -61,25 +61,28 @@ def wait(self): class Worker(threading.Thread): - """Thread executing tasks from a given tasks queue""" - - def __init__(self, tasks, deadthreadqueue, shutdown_event, queue_wait_time, **kwargs): + def __init__(self, + tasks: queue.Queue, + deadthreadqueue: queue.Queue, + shutdown_event: threading.Event, + queue_wait_time: float, + **kwargs): threading.Thread.__init__(self, **kwargs) self.tasks = tasks self.deadthreadqueue = deadthreadqueue self.shutdown_event = shutdown_event self.daemon = True - + if queue_wait_time is None: queue_wait_time = 5 - + self.queue_wait_time = queue_wait_time # self.logger = logging.getLogger(__name__) self.start() - + @staticmethod def generate_elapsed_time_str(task_name, t_delta): '''Generate a string describing how long a task took to complete''' @@ -88,36 +91,37 @@ def generate_elapsed_time_str(task_name, t_delta): seconds = math.fmod(t_delta, 60) seconds_str = "%02.5g" % seconds time_str = str(time.strftime('%H:%M:', time.gmtime(t_delta))) + seconds_str - + out_string = "--- {0}".format(task_name) out_string += " " * (time_position - len(out_string)) out_string += time_str - + return out_string - + def run(self): - + while True: # Get next task from the queue (blocks thread if queue is empty until entry arrives) + # Wait five seconds for a new entry in the queue and check if we should shutdown if nothing shows up try: - entry = self.tasks.get(True, self.queue_wait_time) # Wait five seconds for a new entry in the queue and check if we should shutdown if nothing shows up - except queue.Empty as e: + entry = self.tasks.get(True, + self.queue_wait_time) # type: ThreadTask + + except queue.Empty: # Check if we should kill the thread - if(self.shutdown_event.is_set()): - #nornir_pools._sprint("Queue Empty, exiting worker thread") + if self.shutdown_event.is_set(): + # nornir_pools._sprint("Queue Empty, exiting worker thread") self.deadthreadqueue.put(self) return else: self.deadthreadqueue.put(self) - #nornir_pools._sprint("Thread #%d idle shutdown" % (self.ident)) + # nornir_pools._sprint("Thread #%d idle shutdown" % (self.ident)) return - - continue - + # Record start time so we get a sense of performance - #task_start_time = time.time() + # task_start_time = time.time() # print notification @@ -126,7 +130,7 @@ def run(self): # _sprint("+") # do it! - + original_thread_name = self.name self.name = entry.name @@ -154,76 +158,81 @@ def run(self): # calculate finishing time and mark task as completed - #task_end_time = time.time() + # task_end_time = time.time() # mark the object event as completed entry.completed.set() # print the completion notice with times aligned - #t_delta = task_end_time - task_start_time - #out_string = generate_elapsed_time_str(entry.name, t_delta) - #self.logger.info(out_string) -# ######### -# JobsQueued = self.tasks.qsize() -# if JobsQueued > 0: -# -# JobQText = "Jobs Queued: " + str(self.tasks.qsize()) -# JobQText = ('\b' * 40) + JobQText + (' ' * (40 - len(JobQText))) -# nornir_pools._PrintProgressUpdate(JobQText) -########### + # t_delta = task_end_time - task_start_time + # out_string = generate_elapsed_time_str(entry.name, t_delta) + # self.logger.info(out_string) + # ######### + # JobsQueued = self.tasks.qsize() + # if JobsQueued > 0: + # + # JobQText = "Jobs Queued: " + str(self.tasks.qsize()) + # JobQText = ('\b' * 40) + JobQText + (' ' * (40 - len(JobQText))) + # nornir_pools._PrintProgressUpdate(JobQText) + ########### self.tasks.task_done() self.name = original_thread_name -class Thread_Pool(poolbase.LocalThreadPoolBase): - +class ThreadPool(poolbase.LocalThreadPoolBase): """Pool of threads consuming tasks from a queue""" - #How often workers check for new jobs in the queue - def __init__(self, num_threads=None, WorkerCheckInterval = None, *args, **kwargs): + def add_process(self, name, func, *args, **kwargs): + raise NotImplemented() + + # How often workers check for new jobs in the queue + + def __init__(self, + num_threads: int | None = None, + WorkerCheckInterval=None, *args, **kwargs): ''' :param int num_threads: Maximum number of threads in the pool :param float WorkerCheckInterval: How long worker threads wait for tasks before shutting down ''' - + num_threads = nornir_pools.ApplyOSThreadLimit(num_threads) - - super(Thread_Pool, self).__init__(num_threads=num_threads, WorkerCheckInterval=WorkerCheckInterval, *args, **kwargs) + + super(ThreadPool, self).__init__(num_threads=num_threads, WorkerCheckInterval=WorkerCheckInterval, *args, + **kwargs) self._next_thread_id = 0 - self.logger.info("Creating Thread Pool") + self.logger.info("Creating Thread Pool") return - def add_worker_thread(self): - assert(False == self.shutdown_event.is_set()) - - worker_name = "Thread pool #%d" % (self._next_thread_id) - w = Worker(self.tasks, self.deadthreadqueue, self.shutdown_event, self.WorkerCheckInterval, name=worker_name) + def add_worker_thread(self) -> Worker: + assert ( self.shutdown_event.is_set() is False) + + worker_name = "Thread pool #%d" % self._next_thread_id + w = Worker(self.tasks, self.deadthreadqueue, self.shutdown_event, self.WorkerCheckInterval, name=worker_name) self._next_thread_id += 1 return w - def add_task(self, name, func, *args, **kwargs): + def add_task(self, name, func, *args, **kwargs) -> ThreadTask: if func is None: prettyoutput.LogErr("Thread pool add task {0} called with 'None' as function".format(name)) - if callable(func) == False: - prettyoutput.LogErr("Thread pool add task {0} parameter was non-callable value {1} when it should be passed a function".format(name, func)) - - assert(callable(func)) + if not callable(func): + prettyoutput.LogErr( + "Thread pool add task {0} parameter was non-callable value {1} when it should be passed a function".format( + name, func)) + + assert (callable(func)) """Add a task to the queue""" - assert(False == self.shutdown_event.is_set()) - + assert (self.shutdown_event.is_set() is False) + # keep_alive_thread is a non-daemon thread started when the queue is non-empty. # Python will not shut down while non-daemon threads are alive. When the queue empties the thread exits. # When items are added to the queue we create a new keep_alive_thread as needed - entry = ThreadTask(name, func, *args, **kwargs) self.tasks.put(entry) self.add_threads_if_needed() - return entry - From 5431d80c9d68918c263ebfd31915d2185fd7703e Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 29 Sep 2022 17:16:10 -0700 Subject: [PATCH 17/21] Added Task to nornir_pools --- nornir_pools/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 6591484..2dca600 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -84,6 +84,8 @@ import nornir_pools.poolbase as poolbase import nornir_pools.task as task +from nornir_pools.task import Task + import nornir_pools.processpool as processpool import nornir_pools.threadpool as threadpool import nornir_pools.multiprocessthreadpool as multiprocessthreadpool From 19968f3ab1b87750350d19c225a1f854a55a1a6c Mon Sep 17 00:00:00 2001 From: James Anderson Date: Mon, 17 Oct 2022 13:33:05 -0700 Subject: [PATCH 18/21] Fixed issue with self._threads being deleted and then expected during Shutdown() in ProcessPool --- nornir_pools/poolbase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 9c31b53..218efb5 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -150,7 +150,7 @@ def shutdown(self): # Give threads time to die gracefully time.sleep(self.WorkerCheckInterval + 1) - del self._threads + self._threads.clear() @abstractmethod def add_worker_thread(self): From 3f68e4102d16f7a519203639e2262ff0737b52ad Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 27 Oct 2022 14:58:29 -0700 Subject: [PATCH 19/21] Optimized local thread pool shutdown waits --- nornir_pools/poolbase.py | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 218efb5..ec04725 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -1,3 +1,4 @@ +import atexit import queue from abc import * import threading @@ -103,21 +104,40 @@ def PrintActiveTaskCount(self): nornir_pools._PrintProgressUpdate(JobQText) return - class LocalThreadPoolBase(PoolBase, ABC): '''Base class for pools that rely on local threads and a queue to dispatch jobs''' - WorkerCheckInterval = 0.5 # How often workers check for new jobs in the queue - + WorkerCheckInterval = 1 # How often workers check for events to end themselves if there are no queue events + AtExitRegisteredWaitTime = 0 # How long we will wait atexit to ensure threads have time to shutdown. Should be the max WorkerCheckInterval of any Threadpool started. + AtExitLock = threading.Lock() + @property def num_active_tasks(self) -> int: return self.tasks.qsize() + + @classmethod + def TryRegisterAtExit(cls, wait_time: float): + """Register a wait atexit so we don't leave threads alive when the program exits and get an error message""" + if cls.AtExitRegisteredWaitTime >= wait_time: + return + + try: + if cls.AtExitLock.acquire(): + if cls.AtExitRegisteredWaitTime >= wait_time: + return + + atexit.register(time.sleep, wait_time) + cls.AtExitRegisteredWaitTime = wait_time + finally: + cls.AtExitLock.release() def __init__(self, *args, **kwargs): ''' :param int num_threads: number of threads, defaults to number of cores installed on system ''' super(LocalThreadPoolBase, self).__init__(*args, **kwargs) + + self.deadthreadqueue = queue.Queue() # Threads put themselves here when they die self.shutdown_event = threading.Event() @@ -127,7 +147,9 @@ def __init__(self, *args, **kwargs): self.WorkerCheckInterval = kwargs.get('WorkerCheckInterval', None) if self.WorkerCheckInterval is None: - self.WorkerCheckInterval = 2 + self.WorkerCheckInterval = LocalThreadPoolBase.WorkerCheckInterval + + LocalThreadPoolBase.TryRegisterAtExit(self.WorkerCheckInterval * 1.25) self._max_threads = kwargs.get('num_threads', multiprocessing.cpu_count()) @@ -148,8 +170,8 @@ def shutdown(self): nornir_pools._remove_pool(self) - # Give threads time to die gracefully - time.sleep(self.WorkerCheckInterval + 1) + # Give threads time to check for new work and die gracefully + #time.sleep(self.WorkerCheckInterval * 1.5) self._threads.clear() @abstractmethod From 9315ca032298aef743f6b05f38ab90d566bebcd0 Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 27 Oct 2022 16:49:14 -0700 Subject: [PATCH 20/21] Tidying --- nornir_pools/poolbase.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index ec04725..51d7662 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -136,9 +136,7 @@ def __init__(self, *args, **kwargs): :param int num_threads: number of threads, defaults to number of cores installed on system ''' super(LocalThreadPoolBase, self).__init__(*args, **kwargs) - - - + self.deadthreadqueue = queue.Queue() # Threads put themselves here when they die self.shutdown_event = threading.Event() self.shutdown_event.clear() @@ -169,9 +167,8 @@ def shutdown(self): self.shutdown_event.set() nornir_pools._remove_pool(self) - - # Give threads time to check for new work and die gracefully - #time.sleep(self.WorkerCheckInterval * 1.5) + + #The static atexit method gives threads time to die gracefully self._threads.clear() @abstractmethod From c465c92c7d7a11efd934f65af9f47a349d6dfebb Mon Sep 17 00:00:00 2001 From: James Anderson Date: Thu, 19 Jan 2023 11:39:04 -0800 Subject: [PATCH 21/21] Added IPool interface Improved typing information --- nornir_pools/__init__.py | 59 +++++++++++++++++++++---------------- nornir_pools/ipool.py | 55 ++++++++++++++++++++++++++++++++++ nornir_pools/poolbase.py | 48 ++---------------------------- nornir_pools/processpool.py | 2 +- 4 files changed, 91 insertions(+), 73 deletions(-) create mode 100644 nornir_pools/ipool.py diff --git a/nornir_pools/__init__.py b/nornir_pools/__init__.py index 2dca600..49075e2 100644 --- a/nornir_pools/__init__.py +++ b/nornir_pools/__init__.py @@ -82,6 +82,10 @@ import glob import logging +from typing import Callable + +import nornir_pools.ipool as ipool +from nornir_pools.ipool import IPool import nornir_pools.poolbase as poolbase import nornir_pools.task as task from nornir_pools.task import Task @@ -140,7 +144,10 @@ def ApplyOSThreadLimit(num_threads): __pool_management_lock = threading.RLock() -def __CreatePool(poolclass, Poolname=None, num_threads=None, *args, **kwargs): +def __CreatePool(poolclass: Callable[[int, list | None, dict | None], IPool], + Poolname: str | None = None, + num_threads: int | None = None, + *args, **kwargs) -> IPool: global dictKnownPools global __pool_management_lock @@ -179,7 +186,7 @@ def WaitOnAllPools(): pool.wait_completion() -def _remove_pool(p): +def _remove_pool(p: str | IPool): '''Called from pool shutdown implementations to remove the pool from the map of existing pools''' global dictKnownPools global __pool_management_lock @@ -231,38 +238,38 @@ def ClosePools(): pool_items = list(dictKnownPools.items()) -def GetThreadPool(Poolname=None, num_threads=None): +def GetThreadPool(Poolname: str | None = None, num_threads: int | None = None) -> IPool: ''' Get or create a specific thread pool using vanilla python threads ''' return __CreatePool(nornir_pools.threadpool.ThreadPool, Poolname, num_threads) -def GetLocalMachinePool(Poolname=None, num_threads=None, is_global=False): +def GetLocalMachinePool(Poolname: str | None = None, num_threads: int | None = None, is_global=False) -> IPool: return __CreatePool(nornir_pools.local_machine_pool.LocalMachinePool, Poolname, num_threads, is_global=is_global) -def GetMultithreadingPool(Poolname=None, num_threads=None): +def GetMultithreadingPool(Poolname: str | None = None, num_threads: int | None = None) -> IPool: '''Get or create a specific thread pool to execute threads in other processes on the same computer using the multiprocessing library ''' warnings.warn(DeprecationWarning("GetMultithreadingPool is deprecated. Use GetLocalMachinePool instead")) return __CreatePool(nornir_pools.multiprocessthreadpool.MultiprocessThreadPool, Poolname, num_threads) -def GetProcessPool(Poolname=None, num_threads=None): +def GetProcessPool(Poolname: str | None = None, num_threads: int | None = None) -> processpool.ProcessPool: '''Get or create a specific pool to invoke shell command processes on the same computer using the subprocess module ''' warnings.warn(DeprecationWarning("GetProcessPool is deprecated. Use GetLocalMachinePool instead")) return __CreatePool(nornir_pools.processpool.ProcessPool, Poolname, num_threads) -def GetParallelPythonPool(Poolname=None, num_threads=None): +def GetParallelPythonPool(Poolname: str | None = None, num_threads: int | None = None) -> IPool: '''Get or create a specific pool to invoke functions or shell command processes on a cluster using parallel python ''' return __CreatePool(nornir_pools.parallelpythonpool.ParallelPythonProcess_Pool, Poolname, num_threads) -def GetSerialPool(Poolname=None, num_threads=None): +def GetSerialPool(Poolname: str | None = None, num_threads: int | None = None) -> IPool: ''' Get or create a specific thread pool using vanilla python threads ''' @@ -271,7 +278,7 @@ def GetSerialPool(Poolname=None, num_threads=None): return __CreatePool(nornir_pools.serialpool.SerialPool, Poolname, num_threads) -def GetGlobalSerialPool(): +def GetGlobalSerialPool() -> IPool: ''' Common pool for processes on the local machine ''' @@ -279,7 +286,7 @@ def GetGlobalSerialPool(): # return GetProcessPool("Global local process pool") -def GetGlobalProcessPool(): +def GetGlobalProcessPool() -> processpool.ProcessPool: ''' Common pool for processes on the local machine ''' @@ -287,7 +294,7 @@ def GetGlobalProcessPool(): # return GetProcessPool("Global local process pool") -def GetGlobalLocalMachinePool(): +def GetGlobalLocalMachinePool() -> IPool: ''' Common pool for launching other processes for threads or executables. Combines multithreading and process pool interface. @@ -296,7 +303,7 @@ def GetGlobalLocalMachinePool(): return GetLocalMachinePool(Poolname="Global local machine pool", is_global=True) -def GetGlobalClusterPool(): +def GetGlobalClusterPool() -> IPool: ''' Get the common pool for placing tasks on the cluster ''' @@ -307,14 +314,14 @@ def GetGlobalClusterPool(): return GetParallelPythonPool("Global cluster pool") -def GetGlobalThreadPool(): +def GetGlobalThreadPool() -> IPool: ''' Common pool for thread based tasks ''' return GetThreadPool("Global local thread pool") -def GetGlobalMultithreadingPool(): +def GetGlobalMultithreadingPool() -> IPool: ''' Common pool for multithreading module tasks, threads run in different python processes to work around the global interpreter lock @@ -327,7 +334,7 @@ def GetGlobalMultithreadingPool(): __LastConsoleWrite = datetime.datetime.utcnow() -def __CleanOutputForEclipse(s): +def __CleanOutputForEclipse(s: str): s = s.replace('\b', '') s = s.replace('.', '') s = s.strip() @@ -335,7 +342,7 @@ def __CleanOutputForEclipse(s): return s -def __EclipseConsoleWrite(s, newline=False): +def __EclipseConsoleWrite(s: str, newline: bool = False): es = __CleanOutputForEclipse(s) if newline: es = es + '\n' @@ -343,7 +350,7 @@ def __EclipseConsoleWrite(s, newline=False): sys.stdout.write(es) -def __EclipseConsoleWriteError(s, newline=False): +def __EclipseConsoleWriteError(s: str, newline:bool = False): es = __CleanOutputForEclipse(s) if newline: es = es + '\n' @@ -351,7 +358,7 @@ def __EclipseConsoleWriteError(s, newline=False): sys.stderr.write(es) -def __PrintProgressUpdateEclipse(s): +def __PrintProgressUpdateEclipse(s: str): global __LastConsoleWrite now = datetime.datetime.utcnow() @@ -364,21 +371,21 @@ def __PrintProgressUpdateEclipse(s): __LastConsoleWrite = datetime.datetime.utcnow() -def __ConsoleWrite(s, newline=False): +def __ConsoleWrite(s: str, newline: bool = False): if newline: s = s + '\n' sys.stdout.write(s) -def __ConsoleWriteError(s, newline=False): +def __ConsoleWriteError(s: str, newline: bool = False): if newline: s = s + '\n' sys.stderr.write(s) -def _PrintError(s): +def _PrintError(s: str): if 'ECLIPSE' in os.environ: __EclipseConsoleWrite(s) return @@ -386,7 +393,7 @@ def _PrintError(s): __ConsoleWriteError(s, newline=True) -def _PrintWarning(s): +def _PrintWarning(s: str): if 'ECLIPSE' in os.environ: __PrintProgressUpdateEclipse(s) return @@ -394,7 +401,7 @@ def _PrintWarning(s): __ConsoleWrite(s, newline=True) -def _PrintProgressUpdate(s): +def _PrintProgressUpdate(s: str): if 'ECLIPSE' in os.environ: __PrintProgressUpdateEclipse(s) return @@ -402,7 +409,7 @@ def _PrintProgressUpdate(s): __ConsoleWrite(s) -def _sprint(s): +def _sprint(s: str): """ Thread-safe print fucntion """ # Eclipse copies test output to the unit test window and this copy has # problems if the output has non-alphanumeric characters @@ -412,7 +419,7 @@ def _sprint(s): __ConsoleWrite(s, newline=True) -def _pprint(s): +def _pprint(s: str): """ Thread-safe print fucntion, no newline """ # Eclipse copies test output to the unit test window and this copy has @@ -514,7 +521,7 @@ def aggregate_profiler_data(output_path): # -def MergeProfilerStats(root_output_dir, profile_dir, pool_name): +def MergeProfilerStats(root_output_dir: str, profile_dir: str, pool_name: str): '''Called by atexit. Merges all *.profile files in the profile_dir into a single .profile file''' profile_files = glob.glob(os.path.join(profile_dir, "**", "*.pstats"), recursive=True) diff --git a/nornir_pools/ipool.py b/nornir_pools/ipool.py new file mode 100644 index 0000000..23eeb9f --- /dev/null +++ b/nornir_pools/ipool.py @@ -0,0 +1,55 @@ +from abc import ABC, abstractmethod +from typing import Callable, Any + + +class IPool(ABC): + + @property + @abstractmethod + def name(self) -> str: + raise NotImplementedError() + + @property + @abstractmethod + def num_active_tasks(self) -> int: + raise NotImplementedError() + + @abstractmethod + def shutdown(self): + ''' + The pool waits for all tasks to complete and frees any resources such as threads in a thread pool + ''' + raise NotImplementedError() + + @abstractmethod + def wait_completion(self): + ''' + Blocks until all tasks have completed + ''' + raise NotImplementedError() + + @abstractmethod + def add_task(self, name: str, func: Callable[..., Any], *args, **kwargs): + ''' + Call a python function on the pool + + :param str name: Friendly name of the task. Non-unique + :param function func: Python function pointer to invoke on the pool + + :returns: task object + :rtype: task + ''' + raise NotImplementedError() + + @abstractmethod + def add_process(self, name: str, func: Callable[..., Any], *args, **kwargs): + ''' + Invoke a process on the pool. This function creates a task using name and then invokes pythons subprocess + + :param str name: Friendly name of the task. Non-unique + :param function func: Process name to invoke using subprocess + + :returns: task object + :rtype: task + ''' + raise NotImplementedError() diff --git a/nornir_pools/poolbase.py b/nornir_pools/poolbase.py index 51d7662..2e16af4 100644 --- a/nornir_pools/poolbase.py +++ b/nornir_pools/poolbase.py @@ -6,9 +6,10 @@ import multiprocessing import logging import nornir_pools +from nornir_pools.ipool import IPool -class PoolBase(ABC): +class PoolBase(IPool): ''' Pool objects provide the interface to create tasks on the pool. ''' @@ -21,11 +22,6 @@ def name(self) -> str: def name(self, val: str): self._name = val - @property - @abstractmethod - def num_active_tasks(self) -> int: - raise NotImplementedError() - @property def logger(self): if self._logger is None: @@ -36,20 +32,6 @@ def logger(self): def __str__(self): return "Pool {0} with {1} active tasks".format(self.name, self.num_active_tasks) - @abstractmethod - def shutdown(self): - ''' - The pool waits for all tasks to complete and frees any resources such as threads in a thread pool - ''' - raise NotImplementedError() - - @abstractmethod - def wait_completion(self): - ''' - Blocks until all tasks have completed - ''' - raise NotImplementedError() - def __init__(self, *args, **kwargs): # self.logger = logging.getLogger(__name__) self._logger = None @@ -58,32 +40,6 @@ def __init__(self, *args, **kwargs): self.job_report_interval_in_seconds = kwargs.get("job_report_interval", 10.0) self._last_num_active_tasks = 0 - @abstractmethod - def add_task(self, name, func, *args, **kwargs): - ''' - Call a python function on the pool - - :param str name: Friendly name of the task. Non-unique - :param function func: Python function pointer to invoke on the pool - - :returns: task object - :rtype: task - ''' - raise NotImplementedError() - - @abstractmethod - def add_process(self, name, func, *args, **kwargs): - ''' - Invoke a process on the pool. This function creates a task using name and then invokes pythons subprocess - - :param str name: Friendly name of the task. Non-unique - :param function func: Process name to invoke using subprocess - - :returns: task object - :rtype: task - ''' - raise NotImplementedError() - def TryReportActiveTaskCount(self): ''' Report the current job count if we haven't reported it recently diff --git a/nornir_pools/processpool.py b/nornir_pools/processpool.py index 7a308d0..dcc7a1f 100644 --- a/nornir_pools/processpool.py +++ b/nornir_pools/processpool.py @@ -225,7 +225,7 @@ def add_worker_thread(self): return w - def add_process(self, name: str, func: Callable | str, *args, **kwargs): + def add_process(self, name: str, func: Callable[..., Any] | str, *args, **kwargs): """ Add a task to the queue, args are passed directly to subprocess.Popen :param name: The name of the task