diff --git a/atxm/_task.py b/atxm/_task.py deleted file mode 100644 index e2b79b5..0000000 --- a/atxm/_task.py +++ /dev/null @@ -1,47 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any - -from twisted.internet import reactor -from twisted.internet.task import LoopingCall -from twisted.logger import Logger - - -class SimpleTask(ABC): - """Simple Twisted Looping Call abstract base class.""" - - INTERVAL = NotImplemented - CLOCK = reactor - - def __init__(self, interval: float = INTERVAL): - self.interval = interval - self.log = Logger(self.__class__.__name__) - self._task = LoopingCall(self.run) - self._task.clock = self.CLOCK - self._task.interval = self.interval - - @property - def running(self) -> bool: - """Determine whether the task is already running.""" - return self._task.running - - def start(self, now: bool = False): - """Start task.""" - if not self.running: - d = self._task.start(interval=self.interval, now=now) - d.addErrback(self.handle_errors) - return d - - def stop(self): - """Stop task.""" - if self.running: - self._task.stop() - - @abstractmethod - def run(self): - """Task method that should be periodically run.""" - raise NotImplementedError - - @abstractmethod - def handle_errors(self, *args, **kwargs) -> Any: - """Error callback for error handling during execution.""" - raise NotImplementedError diff --git a/atxm/machine.py b/atxm/machine.py index 185bced..ecb731e 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -1,9 +1,11 @@ -import time from copy import deepcopy +from logging import Logger from typing import Optional, Union, List, Type from eth_account.signers.local import LocalAccount +from twisted.internet import reactor from twisted.internet.defer import Deferred +from twisted.internet.task import LoopingCall from web3 import Web3 from web3.exceptions import TransactionNotFound from web3.types import TxReceipt, TxParams @@ -26,37 +28,36 @@ FutureTx, PendingTx, TxHash, - _make_tx_params, ) from atxm.utils import ( _get_average_blocktime, _get_receipt, _handle_rpc_error, fire_hook, + _make_tx_params, ) -from ._task import SimpleTask from .logging import log -class _Machine(SimpleTask): - """Do not import this - use the public `AutomaticTxMachine` instead.""" +class _Machine: + """ + Do not import this - it will not work. + Please import the publicly exposed `AutomaticTxMachine` instead. + Thanks! :) + """ + + # internal + __CLOCK = reactor # twisted reactor # tweaks _TRACKING_CONFIRMATIONS = 300 # blocks until clearing a finalized transaction _RPC_THROTTLE = 1 # min. seconds between RPC calls (>1 recommended) - _MIN_INTERVAL = ( - 1 # absolute min. async loop interval (edge case of low block count) - ) - - # idle (slower) - INTERVAL = 60 * 5 # seconds - IDLE_INTERVAL = INTERVAL # renames above constant + _MIN_INTERVAL = 1 + _IDLE_INTERVAL = 60 * 5 # seconds + _BLOCK_INTERVAL = 20 # ~20 blocks + _BLOCK_SAMPLE_SIZE = 10_000 # blocks - # work (faster) - BLOCK_INTERVAL = 20 # ~20 blocks - BLOCK_SAMPLE_SIZE = 10_000 # blocks - - __DEFAULT_STRATEGIES: List[Type[AsyncTxStrategy]] = [ + STRATEGIES: List[Type[AsyncTxStrategy]] = [ InsufficientFundsPause, TimeoutPause, FixedRateSpeedUp, @@ -71,48 +72,134 @@ def __init__( # public self.w3 = w3 self.signers = {} - self.strategies = [s(w3) for s in self.__DEFAULT_STRATEGIES] + self.log = Logger(self.__class__.__name__) + self.strategies = [s(w3) for s in self.STRATEGIES] if strategies: self.strategies.extend(list(strategies)) - # internal - self._state = _State(disk_cache=disk_cache) + # state self.__pause = False - super().__init__(interval=self.INTERVAL) + self._state = _State(disk_cache=disk_cache) + + # async + self._task = LoopingCall(self._cycle) + self._task.clock = self.__CLOCK + self._task.interval = self._IDLE_INTERVAL + + @property + def _clock(self): + return self.__CLOCK + + @property + def _interval(self) -> Optional[float]: + return self._task.interval + + @property + def _start_time(self) -> float: + return self._task.starttime + + @property + def _busy(self) -> bool: + """Returns True if the machine is busy.""" + return bool(self._state.queue or self._state.pending) + + # + # Async + # + + def _handle_errors(self, *args, **kwargs): + """Handles unexpected errors during task processing.""" + self._state.commit() + self.log.warn( + "[recovery] error during transaction: {}".format(args[0].getTraceback()) + ) + if self._task.running: + return + self.log.warn("[recovery] restarting transaction machine!") + self._start(now=False) + + def _cycle(self) -> None: + """Execute one cycle""" + + if self.__pause: + self.log.warn("[pause] paused") + return + + self.__monitor_finalized() + if not self._busy: + self.log.info(f"[idle] cycle interval is {self._task.interval} seconds") + return + + # sample block time and adjust the interval + # every work cycle. + self.__work_mode() + self.log.info( + f"[working] tracking {len(self._state.queue)} queued " + f"transaction{'s' if len(self._state.queue) > 1 else ''} " + f"{'and 1 pending transaction' if self._state.pending else ''}" + ) + + if self._state.pending: + # There is an active transaction + self.__handle_active_transaction() + + elif self._state.queue: + # There is no active transaction + # and there are queued transactions + self.__broadcast() + + # After one work cycle, return to idle mode + # if the machine is not busy. + if not self._busy: + self.__idle_mode() # # Throttle # + def _start(self, now: bool = False) -> Deferred: + if self._task.running: + return self._task.deferred + when = "now" if now else f"in {self._task.interval} seconds" + self.log.info(f"[atxm] starting async transaction machine {when}") + d = self._task.start(interval=self._interval, now=now) + d.addErrback(self._handle_errors) + return d + + def _stop(self): + """Stop task.""" + if self._task.running: + self._task.stop() + def __idle_mode(self) -> None: """Return to idle mode (slow down)""" - self._task.interval = self.IDLE_INTERVAL + self._task.interval = self._IDLE_INTERVAL self.log.info( f"[done] returning to idle mode with " f"{self._task.interval} second interval" ) - if not self.busy: + if not self._busy: self._sleep() def __work_mode(self) -> None: """Start work mode (speed up)""" average_block_time = _get_average_blocktime( - w3=self.w3, sample_size=self.BLOCK_SAMPLE_SIZE + w3=self.w3, sample_size=self._BLOCK_SAMPLE_SIZE ) self._task.interval = max( - round(average_block_time * self.BLOCK_INTERVAL), self._MIN_INTERVAL + round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL ) self.log.info(f"[working] cycle interval is {self._task.interval} seconds") def _wake(self) -> None: if not self._task.running: log.info("[wake] waking up") - self.start(now=True) + self._start(now=True) def _sleep(self) -> None: if self._task.running: log.info("[sleep] sleeping") - self.stop() + self._stop() # # Lifecycle @@ -344,71 +431,3 @@ def __monitor_finalized(self) -> None: self.log.info( f"[monitor] transaction {txhash.hex()} has {confirmations} confirmations" ) - - # - # Async - # - - def handle_errors(self, *args, **kwargs) -> None: - """Handles unexpected errors during task processing.""" - self.log.warn( - "[recovery] error during transaction: {}".format(args[0].getTraceback()) - ) - self._state.commit() - time.sleep(self._RPC_THROTTLE) - if not self._task.running: - self.log.warn("[recovery] restarting transaction machine!") - self.start(now=False) # take a breather - - def run(self) -> None: - """Execute one cycle""" - - if self.__pause: - self.log.warn("[pause] paused") - return - - self.__monitor_finalized() - if not self.busy: - self.log.info(f"[idle] cycle interval is {self._task.interval} seconds") - return - - # sample block time and adjust the interval - # every work cycle. - self.__work_mode() - self.log.info( - f"[working] tracking {len(self._state.queue)} queued " - f"transaction{'s' if len(self._state.queue) > 1 else ''} " - f"{'and 1 pending transaction' if self._state.pending else ''}" - ) - - if self._state.pending: - # There is an active transaction - self.__handle_active_transaction() - - elif self._state.queue: - # There is no active transaction - # and there are queued transactions - self.__broadcast() - - # After one work cycle, return to idle mode - # if the machine is not busy. - if not self.busy: - self.__idle_mode() - - @property - def busy(self) -> bool: - """Returns True if the machine is busy.""" - if self._state.pending: - return True - if len(self._state.queue) > 0: - return True - return False - - def start(self, now: bool = True) -> Deferred: - if now: - self.log.info("[atxm] starting async transaction machine now") - else: - self.log.info( - f"[atxm] starting async transaction machine in {self.INTERVAL} seconds" - ) - return super().start(now=now) diff --git a/atxm/main.py b/atxm/main.py index f16560f..297e19f 100644 --- a/atxm/main.py +++ b/atxm/main.py @@ -1,3 +1,4 @@ +from copy import copy from typing import List, Set from eth_account.signers.local import LocalAccount @@ -13,6 +14,29 @@ class AutomaticTxMachine(_Machine): + def start(self, now: bool = False) -> None: + """Start the machine. if now is True, start immediately.""" + super()._start(now=now) + + def stop(self) -> None: + """Stop the machine.""" + super()._stop() + + @property + def running(self) -> bool: + """Return True if the machine is running.""" + return bool(self._task.running) + + @property + def paused(self) -> bool: + """Return True if the machine is paused.""" + return bool(self.__pause) + + @property + def busy(self) -> bool: + """Returns True if the machine is busy.""" + return super()._busy + @property def queued(self) -> List[FutureTx]: """Return a list of queued transactions.""" @@ -21,12 +45,12 @@ def queued(self) -> List[FutureTx]: @property def pending(self) -> PendingTx: """Return the active transaction if there is one.""" - return self._state.pending or None + return copy(self._state.pending or None) @property def finalized(self) -> Set[FinalizedTx]: """Return a set of finalized transactions.""" - return self._state.finalized + return set(self._state.finalized) @property def faults(self) -> List[AsyncTx]: diff --git a/atxm/tx.py b/atxm/tx.py index 70eb686..cd6a109 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -185,32 +185,3 @@ def _deserialize_tx_receipt(receipt: Dict) -> TxReceipt: "status": receipt["status"], } ) - - -def _make_tx_params(data: TxData) -> TxParams: - """ - TxData -> TxParams: Creates a transaction parameters - object from a transaction data object for broadcast. - - This operation is performed in order to "turnaround" the transaction - data object as queried from the RPC provider (eth_getTransaction) into a transaction - parameters object for strategics and re-broadcast (LocalAccount.sign_transaction). - """ - params = TxParams( - { - "nonce": data["nonce"], - "chainId": data["chainId"], - "gas": data["gas"], - "to": data["to"], - "value": data["value"], - "data": data.get("data", b""), - } - ) - if "gasPrice" in data: - params["type"] = "0x01" - params["gasPrice"] = data["gasPrice"] - elif "maxFeePerGas" in data: - params["type"] = "0x02" - params["maxFeePerGas"] = data["maxFeePerGas"] - params["maxPriorityFeePerGas"] = data["maxPriorityFeePerGas"] - return params diff --git a/atxm/utils.py b/atxm/utils.py index 4a63e13..a73a996 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -4,8 +4,8 @@ from twisted.internet import reactor from web3 import Web3 from web3.exceptions import TransactionNotFound -from web3.types import PendingTx as PendingTxData -from web3.types import RPCError, TxData, TxReceipt, Wei +from web3.types import PendingTx as PendingTxData, TxData, TxParams +from web3.types import RPCError, TxReceipt, Wei from atxm.exceptions import ( InsufficientFunds, @@ -74,6 +74,35 @@ def _handle_rpc_error(e: Exception, tx: FutureTx) -> None: if error["code"] == -32000: if "insufficient funds" in error["message"]: raise InsufficientFunds - hook = tx.on_error + hook = tx.on_fault if hook: fire_hook(hook=hook, tx=tx, error=e) + + +def _make_tx_params(data: TxData) -> TxParams: + """ + TxData -> TxParams: Creates a transaction parameters + object from a transaction data object for broadcast. + + This operation is performed in order to "turnaround" the transaction + data object as queried from the RPC provider (eth_getTransaction) into a transaction + parameters object for strategics and re-broadcast (LocalAccount.sign_transaction). + """ + params = TxParams( + { + "nonce": data["nonce"], + "chainId": data["chainId"], + "gas": data["gas"], + "to": data["to"], + "value": data["value"], + "data": data.get("data", b""), + } + ) + if "gasPrice" in data: + params["type"] = "0x01" + params["gasPrice"] = data["gasPrice"] + elif "maxFeePerGas" in data: + params["type"] = "0x02" + params["maxFeePerGas"] = data["maxFeePerGas"] + params["maxPriorityFeePerGas"] = data["maxPriorityFeePerGas"] + return params