-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
fastapi_utils_tasks.py
80 lines (68 loc) · 3.39 KB
/
fastapi_utils_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import asyncio
import logging
from asyncio import ensure_future
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Optional, Union
from starlette.concurrency import run_in_threadpool
# A plain (synchronous) callable that takes no arguments and returns nothing.
NoArgsNoReturnFuncT = Callable[[], None]

# An ``async def`` style callable: takes no arguments and produces a coroutine
# whose result is ``None``.
NoArgsNoReturnAsyncFuncT = Callable[
    [],
    Coroutine[Any, Any, None],
]

# A decorator that accepts either flavour of callable above and always hands
# back the async flavour.
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT,
]
def repeat_every(
    *,
    seconds: float,
    wait_first: bool = False,
    logger: Optional[logging.Logger] = None,
    raise_exceptions: bool = False,
    max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """
    This function returns a decorator that modifies a function so it is periodically re-executed after its first call.

    The function it decorates should accept no arguments and return nothing. If necessary, this can be accomplished
    by using `functools.partial` or otherwise wrapping the target function prior to decoration.

    Calling (and awaiting) the decorated function schedules the repeating loop as a background task on the running
    event loop and returns immediately; it does not wait for any repetition to finish. Coroutine functions are
    awaited directly, while regular functions are executed via `run_in_threadpool`.

    Parameters
    ----------
    seconds: float
        The number of seconds to wait between repeated calls
    wait_first: bool (default False)
        If True, the function will wait for a single period before the first call
    logger: Optional[logging.Logger] (default None)
        The logger to use to log any exceptions raised by calls to the decorated function.
        If not provided, exceptions will not be logged by this function (though they may be handled by the event loop).
    raise_exceptions: bool (default False)
        If True, errors raised by the decorated function will be raised to the event loop's exception handler.
        Note that if an error is raised, the repeated execution will stop.
        Otherwise, exceptions are just logged and the execution continues to repeat.
        See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_exception_handler for more info.
    max_repetitions: Optional[int] (default None)
        The maximum number of times to call the repeated function. If `None`, the function is repeated forever.
        Every call counts towards this limit, including calls that raise an exception.
    """

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        """
        Converts the decorated function into a repeated, periodically-called version of itself.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)

        # Strong references to the loop tasks that are still running. The event loop itself only keeps
        # weak references to tasks, so a task nobody else references may be garbage-collected mid-execution.
        background_tasks: "set[asyncio.Future[None]]" = set()

        @wraps(func)
        async def wrapped() -> None:
            async def loop() -> None:
                if wait_first:
                    await asyncio.sleep(seconds)

                repetitions = 0
                while max_repetitions is None or repetitions < max_repetitions:
                    # Count the attempt before making it, so that a call which raises still counts
                    # towards `max_repetitions`; otherwise an always-failing function would repeat forever.
                    repetitions += 1
                    try:
                        if is_coroutine:
                            await func()  # type: ignore
                        else:
                            await run_in_threadpool(func)
                    except Exception as exc:
                        if logger is not None:
                            formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
                            logger.error(formatted_exception)
                        if raise_exceptions:
                            # Propagating out of the task ends the loop; the error is then reported
                            # through the event loop's exception handler.
                            raise
                    await asyncio.sleep(seconds)

            task = ensure_future(loop())
            # Hold the task until it finishes, then drop the reference so finished tasks do not accumulate.
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        return wrapped

    return decorator