Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
- add non-blocking processing lock to distribute work between threads
  • Loading branch information
devkral committed Dec 30, 2024
1 parent 0a1cf1c commit 9590e64
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions asyncz/schedulers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
)
from asyncz.executors.pool import ThreadPoolExecutor
from asyncz.executors.types import ExecutorType
from asyncz.locks import RLockProtected
from asyncz.locks import ReadProtected
from asyncz.schedulers import defaults
from asyncz.schedulers.asgi import ASGIApp, ASGIHelper
from asyncz.schedulers.datastructures import TaskDefaultStruct
Expand Down Expand Up @@ -131,7 +131,8 @@ def __init__(
self.executors: dict[str, ExecutorType] = {}
self.executor_lock: RLock = self.create_lock()
self.stores: dict[str, StoreType] = {}
self.store_lock: LockProtectedProtocol = self.create_store_lock()
self.store_processing_lock: LockProtectedProtocol = self.create_store_lock()
self.store_lock: RLock = self.create_lock()
self.listeners: list[Any] = []
self.listeners_lock: RLock = self.create_lock()
self.pending_tasks: list[tuple[TaskType, bool, bool]] = []
Expand Down Expand Up @@ -228,7 +229,7 @@ def start(self, paused: bool = False) -> bool:
for alias, executor in self.executors.items():
executor.start(self, alias)

with self.store_lock.protected(blocking=True):
with self.store_lock:
if "default" not in self.stores:
self.add_store(self.create_default_store(), "default")

Expand Down Expand Up @@ -289,7 +290,7 @@ def shutdown(self, wait: bool = True) -> bool:

self.state = SchedulerState.STATE_STOPPED

with self.executor_lock, self.store_lock.protected(blocking=True):
with self.executor_lock, self.store_lock:
for executor in self.executors.values():
executor.shutdown(wait)
coros = []
Expand Down Expand Up @@ -417,7 +418,7 @@ def add_store(
Any extra keyword arguments will be passed to the task store plugin's constructor, assuming
that the first argument is the name of a task store plugin.
"""
with self.store_lock.protected(blocking=True):
with self.store_lock:
if alias in self.stores:
raise ValueError(
f"This scheduler already has a task store by the alias of '{alias}'."
Expand Down Expand Up @@ -446,7 +447,7 @@ def remove_store(self, alias: str, shutdown: bool = True) -> None:
"""
Removes the task store by the given alias from this scheduler.
"""
with self.store_lock.protected(blocking=True):
with self.store_lock:
store = self.lookup_store(alias)
del self.stores[alias]

Expand Down Expand Up @@ -577,7 +578,7 @@ def add_task(
fn_or_task.store_alias = "default"
assert fn_or_task.trigger is not None, "Cannot submit a task without a trigger."
assert fn_or_task.id is not None, "Cannot submit a decorator type task."
with self.store_lock.protected(blocking=True):
with self.store_lock:
if self.state == SchedulerState.STATE_STOPPED:
self.pending_tasks.append(
(
Expand Down Expand Up @@ -645,7 +646,7 @@ def update_task(
else:
new_updates = updates

with self.store_lock.protected(blocking=True):
with self.store_lock:
task, store = self.lookup_task(task_id, store)
task.update(**new_updates)

Expand Down Expand Up @@ -703,7 +704,7 @@ def resume_task(
if isinstance(task_id, TaskType):
assert task_id.id, "Cannot resume decorator style Task"
task_id = task_id.id
with self.store_lock.protected(blocking=True):
with self.store_lock:
task, store = self.lookup_task(task_id, store)
now = datetime.now(self.timezone)
next_run_time = task.trigger.get_next_trigger_time(self.timezone, None, now) # type: ignore
Expand All @@ -725,7 +726,7 @@ def get_tasks(self, store: Optional[str] = None) -> list[TaskType]:
Args:
store: alias of the task store.
"""
with self.store_lock.protected(blocking=True):
with self.store_lock:
tasks = []
if self.state == SchedulerState.STATE_STOPPED:
for task, _, _ in self.pending_tasks:
Expand All @@ -746,7 +747,7 @@ def get_task(self, task_id: str, store: Optional[str] = None) -> Union[TaskType,
task_id: The identifier of the task.
store: Alias of the task store that most likely contains the task.
"""
with self.store_lock.protected(blocking=True):
with self.store_lock:
try:
return self.lookup_task(task_id, store)[0]
except TaskLookupError:
Expand All @@ -768,7 +769,7 @@ def delete_task(
return
store_alias = None

with self.store_lock.protected(blocking=True):
with self.store_lock:
if self.state == SchedulerState.STATE_STOPPED:
for index, (task, _, _) in enumerate(self.pending_tasks):
if task.id == task_id and store in (None, task.store_alias):
Expand Down Expand Up @@ -797,7 +798,7 @@ def remove_all_tasks(self, store: Optional[str]) -> None:
"""
Removes all tasks from the specified task store, or all task stores if none is given.
"""
with self.store_lock.protected(blocking=True):
with self.store_lock:
if self.state == SchedulerState.STATE_STOPPED:
if store:
self.pending_tasks = [
Expand Down Expand Up @@ -1096,7 +1097,7 @@ def create_store_lock(self) -> LockProtectedProtocol:
"""
Creates a reentrant lock object.
"""
return RLockProtected()
return ReadProtected()

def _process_tasks_of_store(
self,
Expand Down Expand Up @@ -1200,13 +1201,15 @@ def process_tasks(self) -> Optional[float]:
next_wakeup_time: Optional[datetime] = None
events = []

# threading lock
with self.store_lock.protected(blocking=False) as blocking_success:
# check for other processing thread
with self.store_processing_lock.protected(blocking=False) as blocking_success:
if blocking_success:
for store_alias, store in self.stores.items():
next_wakeup_time = self._process_tasks_of_store(
now, next_wakeup_time, store_alias, store, events
)
# threading lock
with self.store_lock:
for store_alias, store in self.stores.items():
next_wakeup_time = self._process_tasks_of_store(
now, next_wakeup_time, store_alias, store, events
)
else:
retry_wakeup_time = now + timedelta(seconds=self.store_retry_interval)
if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
Expand Down

0 comments on commit 9590e64

Please sign in to comment.