-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added helper class Generator_Manager
- Loading branch information
Showing
6 changed files
with
636 additions
and
0 deletions.
There are no files selected for viewing
21 changes: 21 additions & 0 deletions
21
osbot_utils/helpers/generators/Generator_Context_Manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from osbot_utils.helpers.generators.Model__Generator_State import Model__Generator_State | ||
|
||
|
||
class Generator_Context_Manager: | ||
def __init__(self, manager, generator_func): | ||
self.manager = manager | ||
self.generator_func = generator_func | ||
self.target_id = None | ||
|
||
def __enter__(self): | ||
self.target_id = self.manager.add(self.generator_func) # Add the generator to the manager | ||
return self.manager.generator(self.target_id).target # Return the generator's reference | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
with self.manager.lock: | ||
generator = self.manager.generator(self.target_id) | ||
if generator and generator.state == Model__Generator_State.RUNNING: | ||
generator.state = Model__Generator_State.COMPLETED | ||
|
||
|
||
#self.manager.cleanup() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import threading | ||
from _thread import RLock # Reentrant lock for thread-safe access | ||
from types import GeneratorType | ||
from typing import Dict # Typing imports for type hints | ||
from osbot_utils.base_classes.Type_Safe import Type_Safe # Type_Safe base class for type-safe attributes | ||
from osbot_utils.helpers.Random_Guid import Random_Guid # Helper for generating unique IDs | ||
from osbot_utils.helpers.generators.Generator_Context_Manager import Generator_Context_Manager | ||
from osbot_utils.helpers.generators.Model__Generator_State import Model__Generator_State | ||
from osbot_utils.helpers.generators.Model__Generator_Target import Model__Generator_Target | ||
|
||
|
||
class Generator_Manager(Type_Safe): # Class for managing multiple generator targets | ||
generators: Dict[Random_Guid, Model__Generator_Target] # Dictionary mapping target IDs to generator targets | ||
lock : RLock = None # Reentrant lock for thread-safe access to shared data | ||
|
||
def __init__(self, **kwargs): # Constructor method | ||
super().__init__(**kwargs) | ||
self.lock = threading.RLock() # return an object of type _thread.RLock | ||
|
||
|
||
def active(self) -> Dict[Random_Guid, Model__Generator_Target]: # Method to get all active (running) generators | ||
with self.lock: # Acquire the lock for thread-safe access | ||
return {k: v for k, v in self.generators.items() if v.state == Model__Generator_State.RUNNING} # Return a dictionary of running generators | ||
|
||
def add(self, target: GeneratorType) -> Random_Guid: # Method to add a new generator to the manager | ||
with self.lock: # Acquire the lock for thread-safe access | ||
existing_target_id = self.target_id(target) # Check if the target already exists | ||
if existing_target_id: # If the target already exists | ||
raise ValueError(f"In Generator_Manager.add_generator, " # Raise an error with the ID of the existing target | ||
f"target already exists with ID: {existing_target_id}") | ||
|
||
generator = Model__Generator_Target(target=target, state=Model__Generator_State.RUNNING) # Create a new Generator_Target with RUNNING state | ||
self.generators[generator.target_id] = generator # Add the generator to the dictionary | ||
return generator.target_id # Return the unique ID of the added generator | ||
|
||
def capture(self, generator_func: GeneratorType): # Use this method to manage a generator's lifecycle via a context manager. | ||
return Generator_Context_Manager(self, generator_func) | ||
|
||
def cleanup(self) -> int: # Method to remove all completed or stopped generators | ||
with self.lock: # Acquire the lock for thread-safe access | ||
cleaned_count = 0 # Counter for the number of generators cleaned up | ||
for target_id in list(self.generators.keys()): # Iterate over the keys of the dictionary | ||
generator = self.generator(target_id) # Get the generator by its ID | ||
if generator and generator.state in [Model__Generator_State.STOPPED, | ||
Model__Generator_State.COMPLETED]: # Check if the generator is stopped or completed | ||
self.generators.pop(target_id, None) # Remove the generator from the dictionary | ||
cleaned_count += 1 # Increment the cleaned count | ||
return cleaned_count # Return the total number of cleaned generators | ||
|
||
def find_generator(self, target: GeneratorType) -> Model__Generator_Target: | ||
with self.lock: | ||
for generator in list(self.generators.values()): | ||
if generator.target == target: | ||
return generator | ||
|
||
def generator(self, target_id: Random_Guid) -> Model__Generator_Target: # Method to get a generator by its ID | ||
with self.lock: # Acquire the lock for thread-safe access | ||
return self.generators.get(target_id) # Return the generator or None if it doesn't exist | ||
|
||
def remove(self, target_id: Random_Guid) -> bool: # Method to remove a generator if it is stopped or completed | ||
with self.lock: # Acquire the lock for thread-safe access | ||
generator = self.generator(target_id) # Get the generator by its ID | ||
if not generator: # If the generator doesn't exist | ||
return False # Silently return False | ||
if generator.state in [Model__Generator_State.STOPPED, Model__Generator_State.COMPLETED]: # Check if the generator is in a removable state | ||
del self.generators[target_id] # Remove the generator from the dictionary | ||
return True # Return True to indicate successful removal | ||
return False # Return False if the generator was not removable | ||
|
||
def should_stop(self, target_id: Random_Guid) -> bool: # Method to check if a generator should stop | ||
with self.lock: # Acquire the lock for thread-safe access | ||
generator = self.generator(target_id) # Get the generator by its ID | ||
if not generator: # If the generator doesn't exist | ||
raise ValueError(f"In Generator_Manager.should_stop, " # Raise an error indicating missing generator | ||
f"Generator with ID {target_id} does not exist.") | ||
return generator.state != Model__Generator_State.RUNNING # Return True if the generator is not running | ||
|
||
def stop(self, target_id: Random_Guid) -> bool: # Method to stop a running generator | ||
with self.lock: # Acquire the lock for thread-safe access | ||
generator = self.generator(target_id) # Get the generator by its ID | ||
if generator and generator.state == Model__Generator_State.RUNNING: # If the generator is running | ||
generator.state = Model__Generator_State.STOPPING # Set its state to STOPPING | ||
return True # Return True to indicate the generator is stopping | ||
return False # Return False if the generator could not be stopped | ||
|
||
def stop_all(self) -> int: # Method to stop all running generators | ||
with self.lock: # Acquire the lock for thread-safe access | ||
stopped_count = 0 # Counter for the number of generators stopped | ||
for target_id in list(self.generators.keys()): # Iterate over the keys of the dictionary | ||
if self.stop(target_id): # Attempt to stop each generator | ||
stopped_count += 1 # Increment the stopped count if successful | ||
return stopped_count # Return the total number of stopped generators | ||
|
||
def target_id(self, target: GeneratorType) -> Random_Guid | None: # Method to get the ID of a specific generator | ||
with self.lock: # Acquire the lock for thread-safe access | ||
for generator in list(self.generators.values()): # Iterate over the generator targets | ||
if generator.target == target: # Check if the target matches | ||
return generator.target_id # Return the matching target's ID |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from enum import Enum | ||
|
||
class Model__Generator_State(Enum): # Enum representing possible states of a generator | ||
CREATED : str = "created" # Initial state when the generator is created | ||
RUNNING : str = "running" # State when the generator is actively running | ||
STOPPING : str = "stopping" # State when the generator is in the process of stopping | ||
STOPPED : str = "stopped" # State when the generator is fully stopped | ||
COMPLETED : str = "completed" # State when the generator has completed its execution | ||
ERROR : str = "error" # State when the generator encounters an error | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from types import GeneratorType | ||
from osbot_utils.base_classes.Type_Safe import Type_Safe | ||
from osbot_utils.helpers.Random_Guid import Random_Guid | ||
from osbot_utils.helpers.generators.Model__Generator_State import Model__Generator_State | ||
|
||
|
||
class Model__Generator_Target(Type_Safe): # Class representing a generator target and its metadata | ||
target_id : Random_Guid # Unique ID for the generator target | ||
target : GeneratorType = None # The generator instance being managed | ||
state : Model__Generator_State = Model__Generator_State.CREATED # Current state of the generator (default is CREATED) |
Empty file.
Oops, something went wrong.