From f8025f817593c5e8fffed99c53ac18ead19246df Mon Sep 17 00:00:00 2001 From: KPrasch Date: Wed, 7 Feb 2024 11:20:19 +0100 Subject: [PATCH] introduce strategy chains --- atxm/_task.py | 1 + atxm/exceptions.py | 44 +++++++- atxm/machine.py | 244 ++++++++++++++++-------------------------- atxm/main.py | 6 +- atxm/state.py | 31 +++--- atxm/strategies.py | 128 ++++++++++++++++++---- atxm/tx.py | 37 ++----- pyproject.toml | 3 +- tests/conftest.py | 5 + tests/test_machine.py | 13 ++- 10 files changed, 289 insertions(+), 223 deletions(-) diff --git a/atxm/_task.py b/atxm/_task.py index 0875238..e2b79b5 100644 --- a/atxm/_task.py +++ b/atxm/_task.py @@ -17,6 +17,7 @@ def __init__(self, interval: float = INTERVAL): self.log = Logger(self.__class__.__name__) self._task = LoopingCall(self.run) self._task.clock = self.CLOCK + self._task.interval = self.interval @property def running(self) -> bool: diff --git a/atxm/exceptions.py b/atxm/exceptions.py index 9d98e51..a9dcae7 100644 --- a/atxm/exceptions.py +++ b/atxm/exceptions.py @@ -1,6 +1,31 @@ +from enum import Enum + from web3.types import RPCError +class Faults(Enum): + """ + Fault codes for transaction processing. + These are alternate states that a transaction can enter + other than "finalized". + """ + + # Strategy has been running for too long + TIMEOUT = "timeout" + + # Transaction has been capped and subsequently timed out + PAUSE = "pause" + + # Transaction reverted + REVERT = "revert" + + # Something went wrong + ERROR = "error" + + # ... + INSUFFICIENT_FUNDS = "insufficient_funds" + + class TransactionFinalized(Exception): """raised when a transaction has been included in a block""" @@ -9,12 +34,29 @@ class InsufficientFunds(RPCError): """raised when a transaction exceeds the spending cap""" -class Halt(Exception): +class Wait(Exception): """ Raised when a strategy exceeds a limitation. Used to mark a pending transaction as "wait, don't retry". """ +class Fault(Exception): + """raised when a transaction has been faulted""" + + def __init__( + self, + tx: "AsyncTx", + fault: Faults, + clear: bool, + message: str + ): + self.tx = tx + self.fault = fault + self.message = message + self.clear = clear + super().__init__(message) + + class TransactionReverted(Exception): """raised when a transaction has been reverted""" diff --git a/atxm/machine.py b/atxm/machine.py index c5a1a2f..12c826d 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -1,21 +1,23 @@ import time -from typing import Optional, Union +from copy import deepcopy +from typing import Optional, Union, List, Type from eth_account.signers.local import LocalAccount from twisted.internet.defer import Deferred from web3 import Web3 from web3.exceptions import TransactionNotFound -from web3.types import TxReceipt +from web3.types import TxReceipt, TxParams from atxm.exceptions import ( - Halt, - TransactionReverted, - InsufficientFunds, + Wait, + TransactionReverted, Fault, Faults, ) from atxm.state import _State from atxm.strategies import ( AsyncTxStrategy, - SpeedupStrategy, + InsufficientFundsPause, + TimeoutPause, + FixedRateSpeedUp, ) from atxm.tx import ( FinalizedTx, @@ -23,7 +25,6 @@ PendingTx, TxHash, _make_tx_params, - Fault, ) from atxm.utils import ( _get_average_blocktime, @@ -31,16 +32,14 @@ _handle_rpc_error, fire_hook, ) -from .logging import log from ._task import SimpleTask +from .logging import log class _Machine(SimpleTask): """Do not import this - use the public `AutomaticTxMachine` instead.""" # tweaks - _STRATEGY = SpeedupStrategy - _TIMEOUT = 60 * 60 # 1 hour _TRACKING_CONFIRMATIONS = 300 # blocks until clearing a finalized transaction _RPC_THROTTLE = 1 # min. seconds between RPC calls (>1 recommended) _MIN_INTERVAL = ( @@ -55,18 +54,26 @@ class _Machine(SimpleTask): BLOCK_INTERVAL = 20 # ~20 blocks BLOCK_SAMPLE_SIZE = 10_000 # blocks + __DEFAULT_STRATEGIES: List[Type[AsyncTxStrategy]] = [ + InsufficientFundsPause, + TimeoutPause, + FixedRateSpeedUp, + ] + def __init__( self, w3: Web3, - timeout: Optional[int] = None, - strategy: Optional[AsyncTxStrategy] = None, + strategies: Optional[List[AsyncTxStrategy]] = None, disk_cache: bool = False, ): # public self.w3 = w3 self.signers = {} - self.timeout = timeout or self._TIMEOUT - self.strategy = strategy or self._STRATEGY(w3=w3) + self.strategies = [ + s(w3) for s in self.__DEFAULT_STRATEGIES + ] + if strategies: + self.strategies.extend(list(strategies)) # internal self._state = _State(disk_cache=disk_cache) @@ -111,45 +118,35 @@ def _sleep(self) -> None: # Lifecycle # - def __handle_active_tx(self) -> bool: + def __handle_active_transaction(self) -> bool: """ Handles the currently tracked pending transaction. - The 5 possible outcomes for the pending ("active") transaction in one cycle: + The 4 possible outcomes for the pending ("active") transaction in one cycle: - 0. paused - 1. timeout - 2. reverted + 1. paused + 2. reverted (fault) 3. finalized - 4. strategize: retry or wait - 5. insufficient funds + 4. strategize: retry, wait, or fault Returns True if the next queued transaction can be broadcasted right now. """ - # Outcome 0: the machine is paused + # Outcome 1: the pending transaction is paused by strategies if self.__pause: - sender = self._state.active._from - self.log.critical( - f"[pause] insufficient funds! {sender} has been paused by the {self.strategy.name} strategy" - ) - self._state.fault( - error=sender, fault=Fault.INSUFFICIENT_FUNDS, clear_active=False + self.log.warn( + f"[pause] transaction #{self._state.pending.id} is paused by strategies" ) return False - # Outcome 1: pending transaction has timed out - if self.__active_timed_out(): - return True - try: receipt = self.__get_receipt() # Outcome 2: the pending transaction was reverted (final error) except TransactionReverted: self._state.fault( - error=self._state.active.txhash.hex(), - fault=Fault.REVERT, + error=self._state.pending.txhash.hex(), + fault=Faults.REVERT, clear_active=True, ) return True @@ -157,28 +154,17 @@ def __handle_active_tx(self) -> bool: # Outcome 3: pending transaction is finalized (final success) if receipt: final_txhash = receipt["transactionHash"] - confirmations = self.__get_confirmations(tx=self._state.active) + confirmations = self.__get_confirmations(tx=self._state.pending) self.log.info( - f"[finalized] Transaction #atx-{self._state.active.id} has been finalized " + f"[finalized] Transaction #atx-{self._state.pending.id} has been finalized " f"with {confirmations} confirmations txhash: {final_txhash.hex()}" ) - # clear the active transaction slot self._state.finalize_active_tx(receipt=receipt) return True - # Outcome 5: retry the pending transaction with the strategy - try: - pending_tx = self.__strategize() - if pending_tx: - return False - - # Outcome 6: pending transaction has insufficient funds - except InsufficientFunds: - self.__pause = True - self.log.critical( - f"[fault] pending transaction {self._state.active.txhash.hex()} has insufficient funds!" - ) - return False + # Outcome 4: re-strategize the pending transaction + pending_tx = self.__strategize() + return pending_tx is not None # # Broadcast @@ -214,39 +200,49 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]: f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}" ) pending_tx = self._state.morph(tx=tx, txhash=txhash) - log.info(f"[state] #atx-{pending_tx.id} queued -> pending") if tx.on_broadcast: fire_hook(hook=tx.on_broadcast, tx=tx) return pending_tx def __strategize(self) -> Optional[PendingTx]: """Retry the currently tracked pending transaction with the configured strategy.""" - if self._state.active.halt: - return - try: - params = self.strategy.execute( - params=_make_tx_params(self._state.active.data), - ) - except Halt as e: - self._state.active.halt = True - self.log.warn( - f"[cap] pending transaction {self._state.active.txhash.hex()} has been capped: {e}" - ) - # It would be nice to re-queue the capped transaction however, - # if the pending tx has already reached the spending cap it cannot - # be sped up again (without increasing the cap). For now, we just - # let the transaction timeout and remove it, but this may cascade - # into a chain of capped transactions if the cap is not increased / is too low. - hook = self._state.active.on_halt - if hook: - fire_hook(hook=hook, tx=self._state.active) - return - - # use the strategy-generated transaction parameters - pending_tx = self.__fire(tx=params, msg=self.strategy.name) + if not self._state.pending: + raise RuntimeError("No active transaction to strategize") + + _active_copy = deepcopy(self._state.pending) + for strategy in self.strategies: + try: + params = strategy.execute(pending=_active_copy) + except Wait as e: + self.__pause = True + self.log.warn( + f"[pause] pending transaction {self._state.pending.txhash.hex()} has been paused: {e}" + ) + hook = self._state.pending.on_pause + if hook: + fire_hook(hook=hook, tx=self._state.pending) + return + except Fault as e: + self._state.fault( + error=self._state.pending.txhash.hex(), + fault=e.fault, + clear_active=e.clear, + ) + return + if params: + _active_copy.params.update(params) + if self.__pause: + self.log.info(f"[pause] pause lifted by strategy {strategy.name}") + self.__pause = False + + # (!) retry the transaction with the new parameters + retry_params = TxParams(_active_copy.params) + _names = ', '.join(s.name for s in self.strategies) + pending_tx = self.__fire(tx=retry_params, msg=_names) self.log.info( - f"[{self.strategy.name}] transaction #{pending_tx.id} has been re-broadcasted" + f"[retry] transaction #{pending_tx.id} has been re-broadcasted" ) + return pending_tx def __broadcast(self) -> Optional[TxHash]: @@ -254,7 +250,7 @@ def __broadcast(self) -> Optional[TxHash]: Attempts to broadcast the next `FutureTx` in the queue. If the broadcast is not successful, it is re-queued. """ - future_tx = self._state.pop() # popleft + future_tx = self._state._pop() # popleft future_tx.params = _make_tx_params(future_tx.params) signer = self.__get_signer(future_tx._from) nonce = self.w3.eth.get_transaction_count(signer.address, "latest") @@ -266,7 +262,7 @@ def __broadcast(self) -> Optional[TxHash]: future_tx.params["nonce"] = nonce pending_tx = self.__fire(tx=future_tx, msg="broadcast") if not pending_tx: - self._state.requeue(future_tx) + self._state._requeue(future_tx) return return pending_tx.txhash @@ -284,13 +280,12 @@ def __get_receipt(self) -> Optional[TxReceipt]: NOTE: Performs state changes """ try: - txdata = self.w3.eth.get_transaction(self._state.active.txhash) - self._state.active.data = txdata + txdata = self.w3.eth.get_transaction(self._state.pending.txhash) + self._state.pending.data = txdata except TransactionNotFound: self.log.error( - f"[error] Transaction {self._state.active.txhash.hex()} not found" + f"[error] Transaction {self._state.pending.txhash.hex()} not found" ) - self._state.clear_active() return receipt = _get_receipt(w3=self.w3, data=txdata) @@ -325,53 +320,6 @@ def __get_confirmations(self, tx: Union[PendingTx, FinalizedTx]) -> int: self.log.info(f"Transaction {txhash.hex()} is pending or unconfirmed") return 0 - def __timeout_fault(self) -> None: - if self._state.active.halt: - self.log.warn( - f"[timeout] Transaction {self._state.active.txhash.hex()} has been pending for more than" - f"{self.timeout} seconds and has been capped by the {self.strategy.name} strategy" - ) - self._state.fault( - error=self.strategy.name, fault=Fault.HALT, clear_active=True - ) - return - - self.log.warn( - f"[timeout] Transaction {self._state.active.txhash.hex()} has been pending for more than" - f"{self.timeout} seconds" - ) - self._state.fault(fault=Fault.TIMEOUT, clear_active=True) - - def __active_timed_out(self) -> bool: - """Returns True if the active transaction has timed out.""" - if not self._state.active: - return False - timeout = (time.time() - self._state.active.created) > self.timeout - if timeout: - self.__timeout_fault() - return True - - time_remaining = round( - self.timeout - (time.time() - self._state.active.created) - ) - minutes = round(time_remaining / 60) - remainder_seconds = time_remaining % 60 - end_time = time.time() + time_remaining - human_end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) - if time_remaining < (60 * 2): - self.log.warn( - f"[timeout] Transaction {self._state.active.txhash.hex()} will timeout in " - f"{minutes}m{remainder_seconds}s at {human_end_time}" - ) - else: - self.log.info( - f"[pending] {self._state.active.txhash.hex()} \n" - f"[pending] {round(time.time() - self._state.active.created)}s Elapsed | " - f"{minutes}m{remainder_seconds}s Remaining | " - f"Timeout at {human_end_time}" - ) - return False - def __monitor_finalized(self) -> None: """Follow-up on finalized transactions for a little while.""" if not self._state.finalized: @@ -408,46 +356,44 @@ def handle_errors(self, *args, **kwargs) -> None: def run(self) -> None: """Execute one cycle""" + if self.__pause: + self.log.warn(f"[pause] paused") + return + self.__monitor_finalized() if not self.busy: self.log.info(f"[idle] cycle interval is {self._task.interval} seconds") return + # sample block time and adjust the interval + # every work cycle. self.__work_mode() self.log.info( - f"[working] tracking {len(self._state.waiting)} queued " - f"transaction{'s' if len(self._state.waiting) > 1 else ''} " - f"{'and 1 pending transaction' if self._state.active else ''}" + f"[working] tracking {len(self._state.queue)} queued " + f"transaction{'s' if len(self._state.queue) > 1 else ''} " + f"{'and 1 pending transaction' if self._state.pending else ''}" ) - if self._state.active: - clear = self.__handle_active_tx() - if not clear: - # active tx still pending. wait for next cycle. - return + if self._state.pending: + # There is an active transaction + self.__handle_active_transaction() - if self.fire: + elif self._state.queue: + # There is no active transaction + # and there are queued transactions self.__broadcast() + # After one work cycle, return to idle mode + # if the machine is not busy. if not self.busy: self.__idle_mode() - @property - def fire(self) -> bool: - """ - Returns True if the next queued transaction can be broadcasted right now. - - - Qualification: There is no active transaction and - there are transactions waiting in the queue. - """ - return self._state.waiting and not self._state.active - @property def busy(self) -> bool: """Returns True if the machine is busy.""" - if self._state.active: + if self._state.pending: return True - if len(self._state.waiting) > 0: + if len(self._state.queue) > 0: return True return False diff --git a/atxm/main.py b/atxm/main.py index 5c78ca6..f16560f 100644 --- a/atxm/main.py +++ b/atxm/main.py @@ -16,12 +16,12 @@ class AutomaticTxMachine(_Machine): @property def queued(self) -> List[FutureTx]: """Return a list of queued transactions.""" - return list(self._state.waiting) + return list(self._state.queue) @property def pending(self) -> PendingTx: """Return the active transaction if there is one.""" - return self._state.active or None + return self._state.pending or None @property def finalized(self) -> Set[FinalizedTx]: @@ -43,7 +43,7 @@ def queue_transaction( """ if signer.address not in self.signers: self.signers[signer.address] = signer - tx = self._state.queue(_from=signer.address, params=params, *args, **kwargs) + tx = self._state._queue(_from=signer.address, params=params, *args, **kwargs) if not self._task.running: self._wake() return tx diff --git a/atxm/state.py b/atxm/state.py index f4eaa89..85cf654 100644 --- a/atxm/state.py +++ b/atxm/state.py @@ -1,24 +1,25 @@ import json import time from collections import deque +from copy import deepcopy, copy from json import JSONDecodeError from pathlib import Path -from typing import Callable, Deque, Dict, Optional, Set +from typing import Callable, Deque, Dict, Optional, Set, Tuple from eth_typing import ChecksumAddress from web3.types import TxParams, TxReceipt +from atxm.exceptions import Faults +from atxm.logging import log from atxm.tx import ( FinalizedTx, FutureTx, PendingTx, TxHash, AsyncTx, - Fault, FaultyTx, ) from atxm.utils import fire_hook -from atxm.logging import log class _State: @@ -118,7 +119,7 @@ def morph(self, tx: FutureTx, txhash: TxHash) -> PendingTx: def fault( self, - fault: Fault, + fault: Faults, clear_active: bool, error: Optional[str] = None, ) -> None: @@ -164,25 +165,25 @@ def clear_active(self) -> None: self.commit() log.debug( f"[state] cleared 1 pending transaction \n" - f"[state] {len(self.waiting)} queued " - f"transaction{'s' if len(self.waiting) != 1 else ''} remaining" + f"[state] {len(self.queue)} queued " + f"transaction{'s' if len(self.queue) != 1 else ''} remaining" ) @property - def active(self) -> Optional[PendingTx]: + def pending(self) -> Optional[PendingTx]: """Return the active pending transaction if there is one.""" - return self.__active + return copy(self.__active) @property - def waiting(self) -> Deque[FutureTx]: + def queue(self) -> Tuple[FutureTx, ...]: """Return the queue of transactions.""" - return self.__queue + return tuple(self.__queue) - def pop(self) -> FutureTx: + def _pop(self) -> FutureTx: """Pop the next transaction from the queue.""" return self.__queue.popleft() - def requeue(self, tx: FutureTx) -> None: + def _requeue(self, tx: FutureTx) -> None: """Re-queue a transaction for broadcast and subsequent tracking.""" self.__queue.append(tx) self.commit() @@ -191,14 +192,14 @@ def requeue(self, tx: FutureTx) -> None: f"priority {len(self.__queue)}" ) - def queue( + def _queue( self, params: TxParams, _from: ChecksumAddress, info: Dict[str, str] = None, on_broadcast: Optional[Callable] = None, on_finalized: Optional[Callable] = None, - on_halt: Optional[Callable] = None, + on_pause: Optional[Callable] = None, on_fault: Optional[Callable] = None, ) -> FutureTx: """Queue a new transaction for broadcast and subsequent tracking.""" @@ -212,7 +213,7 @@ def queue( # configure hooks tx.on_broadcast = on_broadcast tx.on_finalized = on_finalized - tx.on_halt = on_halt + tx.on_pause = on_pause tx.on_fault = on_fault self.__queue.append(tx) diff --git a/atxm/strategies.py b/atxm/strategies.py index baed1df..06a37e4 100644 --- a/atxm/strategies.py +++ b/atxm/strategies.py @@ -1,16 +1,17 @@ +import time from abc import ABC -from typing import Tuple +from typing import Tuple, Optional from web3 import Web3 -from web3.types import Gwei, TxParams, Wei +from web3.types import Gwei, TxParams, Wei, PendingTx from atxm.exceptions import ( - Halt, + Wait, Fault, Faults, ) +from atxm.logging import log from atxm.utils import ( _log_gas_weather, ) -from atxm.logging import log class AsyncTxStrategy(ABC): @@ -20,28 +21,118 @@ class AsyncTxStrategy(ABC): def __init__(self, w3: Web3): self.w3 = w3 + self.log = log @property def name(self) -> str: """Used to identify the strategy in logs.""" return self._NAME - def execute(self, params: TxParams) -> TxParams: + def execute(self, pending: PendingTx) -> TxParams: """ Execute the strategy. - Will be called by the transaction machine when a - transaction is ready to be retried. Accepts a TxParams - dictionary containing transaction data from the most - recent previous attempt. + Called by the transaction machine when a + transaction is ready to be strategized. Accepts a PendingTx + object with data from the most recent previous attempt + (like tx.txhash, tx.params, tx.created, etc). + + This method must do one of the following: + - Raise `Wait` to pause retries and wait around for a bit. + - Raise `Fault`to signal the transaction cannot be retried. + - Returns a TxParams dictionary to use in the next attempt. + + NOTE: Do not mutate the input `tx` object. Return a new TxParams + dictionary with the updated transaction parameters. The input + object is already deeply copied to avoid accidental mutation, + but it's best to be mindful of this. + + CAUTION: please be mindful that the purpose of this middleware + is to mutate transaction parameters and not to broadcast + transactions. Broadcasting transactions is handled by the + transaction machine and not by the strategies. + + WARNING: The parameters returned by this method will be + signed by a hot wallet and broadcast to the network immediately. + Please be mindful of the security implications of the + parameters you return. - Must returns a new TxParams dictionary to use for the - next attempt. """ raise NotImplementedError -class SpeedupStrategy(AsyncTxStrategy): +class InsufficientFundsPause(AsyncTxStrategy): + """Pause strategy for pending transactions.""" + + _NAME = "insufficient-funds" + + def execute(self, pending: PendingTx) -> TxParams: + balance = self.w3.eth.get_balance(pending._from) + if balance == 0: + self.log.warn(f"Insufficient funds for transaction #{pending.params['nonce']}") + raise Fault( + tx=pending, + fault=Faults.INSUFFICIENT_FUNDS, + message="Insufficient funds", + clear=False, + ) + # log.warn(f"Insufficient funds for transaction #{pending.params['nonce']}") + # raise Wait("Insufficient funds") + return pending.params + + +class TimeoutPause(AsyncTxStrategy): + """Pause strategy for pending transactions.""" + + _NAME = "timeout" + _TIMEOUT = 60 * 60 # 1 hour in seconds + + def __init__(self, w3: Web3, timeout: Optional[int] = None): + super().__init__(w3) + self.timeout = timeout or self._TIMEOUT + + def __active_timed_out(self, pending: PendingTx) -> bool: + """Returns True if the active transaction has timed out.""" + if not pending: + return False + timeout = (time.time() - pending.created) > self.timeout + if timeout: + return True + + time_remaining = round( + self.timeout - (time.time() - pending.created) + ) + minutes = round(time_remaining / 60) + remainder_seconds = time_remaining % 60 + end_time = time.time() + time_remaining + human_end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) + if time_remaining < (60 * 2): + self.log.warn( + f"[timeout] Transaction {pending.txhash.hex()} will timeout in " + f"{minutes}m{remainder_seconds}s at {human_end_time}" + ) + else: + self.log.info( + f"[pending] {pending.txhash.hex()} \n" + f"[pending] {round(time.time() - pending.created)}s Elapsed | " + f"{minutes}m{remainder_seconds}s Remaining | " + f"Timeout at {human_end_time}" + ) + return False + + def execute(self, pending: PendingTx) -> TxParams: + timeout = self.__active_timed_out(pending) + if timeout: + raise Fault( + tx=pending, + fault=Faults.TIMEOUT, + message="Transaction has timed out", + clear=True, # signal to clear the active transaction + ) + return pending.params + + +class FixedRateSpeedUp(AsyncTxStrategy): """Speedup strategy for pending transactions.""" SPEEDUP_FACTOR = 1.125 # 12.5% increase @@ -49,29 +140,30 @@ class SpeedupStrategy(AsyncTxStrategy): _NAME = f"speedup-{SPEEDUP_FACTOR}%" - def _calculate_speedup_fee(self, tx: TxParams) -> Tuple[Wei, Wei]: + def _calculate_speedup_fee(self, pending: TxParams) -> Tuple[Wei, Wei]: base_fee = self.w3.eth.get_block("latest")["baseFeePerGas"] suggested_tip = self.w3.eth.max_priority_fee _log_gas_weather(base_fee, suggested_tip) max_priority_fee = round( - max(tx["maxPriorityFeePerGas"], suggested_tip) * self.SPEEDUP_FACTOR + max(pending["maxPriorityFeePerGas"], suggested_tip) * self.SPEEDUP_FACTOR ) max_fee_per_gas = round( max( - tx["maxFeePerGas"] * self.SPEEDUP_FACTOR, + pending["maxFeePerGas"] * self.SPEEDUP_FACTOR, (base_fee * 2) + max_priority_fee, ) ) return max_priority_fee, max_fee_per_gas - def execute(self, params: TxParams) -> TxParams: + def execute(self, pending: PendingTx) -> TxParams: + params = pending.params old_tip, old_max_fee = params["maxPriorityFeePerGas"], params["maxFeePerGas"] - new_tip, new_max_fee = self._calculate_speedup_fee(tx=params) + new_tip, new_max_fee = self._calculate_speedup_fee(params) tip_increase = round(Web3.from_wei(new_tip - old_tip, "gwei"), 4) fee_increase = round(Web3.from_wei(new_max_fee - old_max_fee, "gwei"), 4) if new_tip > self.MAX_TIP: - raise Halt( + raise Wait( f"Pending transaction maxPriorityFeePerGas exceeds spending cap {self.MAX_TIP}" ) diff --git a/atxm/tx.py b/atxm/tx.py index 119247c..70eb686 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field -from enum import Enum from typing import Callable, Dict, Optional from eth_typing import ChecksumAddress @@ -8,40 +7,19 @@ from hexbytes import HexBytes from web3.types import TxData, TxParams, TxReceipt -TxHash = HexBytes - - -class Fault(Enum): - """ - Fault codes for transaction processing. - These are alternate states that a transaction can enter - other than "finalized". - """ +from atxm.exceptions import Faults - # Strategy has been running for too long - TIMEOUT = "timeout" - - # Transaction has been capped and subsequently timed out - HALT = "halt" - - # Transaction reverted - REVERT = "revert" - - # Something went wrong - ERROR = "error" - - # ... - INSUFFICIENT_FUNDS = "insufficient_funds" +TxHash = HexBytes @dataclass class AsyncTx(ABC): id: int final: bool = field(default=None, init=False) - fault: Optional[Fault] = field(default=None, init=False) + fault: Optional[Faults] = field(default=None, init=False) on_broadcast: Optional[Callable] = field(default=None, init=False) on_finalized: Optional[Callable] = field(default=None, init=False) - on_halt: Optional[Callable] = field(default=None, init=False) + on_pause: Optional[Callable] = field(default=None, init=False) on_fault: Optional[Callable] = field(default=None, init=False) def __repr__(self): @@ -99,7 +77,6 @@ class PendingTx(AsyncTx): txhash: TxHash created: int data: Optional[TxData] = None - halt: bool = False def __hash__(self) -> int: return hash(self.txhash) @@ -110,7 +87,6 @@ def to_dict(self) -> Dict: "txhash": self.txhash.hex(), "created": self.created, "data": self.data, - "halt": self.halt, } @classmethod @@ -119,7 +95,6 @@ def from_dict(cls, data: Dict): id=int(data["id"]), txhash=HexBytes(data["txhash"]), created=int(data["created"]), - halt=bool(data["halt"]), data=dict(data) if data else dict(), ) @@ -144,7 +119,7 @@ def from_dict(cls, data: Dict): @dataclass class FaultyTx(AsyncTx): final: bool = field(default=False, init=False) - fault: Fault + fault: Faults error: Optional[str] = None def __hash__(self) -> int: @@ -156,7 +131,7 @@ def to_dict(self) -> Dict: @classmethod def from_dict(cls, data: Dict): return cls( - id=int(data["id"]), error=str(data["error"]), fault=Fault(data["fault"]) + id=int(data["id"]), error=str(data["error"]), fault=Faults(data["fault"]) ) diff --git a/pyproject.toml b/pyproject.toml index a246a24..c372290 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ [project.optional-dependencies] # Optional test = [ "pytest", + "pytest-mock", "pytest-twisted", "eth-ape", ] @@ -33,4 +34,4 @@ Issues = "https://github.com/nucypher/atxm/issues" # These are the assumed default build requirements from pip: # https://pip.pypa.io/en/stable/reference/pip/#pep-517-and-518-support requires = ["setuptools>=43.0.0", "wheel"] -build-backend = "setuptools.build_meta" \ No newline at end of file +build-backend = "setuptools.build_meta" diff --git a/tests/conftest.py b/tests/conftest.py index 9b58d92..df8542b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,6 +64,11 @@ def clock(machine): return machine._task.clock +@pytest.fixture +def interval(machine): + return 1 + + @pytest.fixture(autouse=True) def mock_wake_sleep(machine, mocker): wake = mocker.patch.object(machine, "_wake") diff --git a/tests/test_machine.py b/tests/test_machine.py index 3138043..5b10f67 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -83,10 +83,9 @@ def test_broadcast(machine, clock, eip1559_transaction, account, mocker): # The transaction is no longer queued assert len(machine.queued) == 0 - assert machine.pending is atx + assert machine.pending == atx assert isinstance(machine.pending, PendingTx) - assert atx is machine.pending assert isinstance(atx, PendingTx) assert not atx.final assert atx.txhash @@ -116,7 +115,7 @@ def test_finalize( while machine.pending is None: yield clock.advance(1) machine.stop() - assert machine.pending is atx + assert machine.pending == atx # advance time to finalize the transaction machine.start(now=True) @@ -147,14 +146,14 @@ def test_follow(chain, machine, clock, eip1559_transaction, account, mock_wake_s signer=account, ) - machine.start(now=True) + machine.start() while not machine.finalized: yield clock.advance(1) assert atx.final is True - while machine.finalized: + while len(machine.finalized) > 0: yield clock.advance(1) yield chain.mine(1) @@ -162,7 +161,11 @@ def test_follow(chain, machine, clock, eip1559_transaction, account, mock_wake_s assert len(machine.finalized) == 0 assert len(machine.queued) == 0 + assert len(machine.faults) == 0 assert machine.pending is None assert not machine.busy + + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) assert sleep.call_count == 1