diff --git a/src/tapper/controller/sleep_processor.py b/src/tapper/controller/sleep_processor.py new file mode 100644 index 0000000..2009fc0 --- /dev/null +++ b/src/tapper/controller/sleep_processor.py @@ -0,0 +1,53 @@ +import time +from typing import Callable + +from attr import dataclass +from tapper.parser import common + + +class StopTapperActionException(Exception): + """Normal way to interrupt tapper's action. Will not cause error logs etc.""" + + +@dataclass +class SleepCommandProcessor: + """Implementation of tapper's "sleep" command.""" + + check_interval: float + """How often the check is made for whether to continue sleeping.""" + kill_check_fn: Callable[[], bool] + """Checks whether action should be killed.""" + _actual_sleep_fn: Callable[[float], None] = time.sleep + + def __post_init__(self) -> None: + if self.check_interval <= 0: + raise ValueError( + "SleepCommandProcessor check_interval must be greater than 0." + ) + + def sleep(self, length_of_time: float | str) -> None: + """ + Suspend execution for a length of time. + This is functionally identical to `time.sleep`, but tapper + has control needed to interrupt or pause this. + + :param length_of_time: Either number (seconds), + or str seconds/millis like: "1s", "50ms". + """ + self.kill_if_required() + started_at = time.perf_counter() + time_s = ( + common.parse_sleep_time(length_of_time) + if isinstance(length_of_time, str) + else length_of_time + ) + if time_s is None or time_s < 0: + raise ValueError(f"sleep {length_of_time} is invalid") + finish_time = started_at + time_s + while (now := time.perf_counter()) < finish_time: + self._actual_sleep_fn(min(self.check_interval, finish_time - now)) + self.kill_if_required() + + def kill_if_required(self) -> None: + if self.kill_check_fn(): + raise StopTapperActionException diff --git a/src/tapper/parser/common.py b/src/tapper/parser/common.py index a63fae1..1e09fef 100644 --- a/src/tapper/parser/common.py +++ b/src/tapper/parser/common.py @@ -44,6 +44,15 @@ def parse_xy(coord_str: str) -> tuple[tuple[int, int], bool]: fn = parse_xy +def parse_sleep_time(prop: str) -> float | None: + """If doesn't match sleep pattern, will return None.""" + if SECONDS.regex.fullmatch(prop): + return SECONDS.fn(prop) + if MILLIS.regex.fullmatch(prop): + return MILLIS.fn(prop) + return None + + def split(text: str, delimiter: str) -> list[str]: """Split str, with minimum length of tokens same as delimiter. diff --git a/src/tapper/parser/send_parser.py b/src/tapper/parser/send_parser.py index 2c1cc14..19f630a 100644 --- a/src/tapper/parser/send_parser.py +++ b/src/tapper/parser/send_parser.py @@ -117,10 +117,8 @@ def resolve_chain_opening_with_props( def sleep_prop(prop: str) -> Optional[SleepInstruction]: - if common.SECONDS.regex.fullmatch(prop): - return SleepInstruction(common.SECONDS.fn(prop)) - if common.MILLIS.regex.fullmatch(prop): - return SleepInstruction(common.MILLIS.fn(prop)) + if (parsed := common.parse_sleep_time(prop)) is not None: + return SleepInstruction(parsed) return None diff --git a/tests/tapper/controller/test_sleep_processor.py b/tests/tapper/controller/test_sleep_processor.py new file mode 100644 index 0000000..208be43 --- /dev/null +++ b/tests/tapper/controller/test_sleep_processor.py @@ -0,0 +1,114 @@ +import time +from dataclasses import dataclass +from dataclasses import field +from typing import Callable + +import pytest +from tapper.controller.sleep_processor import SleepCommandProcessor +from tapper.controller.sleep_processor import StopTapperActionException + + +@dataclass +class Counter: + count: int = 0 + ticks: list[float] = field(default_factory=list) + result: bool = False + at_3: Callable[[], bool] | None = None + + def tick(self) -> bool: + self.count += 1 + if self.count == 3 and self.at_3 is not None: + return self.at_3() + return self.result + + +class TestSleepProcessor: + def test_simplest(self) -> None: + processor = SleepCommandProcessor( + check_interval=1, + kill_check_fn=lambda: False, + ) + + time_start = time.perf_counter() + processor.sleep(0) + assert time_start == pytest.approx(time.perf_counter(), abs=0.01) + + def test_immediate_kill(self) -> None: + counter = Counter(result=True) + processor = SleepCommandProcessor(check_interval=1, kill_check_fn=counter.tick) + with pytest.raises(StopTapperActionException): + processor.sleep(20) + + def test_zero_time(self) -> None: + counter = Counter() + processor = SleepCommandProcessor( + check_interval=1, + kill_check_fn=counter.tick, + ) + processor.sleep(0) + assert counter.count == 1 + + def test_negative_time(self) -> None: + processor = SleepCommandProcessor( + check_interval=1, + kill_check_fn=lambda: False, + ) + with pytest.raises(ValueError): + processor.sleep(-1) + + def test_correct_time_and_number_of_checks(self) -> None: + counter = Counter() + processor = SleepCommandProcessor( + check_interval=0.05, + kill_check_fn=counter.tick, + ) + + time_start = time.perf_counter() + processor.sleep(0.1) + assert counter.count == 3 # 1 check at the start and 2 intervals + assert time_start + 0.1 == pytest.approx(time.perf_counter(), abs=0.01) + + def test_check_interval_bigger_than_sleep_time(self) -> None: + counter = Counter() + processor = SleepCommandProcessor( + check_interval=1, + kill_check_fn=counter.tick, + ) + + time_start = time.perf_counter() + processor.sleep(0.02) + assert counter.count == 2 + assert time_start + 0.02 == pytest.approx(time.perf_counter(), abs=0.01) + + def test_time_str_seconds(self) -> None: + processor = SleepCommandProcessor( + check_interval=0.01, + kill_check_fn=lambda: False, + ) + + time_start = time.perf_counter() + processor.sleep("0.02s") + assert time_start + 0.02 == pytest.approx(time.perf_counter(), abs=0.01) + + def test_time_str_millis(self) -> None: + processor = SleepCommandProcessor( + check_interval=0.01, + kill_check_fn=lambda: False, + ) + + time_start = time.perf_counter() + processor.sleep("20ms") + assert time_start + 0.02 == pytest.approx(time.perf_counter(), abs=0.01) + + def test_killed_after_3_interval(self) -> None: + counter = Counter(at_3=lambda: True) + processor = SleepCommandProcessor( + check_interval=0.01, + kill_check_fn=counter.tick, + ) + + time_start = time.perf_counter() + with pytest.raises(StopTapperActionException): + processor.sleep(1) + assert counter.count == 3 # 1 initial and 2 sleeps + assert time_start + 0.02 == pytest.approx(time.perf_counter(), abs=0.01)