diff --git a/atxm/exceptions.py b/atxm/exceptions.py index d09142f..05dfa17 100644 --- a/atxm/exceptions.py +++ b/atxm/exceptions.py @@ -1,6 +1,6 @@ from enum import Enum -from web3.types import PendingTx, RPCError, TxReceipt +from web3.types import PendingTx, TxReceipt class Fault(Enum): @@ -13,9 +13,6 @@ class Fault(Enum): # Strategy has been running for too long TIMEOUT = "timeout" - # Transaction has been capped and subsequently timed out - PAUSE = "pause" - # Transaction reverted REVERT = "revert" @@ -26,17 +23,10 @@ class Fault(Enum): INSUFFICIENT_FUNDS = "insufficient_funds" -class InsufficientFunds(RPCError): +class InsufficientFunds(Exception): """raised when a transaction exceeds the spending cap""" -class Wait(Exception): - """ - Raised when a strategy exceeds a limitation. - Used to mark a pending transaction as "wait, don't retry". - """ - - class TransactionFaulted(Exception): """Raised when a transaction has been faulted.""" diff --git a/atxm/machine.py b/atxm/machine.py index bb0b160..738e741 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -1,23 +1,30 @@ -from copy import deepcopy +from copy import copy, deepcopy from typing import List, Optional, Type from eth_account.signers.local import LocalAccount +from eth_utils import ValidationError from statemachine import State, StateMachine from twisted.internet import reactor from twisted.internet.defer import Deferred from twisted.internet.task import LoopingCall from web3 import Web3 +from web3.exceptions import Web3Exception from web3.types import TxParams -from atxm.exceptions import TransactionFaulted, TransactionReverted, Wait +from atxm.exceptions import ( + Fault, + InsufficientFunds, + TransactionFaulted, + TransactionReverted, +) from atxm.strategies import ( AsyncTxStrategy, - FixedRateSpeedUp, - InsufficientFundsPause, + ExponentialSpeedupStrategy, TimeoutStrategy, ) from atxm.tracker import _TxTracker from atxm.tx import ( + AsyncTx, FutureTx, PendingTx, TxHash, @@ -26,7 +33,7 @@ _get_average_blocktime, _get_confirmations, _get_receipt, - _handle_rpc_error, + _is_recoverable_send_tx_error, _make_tx_params, fire_hook, ) @@ -90,11 +97,13 @@ class _Machine(StateMachine): _BLOCK_SAMPLE_SIZE = 10_000 # blocks STRATEGIES: List[Type[AsyncTxStrategy]] = [ - InsufficientFundsPause, TimeoutStrategy, - FixedRateSpeedUp, + ExponentialSpeedupStrategy, ] + # max requeues/retries + _MAX_REDO_ATTEMPTS = 3 + class LogObserver: """StateMachine observer for logging information about state/transitions.""" @@ -240,7 +249,7 @@ def _sleep(self) -> None: # Lifecycle # - def __handle_active_transaction(self) -> bool: + def __handle_active_transaction(self) -> None: """ Handles the currently tracked pending transaction. @@ -249,7 +258,7 @@ def __handle_active_transaction(self) -> bool: 1. paused 2. reverted (fault) 3. finalized - 4. strategize: retry, wait, or fault + 4. strategize: retry, do nothing and wait, or fault Returns True if the next queued transaction can be broadcasted right now. """ @@ -262,7 +271,7 @@ def __handle_active_transaction(self) -> bool: # Outcome 2: the pending transaction was reverted (final error) except TransactionReverted as e: self._tx_tracker.fault(fault_error=e) - return True + return # Outcome 3: pending transaction is finalized (final success) if receipt: @@ -273,14 +282,13 @@ def __handle_active_transaction(self) -> bool: f"with {confirmations} confirmation(s) txhash: {final_txhash.hex()}" ) self._tx_tracker.finalize_active_tx(receipt=receipt) - return True + return # Outcome 4: re-strategize the pending transaction - pending_tx = self.__strategize() - return pending_tx is not None + self.__strategize() # - # Broadcast + # Broadcast tx # def __get_signer(self, address: str) -> LocalAccount: @@ -290,7 +298,7 @@ def __get_signer(self, address: str) -> LocalAccount: raise ValueError(f"Signer {address} not found") return signer - def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]: + def __fire(self, tx: AsyncTx, msg: str) -> TxHash: """ Signs and broadcasts a transaction, handling RPC errors and internal state changes. @@ -301,34 +309,34 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]: Morphs a `FutureTx` into a `PendingTx` and advances it into the active transaction slot if broadcast is successful. """ - signer: LocalAccount = self.__get_signer(tx._from) + signer: LocalAccount = self.__get_signer(tx.params["from"]) try: txhash = self.w3.eth.send_raw_transaction( signer.sign_transaction(tx.params).rawTransaction ) - except ValueError as e: - _handle_rpc_error(e, tx=tx) - return + except ValidationError as e: + # special case for insufficient funds + if "Sender does not have enough" in str(e): + # TODO raised exception should be handled in some way #13. + raise InsufficientFunds + + raise e + self.log.info( f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}" ) - pending_tx = self._tx_tracker.morph(tx=tx, txhash=txhash) - if tx.on_broadcast: - fire_hook(hook=tx.on_broadcast, tx=pending_tx) - return pending_tx + return txhash - def __strategize(self) -> Optional[PendingTx]: + def __strategize(self) -> None: """Retry the currently tracked pending transaction with the configured strategy.""" if not self._tx_tracker.pending: raise RuntimeError("No active transaction to strategize") _active_copy = deepcopy(self._tx_tracker.pending) + params_updated = False for strategy in self._strategies: try: params = strategy.execute(pending=_active_copy) - except Wait as e: - log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}") - return except TransactionFaulted as e: self._tx_tracker.fault(fault_error=e) return @@ -336,35 +344,123 @@ def __strategize(self) -> Optional[PendingTx]: # in case the strategy accidentally returns None # keep the parameters as they are. _active_copy.params.update(params) + params_updated = True + + if not params_updated: + # TODO is this a potential forever wait - this is really controlled by strategies + # who can no longer do anything. if we limit the wait here then the TimeoutStrategy + # becomes useless - something to think about. #14 + log.info( + f"[wait] strategies made no suggested updates to " + f"pending tx #{_active_copy.id} - skipping retry round" + ) + return # (!) retry the transaction with the new parameters - retry_params = TxParams(_active_copy.params) _names = " -> ".join(s.name for s in self._strategies) - pending_tx = self.__fire(tx=retry_params, msg=_names) + + try: + txhash = self.__fire(tx=_active_copy, msg=_names) + except InsufficientFunds: + # special case re-raise insufficient funds (for now) + # TODO #13 + # TODO should the following also be done? + # self._tx_tracker.update_failed_retry_attempt(_active_copy) + raise + except (ValidationError, Web3Exception, ValueError) as e: + self._tx_tracker.update_failed_retry_attempt(_active_copy) + self.__handle_retry_failure(_active_copy, e) + return + + _active_copy.txhash = txhash + self._tx_tracker.update_after_retry(_active_copy) + + pending_tx = self._tx_tracker.pending self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted") + if pending_tx.on_broadcast: + fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx) - return pending_tx + def __handle_retry_failure(self, pending_tx: PendingTx, e: Exception): + log.warn( + f"[retry] transaction #atx-{pending_tx.id}|{pending_tx.params['nonce']} " + f"failed with updated params - {str(e)}; retry again next round" + ) + + if pending_tx.retries >= self._MAX_REDO_ATTEMPTS: + log.error( + f"[retry] transaction #atx-{pending_tx.id}|{pending_tx.params['nonce']} " + f"failed for { pending_tx.retries} attempts; tx will no longer be retried" + ) + + fault_error = TransactionFaulted( + tx=pending_tx, + fault=Fault.ERROR, + message=str(e), + ) + self._tx_tracker.fault(fault_error=fault_error) - def __broadcast(self) -> Optional[TxHash]: + def __broadcast(self): """ Attempts to broadcast the next `FutureTx` in the queue. If the broadcast is not successful, it is re-queued. """ - future_tx = self._tx_tracker._pop() # popleft + future_tx = self._tx_tracker.pop() # popleft future_tx.params = _make_tx_params(future_tx.params) + + # update nonce as necessary signer = self.__get_signer(future_tx._from) nonce = self.w3.eth.get_transaction_count(signer.address, "latest") if nonce > future_tx.params["nonce"]: self.log.warn( f"[broadcast] nonce {future_tx.params['nonce']} has been front-run " - f"by another transaction. Updating queued tx nonce {future_tx.params['nonce']} -> {nonce}" + f"by another transaction. Updating queued tx " + f"nonce {future_tx.params['nonce']} -> {nonce}" ) future_tx.params["nonce"] = nonce - pending_tx = self.__fire(tx=future_tx, msg="broadcast") - if not pending_tx: - self._tx_tracker._requeue(future_tx) + + try: + txhash = self.__fire(tx=future_tx, msg="broadcast") + except InsufficientFunds: + # special case re-raise insufficient funds (for now) + # TODO #13 + raise + except (ValidationError, Web3Exception, ValueError) as e: + # either requeue OR fail and move on to subsequent txs + self.__handle_broadcast_failure(future_tx, e) return - return pending_tx.txhash + + self._tx_tracker.morph(tx=future_tx, txhash=txhash) + pending_tx = self._tx_tracker.pending + if pending_tx.on_broadcast: + fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx) + + def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception): + is_broadcast_failure = False + if _is_recoverable_send_tx_error(e): + if future_tx.requeues >= self._MAX_REDO_ATTEMPTS: + is_broadcast_failure = True + log.error( + f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " + f"failed for {future_tx.requeues} attempts; tx will not be requeued" + ) + else: + log.warn( + f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " + f"failed - {str(e)}; requeueing tx" + ) + self._tx_tracker.requeue(future_tx) + else: + # non-recoverable + is_broadcast_failure = True + log.error( + f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " + f"has non-recoverable failure - {str(e)}; tx will not be requeued" + ) + + if is_broadcast_failure: + hook = future_tx.on_broadcast_failure + if hook: + fire_hook(hook, future_tx, e) # # Monitoring @@ -379,7 +475,7 @@ def __monitor_finalized(self) -> None: if tx in self._tx_tracker.finalized: self._tx_tracker.finalized.remove(tx) self.log.info( - f"[clear] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations" + f"[monitor] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations" ) continue self.log.info( @@ -418,9 +514,17 @@ def queue_transaction( if signer.address not in self.signers: self.signers[signer.address] = signer - tx = self._tx_tracker._queue( - _from=signer.address, params=params, *args, **kwargs - ) + params_copy = copy(params) + + from_param = params_copy.get("from") + if from_param is None: + params_copy["from"] = signer.address + if from_param and from_param != signer.address: + raise ValueError( + f"Mismatched 'from' value ({from_param}) and 'signer' account ({signer.address})" + ) + + tx = self._tx_tracker.queue_tx(params=params_copy, *args, **kwargs) if not previously_busy_or_paused: self._wake() diff --git a/atxm/strategies.py b/atxm/strategies.py index b44fa96..9ac5b33 100644 --- a/atxm/strategies.py +++ b/atxm/strategies.py @@ -1,13 +1,13 @@ +import math from abc import ABC from datetime import datetime, timedelta -from typing import Tuple, Optional +from typing import Optional, Tuple from web3 import Web3 -from web3.types import Gwei, TxParams, Wei, PendingTx +from web3.types import PendingTx, TxParams from atxm.exceptions import ( Fault, - Wait, TransactionFaulted, ) from atxm.logging import log @@ -30,7 +30,7 @@ def name(self) -> str: """Used to identify the strategy in logs.""" return self._NAME - def execute(self, pending: PendingTx) -> TxParams: + def execute(self, pending: PendingTx) -> Optional[TxParams]: """ Execute the strategy. @@ -40,9 +40,10 @@ def execute(self, pending: PendingTx) -> TxParams: (like tx.txhash, tx.params, tx.created, etc). This method must do one of the following: - - Raise `Wait` to pause retries and wait around for a bit. - - Raise `Fault`to signal the transaction cannot be retried. - - Returns a TxParams dictionary to use in the next attempt. + - Raise `TransactionFaulted`to signal the transaction cannot be retried. + - Returns an updated TxParams dictionary to use in the next attempt. + - Returns None if the strategy makes no changes to the existing TxParams and + signal that the machine should just wait for the existing tx NOTE: Do not mutate the input `tx` object. Return a new TxParams dictionary with the updated transaction parameters. The input @@ -63,27 +64,6 @@ def execute(self, pending: PendingTx) -> TxParams: raise NotImplementedError -class InsufficientFundsPause(AsyncTxStrategy): - """Pause strategy for pending transactions.""" - - _NAME = "insufficient-funds" - - def execute(self, pending: PendingTx) -> TxParams: - balance = self.w3.eth.get_balance(pending._from) - if balance == 0: - self.log.warn( - f"Insufficient funds for transaction #{pending.params['nonce']}" - ) - raise TransactionFaulted( - tx=pending, - fault=Fault.INSUFFICIENT_FUNDS, - message="Insufficient funds", - ) - # log.warn(f"Insufficient funds for transaction #{pending.params['nonce']}") - # raise Wait("Insufficient funds") - return pending.params - - class TimeoutStrategy(AsyncTxStrategy): """Timeout strategy for pending transactions.""" @@ -91,11 +71,13 @@ class TimeoutStrategy(AsyncTxStrategy): _TIMEOUT = 60 * 60 # 1 hour in seconds - _WARN_FACTOR = 0.05 # 10% of timeout remaining + _WARN_FACTOR = 0.15 # 15% of timeout remaining def __init__(self, w3: Web3, timeout: Optional[int] = None): super().__init__(w3) self.timeout = timeout or self._TIMEOUT + # use 30s as default in case timeout is too small for warn factor + self._warn_threshold = max(30, self.timeout * self._WARN_FACTOR) def __active_timed_out(self, pending: PendingTx) -> bool: """Returns True if the active transaction has timed out.""" @@ -110,21 +92,21 @@ def __active_timed_out(self, pending: PendingTx) -> bool: end_time = creation_time + timedelta(seconds=self.timeout) time_remaining = end_time - now human_end_time = end_time.strftime("%Y-%m-%d %H:%M:%S") - if time_remaining.seconds < (self.timeout * self._WARN_FACTOR): + if time_remaining.seconds < self._warn_threshold: self.log.warn( - f"[pending_timeout] Transaction {pending.txhash.hex()} will timeout in " + f"[timeout] Transaction {pending.txhash.hex()} will timeout in " f"{time_remaining} at {human_end_time}" ) else: self.log.info( - f"[pending] {pending.txhash.hex()} " + f"[timeout] {pending.txhash.hex()} " f"{elapsed_time.seconds}s Elapsed | " f"{time_remaining} Remaining | " f"Timeout at {human_end_time}" ) return False - def execute(self, pending: PendingTx) -> TxParams: + def execute(self, pending: PendingTx) -> Optional[TxParams]: if not pending: # should never get here raise RuntimeError("pending tx should not be None") @@ -136,57 +118,147 @@ def execute(self, pending: PendingTx) -> TxParams: fault=Fault.TIMEOUT, message="Transaction has timed out", ) - return pending.params + return None -class FixedRateSpeedUp(AsyncTxStrategy): - """Speedup strategy for pending transactions.""" +class ExponentialSpeedupStrategy(AsyncTxStrategy): + """ + Speedup strategy for pending transactions that increases fees by a + percentage over the prior value every time it is used. + """ - SPEEDUP_FACTOR = 1.125 # 12.5% increase - MAX_TIP = Gwei(1) # gwei maxPriorityFeePerGas per transaction + _SPEEDUP_INCREASE_PERCENTAGE = 0.125 # 12.5% - _NAME = f"speedup-{SPEEDUP_FACTOR}%" + _MIN_SPEEDUP_INCREASE = 0.10 # mandated by eth standard - def _calculate_speedup_fee(self, pending: TxParams) -> Tuple[Wei, Wei]: - base_fee = self.w3.eth.get_block("latest")["baseFeePerGas"] + _MAX_TIP_FACTOR = 3 # max 3x over suggested tip + + _NAME = "speedup" + + _GAS_PRICE_FIELD = "gasPrice" + _MAX_FEE_PER_GAS_FIELD = "maxFeePerGas" + _MAX_PRIORITY_FEE_PER_GAS_FIELD = "maxPriorityFeePerGas" + + def __init__( + self, + w3: Web3, + speedup_increase_percentage: float = _SPEEDUP_INCREASE_PERCENTAGE, + max_tip_factor: int = _MAX_TIP_FACTOR, + ): + super().__init__(w3) + + if ( + speedup_increase_percentage < self._MIN_SPEEDUP_INCREASE + or speedup_increase_percentage > 1 + ): + raise ValueError( + f"Invalid speedup increase percentage {speedup_increase_percentage}; " + f"must be in range [0.10, 1]" + ) + if max_tip_factor <= 1: + raise ValueError(f"Invalid max tip factor {max_tip_factor}; must be > 1") + + self.speedup_factor = 1 + speedup_increase_percentage + self.max_tip_factor = max_tip_factor + + def _calculate_eip1559_speedup_fee(self, params: TxParams) -> Tuple[int, int, int]: + current_base_fee = self.w3.eth.get_block("latest")["baseFeePerGas"] suggested_tip = self.w3.eth.max_priority_fee - _log_gas_weather(base_fee, suggested_tip) - max_priority_fee = round( - max(pending["maxPriorityFeePerGas"], suggested_tip) * self.SPEEDUP_FACTOR + _log_gas_weather(current_base_fee, suggested_tip) + + # default to 1 if not specified in tx + prior_max_priority_fee = params.get(self._MAX_PRIORITY_FEE_PER_GAS_FIELD, 0) + updated_max_priority_fee = math.ceil( + max(prior_max_priority_fee, suggested_tip) * self.speedup_factor ) - max_fee_per_gas = round( - max( - pending["maxFeePerGas"] * self.SPEEDUP_FACTOR, - (base_fee * 2) + max_priority_fee, + + current_max_fee_per_gas = params.get(self._MAX_FEE_PER_GAS_FIELD) + if current_max_fee_per_gas: + # already previously set, just increase by factor but ensure base fee hasn't + # also increased. The defaults used by web3py for this value is already pretty + # high so don't overdo the multiplication factor. + updated_max_fee_per_gas = math.ceil( + max( + # last attempt param + current_max_fee_per_gas * self.speedup_factor, + # OR take current conditions and speedup + (current_base_fee * self.speedup_factor) + updated_max_priority_fee, + ) ) - ) - return max_priority_fee, max_fee_per_gas + else: + # not previously set, set to same default as web3py (transactions.py) + updated_max_fee_per_gas = math.ceil( + updated_max_priority_fee + (current_base_fee * 2) + ) + + return suggested_tip, updated_max_priority_fee, updated_max_fee_per_gas + + def _calculate_legacy_speedup_fee(self, params: TxParams) -> int: + generated_gas_price = ( + self.w3.eth.generate_gas_price(params) or 0 + ) # 0 means no gas strategy + old_gas_price = params[self._GAS_PRICE_FIELD] + + base_price_to_increase = old_gas_price + if generated_gas_price > old_gas_price: + log.info( + f"[speedup] increase gas price based on updated generated " + f"gas price value {generated_gas_price} vs prior value {old_gas_price}" + ) + base_price_to_increase = generated_gas_price + + return math.ceil(base_price_to_increase * self.speedup_factor) - def execute(self, pending: PendingTx) -> TxParams: + def execute(self, pending: PendingTx) -> Optional[TxParams]: params = pending.params - old_tip, old_max_fee = params["maxPriorityFeePerGas"], params["maxFeePerGas"] - new_tip, new_max_fee = self._calculate_speedup_fee(params) - tip_increase = round(Web3.from_wei(new_tip - old_tip, "gwei"), 4) - fee_increase = round(Web3.from_wei(new_max_fee - old_max_fee, "gwei"), 4) - - if new_tip > self.MAX_TIP: - raise Wait( - f"Pending transaction maxPriorityFeePerGas exceeds spending cap {self.MAX_TIP}" + + if self._GAS_PRICE_FIELD in pending.params: + old_gas_price = params[self._GAS_PRICE_FIELD] + new_gas_price = self._calculate_legacy_speedup_fee(pending.params) + log.info( + f"[speedup] Speeding up legacy transaction #atx-{pending.id} (nonce={params['nonce']}) \n" + f"gasPrice {old_gas_price} -> {new_gas_price}" + ) + params[self._GAS_PRICE_FIELD] = new_gas_price + else: + old_tip, old_max_fee = ( + params.get(self._MAX_PRIORITY_FEE_PER_GAS_FIELD), + params.get(self._MAX_FEE_PER_GAS_FIELD), + ) + suggested_tip, new_tip, new_max_fee = self._calculate_eip1559_speedup_fee( + params ) - latest_nonce = self.w3.eth.get_transaction_count(params["from"], "latest") - pending_nonce = self.w3.eth.get_transaction_count(params["from"], "pending") - if pending_nonce - latest_nonce > 0: - log.warn("Overriding pending transaction!") + # TODO: is this the best way of setting a cap? + if new_tip > (suggested_tip * self.max_tip_factor): + # nothing the strategy can do here - don't change the params + log.warn( + f"[speedup] Increasing pending transaction's {self._MAX_PRIORITY_FEE_PER_GAS_FIELD} " + f"to {round(Web3.from_wei(new_tip, 'gwei'), 4)} gwei will exceed " + f"spending cap factor {self.max_tip_factor}x over suggested tip " + f"({round(Web3.from_wei(suggested_tip, 'gwei'), 4)} gwei); " + f"don't speed up" + ) + return None + + tip_increase_message = ( + f"(~+{round(Web3.from_wei(new_tip - old_tip, 'gwei'), 4)} gwei) {old_tip}" + if old_tip + else "undefined" + ) + fee_increase_message = ( + f"(~+{round(Web3.from_wei(new_max_fee - old_max_fee, 'gwei'), 4)} gwei) {old_max_fee}" + if old_max_fee + else "undefined" + ) + log.info( + f"[speedup] Speeding up transaction #atx-{pending.id} (nonce={params['nonce']}) \n" + f"{self._MAX_PRIORITY_FEE_PER_GAS_FIELD} {tip_increase_message} -> {new_tip} \n" + f"{self._MAX_FEE_PER_GAS_FIELD} {fee_increase_message} -> {new_max_fee}" + ) + params = dict(params) + params[self._MAX_PRIORITY_FEE_PER_GAS_FIELD] = new_tip + params[self._MAX_FEE_PER_GAS_FIELD] = new_max_fee - log.info( - f"Speeding up transaction #{params['nonce']} \n" - f"maxPriorityFeePerGas (~+{tip_increase} gwei) {old_tip} -> {new_tip} \n" - f"maxFeePerGas (~+{fee_increase} gwei) {old_max_fee} -> {new_max_fee}" - ) - params = dict(params) - params["maxPriorityFeePerGas"] = new_tip - params["maxFeePerGas"] = new_max_fee - params["nonce"] = latest_nonce params = TxParams(params) return params diff --git a/atxm/tracker.py b/atxm/tracker.py index f5632be..40d5227 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -6,7 +6,6 @@ from pathlib import Path from typing import Callable, Deque, Dict, Optional, Set, Tuple -from eth_typing import ChecksumAddress from web3.types import TxParams, TxReceipt from atxm.exceptions import TransactionFaulted @@ -101,6 +100,27 @@ def __set_active(self, tx: PendingTx) -> None: return log.debug(f"[tracker] tracked active transaction {tx.txhash.hex()}") + def update_after_retry(self, tx: PendingTx) -> PendingTx: + if tx.id != self.__active.id: + raise RuntimeError( + f"Trying to update unexpected active tx: from {self.__active.id} to {tx.id}" + ) + + self.__active.txhash = tx.txhash + self.__active.params = tx.params + + return self.pending + + def update_failed_retry_attempt(self, tx: PendingTx): + if tx.id != self.__active.id: + raise RuntimeError( + f"Trying to update unexpected active tx: from {self.__active.id} to {tx.id}" + ) + self.__active.retries += 1 + # safety check + if tx is not self.__active: + tx.retries += 1 + def morph(self, tx: FutureTx, txhash: TxHash) -> PendingTx: """ Morphs a future transaction into a pending transaction. @@ -180,12 +200,13 @@ def queue(self) -> Tuple[FutureTx, ...]: """Return the queue of transactions.""" return tuple(self.__queue) - def _pop(self) -> FutureTx: + def pop(self) -> FutureTx: """Pop the next transaction from the queue.""" return self.__queue.popleft() - def _requeue(self, tx: FutureTx) -> None: + def requeue(self, tx: FutureTx) -> None: """Re-queue a transaction for broadcast and subsequent tracking.""" + tx.requeues += 1 self.__queue.append(tx) self.commit() log.info( @@ -193,18 +214,17 @@ def _requeue(self, tx: FutureTx) -> None: f"priority {len(self.__queue)}" ) - def _queue( + def queue_tx( self, params: TxParams, - _from: ChecksumAddress, info: Dict[str, str] = None, on_broadcast: Optional[Callable[[PendingTx], None]] = None, + on_broadcast_failure: Optional[Callable[[FutureTx, Exception], None]] = None, on_finalized: Optional[Callable[[FinalizedTx], None]] = None, on_fault: Optional[Callable[[FaultedTx], None]] = None, ) -> FutureTx: """Queue a new transaction for broadcast and subsequent tracking.""" tx = FutureTx( - _from=_from, id=self.__COUNTER, params=params, info=info, @@ -212,6 +232,7 @@ def _queue( # configure hooks tx.on_broadcast = on_broadcast + tx.on_broadcast_failure = on_broadcast_failure tx.on_finalized = on_finalized tx.on_fault = on_fault diff --git a/atxm/tx.py b/atxm/tx.py index 3320aa7..174ff0a 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -5,7 +5,7 @@ from eth_typing import ChecksumAddress from eth_utils import encode_hex from hexbytes import HexBytes -from web3.types import PendingTx, TxData, TxParams, TxReceipt +from web3.types import TxData, TxParams, TxReceipt from atxm.exceptions import Fault @@ -15,9 +15,13 @@ @dataclass class AsyncTx(ABC): id: int + params: TxParams final: bool = field(default=None, init=False) fault: Optional[Fault] = field(default=None, init=False) - on_broadcast: Optional[Callable[[PendingTx], None]] = field( + on_broadcast: Optional[Callable[["PendingTx"], None]] = field( + default=None, init=False + ) + on_broadcast_failure: Optional[Callable[["FutureTx", Exception], None]] = field( default=None, init=False ) on_finalized: Optional[Callable[["FinalizedTx"], None]] = field( @@ -48,18 +52,20 @@ def from_dict(cls, data: Dict): @dataclass class FutureTx(AsyncTx): + requeues: int = field(default=0, init=False) final: bool = field(default=False, init=False) - params: TxParams - _from: ChecksumAddress info: Optional[Dict] = None def __hash__(self): return hash(self.id) + @property + def _from(self) -> ChecksumAddress: + return self.params["from"] + def to_dict(self) -> Dict: return { "id": self.id, - "from": self._from, "params": _serialize_tx_params(self.params), "info": self.info, } @@ -68,7 +74,6 @@ def to_dict(self) -> Dict: def from_dict(cls, data: Dict): return cls( id=int(data["id"]), - _from=data["from"], params=TxParams(data["params"]), info=dict(data["info"]), ) @@ -76,6 +81,7 @@ def from_dict(cls, data: Dict): @dataclass class PendingTx(AsyncTx): + retries: int = field(default=0, init=False) final: bool = field(default=False, init=False) txhash: TxHash created: int diff --git a/atxm/utils.py b/atxm/utils.py index b444068..1159d12 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -4,16 +4,20 @@ from cytoolz import memoize from twisted.internet import reactor from web3 import Web3 -from web3.exceptions import TransactionNotFound +from web3.exceptions import ( + ProviderConnectionError, + TimeExhausted, + TooManyRequests, + TransactionNotFound, +) from web3.types import TxData, TxParams -from web3.types import RPCError, TxReceipt, Wei +from web3.types import TxReceipt, Wei from atxm.exceptions import ( - InsufficientFunds, TransactionReverted, ) from atxm.logging import log -from atxm.tx import AsyncTx, FinalizedTx, FutureTx, PendingTx, TxHash +from atxm.tx import AsyncTx, FinalizedTx, PendingTx, TxHash @memoize @@ -31,10 +35,12 @@ def _get_average_blocktime(w3: Web3, sample_size: int) -> float: return average_block_time -def _log_gas_weather(base_fee: Wei, tip: Wei) -> None: +def _log_gas_weather(base_fee: Wei, suggested_tip: Wei) -> None: base_fee_gwei = Web3.from_wei(base_fee, "gwei") - tip_gwei = Web3.from_wei(tip, "gwei") - log.info(f"Gas conditions: base {base_fee_gwei} gwei | tip {tip_gwei} gwei") + tip_gwei = Web3.from_wei(suggested_tip, "gwei") + log.info( + f"[gas] Gas conditions: base {base_fee_gwei} gwei | suggested tip {tip_gwei} gwei" + ) def _get_receipt_from_txhash(w3: Web3, txhash: TxHash) -> Optional[TxReceipt]: @@ -96,7 +102,7 @@ def _get_confirmations(w3: Web3, tx: Union[PendingTx, FinalizedTx]) -> int: return confirmations -def fire_hook(hook: Callable, tx: AsyncTx, *args, **kwargs) -> None: +def fire_hook(hook: Callable, tx: AsyncTx, *args) -> None: """ Fire a callable in a separate thread. Try exceptionally hard not to crash the async tasks during dispatch. @@ -106,7 +112,7 @@ def fire_hook(hook: Callable, tx: AsyncTx, *args, **kwargs) -> None: def _hook() -> None: """I'm inside a thread!""" try: - hook(tx, *args, **kwargs) + hook(tx, *args) except Exception as e: log.warn(f"[hook] raised {e}") @@ -114,25 +120,6 @@ def _hook() -> None: log.info(f"[hook] fired hook {hook} for transaction #atx-{tx.id}") -def _handle_rpc_error(e: Exception, tx: FutureTx) -> None: - try: - error = RPCError(**e.args[0]) - except TypeError: - log.critical( - f"[error] transaction #atx-{tx.id}|{tx.params['nonce']} failed with {e}" - ) - else: - log.critical( - f"[error] transaction #atx-{tx.id}|{tx.params['nonce']} failed with {error['code']} | {error['message']}" - ) - if error["code"] == -32000: - if "insufficient funds" in error["message"]: - raise InsufficientFunds - hook = tx.on_fault - if hook: - fire_hook(hook=hook, tx=tx, error=e) - - def _make_tx_params(data: TxData) -> TxParams: """ TxData -> TxParams: Creates a transaction parameters @@ -147,6 +134,7 @@ def _make_tx_params(data: TxData) -> TxParams: "nonce": data["nonce"], "chainId": data["chainId"], "gas": data["gas"], + "from": data["from"], "to": data["to"], "value": data["value"], "data": data.get("data", b""), @@ -165,3 +153,7 @@ def _make_tx_params(data: TxData) -> TxParams: raise ValueError(f"unrecognized tx data: {data}") return params + + +def _is_recoverable_send_tx_error(e: Exception) -> bool: + return isinstance(e, (TooManyRequests, ProviderConnectionError, TimeExhausted)) diff --git a/tests/conftest.py b/tests/conftest.py index 2b8c43d..ec51efc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ import pytest from eth_account import Account +from eth_tester import EthereumTester from statemachine import State from twisted.internet.task import Clock from twisted.logger import globalLogPublisher, textFileLogObserver @@ -86,6 +87,18 @@ def mock_wake_sleep(machine, mocker): return wake, sleep +@pytest.fixture +def disable_auto_mining(ethereum_tester): + ethereum_tester.disable_auto_mine_transactions() + yield + ethereum_tester.enable_auto_mine_transactions() + + +@pytest.fixture +def ethereum_tester(w3) -> EthereumTester: + return w3.provider.ethereum_tester + + class StateObserver: def __init__(self): self.transitions: List[Tuple[State, State]] = [] diff --git a/tests/test_faults.py b/tests/test_faults.py index 0ffd3ef..3fe125b 100644 --- a/tests/test_faults.py +++ b/tests/test_faults.py @@ -1,4 +1,7 @@ -from web3.exceptions import TransactionNotFound +import pytest +import pytest_twisted +from twisted.internet import reactor +from twisted.internet.task import deferLater from web3.types import TxReceipt from atxm.exceptions import Fault, TransactionFaulted @@ -26,10 +29,14 @@ def _broadcast_tx(machine, eip1559_transaction, account, mocker): return atx, fault_hook +@pytest_twisted.inlineCallbacks def _verify_tx_faulted(machine, atx, fault_hook, expected_fault: Fault): - while fault_hook.call_count == 0: - # ensure tx processed - machine._cycle() + machine._cycle() + + # ensure hook is called + yield deferLater(reactor, 0.2, lambda: None) + assert fault_hook.call_count == 1 + fault_hook.assert_called_with(atx) assert atx.final is False assert isinstance(atx, FaultedTx) @@ -42,12 +49,8 @@ def _verify_tx_faulted(machine, atx, fault_hook, expected_fault: Fault): assert machine.pending is None assert atx.final is False - assert fault_hook.call_count == 1 - fault_hook.assert_called_with(atx) - def test_revert( - chain, w3, machine, clock, @@ -61,9 +64,7 @@ def test_revert( assert machine.pending - chain.mine(1) - - # force receipt to symbolize a revert of the tx + # tx auto-mined; force receipt to symbolize a revert of the tx receipt = _get_receipt_from_txhash(w3, atx.txhash) revert_receipt = dict(receipt) revert_receipt["status"] = 0 @@ -75,10 +76,13 @@ def test_revert( _verify_tx_faulted(machine, atx, fault_hook, expected_fault=Fault.REVERT) +@pytest.mark.usefixtures("disable_auto_mining") def test_strategy_fault( w3, machine, clock, eip1559_transaction, account, interval, mock_wake_sleep, mocker ): faulty_strategy = mocker.Mock(spec=AsyncTxStrategy) + + # TODO: consider whether strategies should just be overridden through the constructor machine._strategies.insert(0, faulty_strategy) # add first atx, fault_hook = _broadcast_tx(machine, eip1559_transaction, account, mocker) @@ -88,20 +92,18 @@ def test_strategy_fault( tx=atx, fault=Fault.ERROR, message=faulty_message ) - mocker.patch.object( - w3.eth, "get_transaction_receipt", side_effect=TransactionNotFound - ) - + # tx not mined _verify_tx_faulted(machine, atx, fault_hook, expected_fault=Fault.ERROR) assert atx.error == faulty_message +@pytest.mark.usefixtures("disable_auto_mining") def test_timeout_strategy_fault( w3, machine, clock, eip1559_transaction, account, interval, mock_wake_sleep, mocker ): atx, fault_hook = _broadcast_tx(machine, eip1559_transaction, account, mocker) atx.created -= 9999999999 - mocker.patch.object(w3.eth, "get_transaction", side_effect=TransactionNotFound) + # tx not mined _verify_tx_faulted(machine, atx, fault_hook, expected_fault=Fault.TIMEOUT) diff --git a/tests/test_machine.py b/tests/test_machine.py index d7f369e..9e4aa49 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -1,10 +1,22 @@ +import math + import pytest import pytest_twisted +from eth_account import Account +from eth_utils import ValidationError from twisted.internet import reactor from twisted.internet.task import deferLater +from web3.exceptions import ( + ProviderConnectionError, + TimeExhausted, + TooManyRequests, + Web3Exception, +) -from atxm.tx import FutureTx, PendingTx +from atxm.strategies import AsyncTxStrategy, TimeoutStrategy +from atxm.tx import FaultedTx, FinalizedTx, FutureTx, PendingTx +from atxm.utils import _is_recoverable_send_tx_error @pytest.fixture() @@ -36,6 +48,49 @@ def test_no_rpc_calls_when_idle(clock, machine, state_observer, rpc_spy): machine.stop() +def test_queue_from_parameter_handling( + machine, + account, + eip1559_transaction, + mock_wake_sleep, +): + # 1. "from" parameter does not match account + with pytest.raises(ValueError): + tx_params = dict(eip1559_transaction) + + # use random checksum address + random_account = Account.create( + "YOUTH IS WASTED ON THE YOUNG" + ) # - George Bernard Shaw + assert random_account.address != account.address, "addresses don't match" + + tx_params["from"] = random_account.address + _ = machine.queue_transaction( + params=tx_params, + signer=account, + ) + + # 2. no "from" parameter + tx_params = dict(eip1559_transaction) + if "from" in eip1559_transaction: + del tx_params["from"] + + atx = machine.queue_transaction( + params=tx_params, + signer=account, + ) + assert atx.params["from"] == account.address, "same as signer account" + + # 3. matching "from" parameter + tx_params = dict(eip1559_transaction) + tx_params["from"] = account.address + atx = machine.queue_transaction( + params=tx_params, + signer=account, + ) + assert atx.params["from"] == account.address + + def test_queue( machine, state_observer, @@ -197,6 +252,7 @@ def test_wake_no_call_after_queuing_when_already_paused( @pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") def test_broadcast( clock, machine, @@ -245,6 +301,8 @@ def test_broadcast( yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 + assert atx.retries == 0 + # tx only broadcasted and not finalized, so we are still busy assert machine.current_state == machine._BUSY @@ -255,8 +313,243 @@ def test_broadcast( @pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +@pytest.mark.parametrize( + "non_recoverable_error", [ValidationError, ValueError, Web3Exception] +) +def test_broadcast_non_recoverable_error( + non_recoverable_error, + clock, + w3, + machine, + state_observer, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + wake, _ = mock_wake_sleep + + assert machine.current_state == machine._IDLE + assert not machine.busy + + # Queue a transaction + broadcast_failure_hook = mocker.Mock() + broadcast_hook = mocker.Mock() + atx = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + on_broadcast=broadcast_hook, + on_broadcast_failure=broadcast_failure_hook, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 + + # There is one queued transaction + assert len(machine.queued) == 1 + + # make firing of transaction fail + error = non_recoverable_error("non-recoverable error") + assert not _is_recoverable_send_tx_error(error) + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=error) + + machine.start(now=True) + yield clock.advance(1) + + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_failure_hook.call_count == 1 + broadcast_failure_hook.assert_called_with(atx, error) + + # The transaction failed and is not requeued + assert len(machine.queued) == 0 + + # run a few cycles + for i in range(2): + yield clock.advance(1) + + assert broadcast_hook.call_count == 0 + + assert atx.requeues == 0 + + # tx failed and not requeued + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +@pytest.mark.parametrize( + "recoverable_error", [TooManyRequests, ProviderConnectionError, TimeExhausted] +) +def test_broadcast_recoverable_error( + recoverable_error, + clock, + w3, + machine, + state_observer, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + # need more freedom with redo attempts for test + mocker.patch.object(machine, "_MAX_REDO_ATTEMPTS", 10) + + wake, _ = mock_wake_sleep + + assert machine.current_state == machine._IDLE + assert not machine.busy + + # Queue a transaction + broadcast_failure_hook = mocker.Mock() + broadcast_hook = mocker.Mock() + atx = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + on_broadcast=broadcast_hook, + on_broadcast_failure=broadcast_failure_hook, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 + + # There is one queued transaction + assert len(machine.queued) == 1 + + real_method = w3.eth.send_raw_transaction + # make firing of transaction fail but with recoverable error + error = recoverable_error("recoverable error") + assert _is_recoverable_send_tx_error(error) + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=error) + + # repeat some cycles; tx fails then gets requeued since error is "recoverable" + machine.start(now=True) + for i in range(5): + yield clock.advance(1) + assert len(machine.queued) == 1 # remains in queue and not broadcasted + assert atx.requeues >= i + + # call real method from now on + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=real_method) + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + + # The transaction is broadcasted and no longer queued + assert len(machine.queued) == 0 + + assert machine.pending == atx + assert isinstance(machine.pending, PendingTx) + + assert isinstance(atx, PendingTx) + assert not atx.final + assert atx.txhash + + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_hook.call_count == 1 + assert broadcast_failure_hook.call_count == 0 + + # tx only broadcasted and not finalized, so we are still busy + assert machine.current_state == machine._BUSY + + assert len(state_observer.transitions) == 1 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +@pytest.mark.parametrize( + "recoverable_error", [TooManyRequests, ProviderConnectionError, TimeExhausted] +) +def test_broadcast_recoverable_error_requeues_exceeded( + recoverable_error, + clock, + w3, + machine, + state_observer, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + wake, _ = mock_wake_sleep + + assert machine.current_state == machine._IDLE + assert not machine.busy + + # Queue a transaction + broadcast_failure_hook = mocker.Mock() + broadcast_hook = mocker.Mock() + atx = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + on_broadcast=broadcast_hook, + on_broadcast_failure=broadcast_failure_hook, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 + + # There is one queued transaction + assert len(machine.queued) == 1 + + # make firing of transaction fail but with recoverable error + error = recoverable_error("recoverable error") + assert _is_recoverable_send_tx_error(error) + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=error) + + # repeat some cycles; tx fails then gets requeued since error is "recoverable" + machine.start(now=True) + # one less than max attempts + for i in range(machine._MAX_REDO_ATTEMPTS - 1): + assert len(machine.queued) == 1 # remains in queue and not broadcasted + yield clock.advance(1) + assert atx.requeues >= i + + # push over the retry limit + yield clock.advance(1) + + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_failure_hook.call_count == 1 + broadcast_failure_hook.assert_called_with(atx, error) + + assert atx.requeues == machine._MAX_REDO_ATTEMPTS + + # The transaction failed and is not requeued + assert len(machine.queued) == 0 + + # run a few cycles + for i in range(2): + yield clock.advance(1) + + assert broadcast_hook.call_count == 0 + + # tx failed and not requeued + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") def test_finalize( - chain, + ethereum_tester, clock, machine, state_observer, @@ -290,7 +583,7 @@ def test_finalize( # advance to finalize the transaction while machine.pending: - yield chain.mine(1) + yield ethereum_tester.mine_block() yield clock.advance(1) # The transaction is no longer pending @@ -321,30 +614,121 @@ def test_finalize( @pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") def test_follow( - chain, machine, state_observer, clock, eip1559_transaction, account, mock_wake_sleep + ethereum_tester, + machine, + state_observer, + clock, + eip1559_transaction, + account, + mock_wake_sleep, +): + machine.start() + assert machine.current_state == machine._IDLE + + atx = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + + while not machine.finalized: + yield ethereum_tester.mine_block() + yield clock.advance(1) + + assert atx.final is True + + while len(machine.finalized) > 0: + yield ethereum_tester.mine_block() + yield clock.advance(1) + + assert len(machine.finalized) == 0 + assert len(machine.queued) == 0 + assert machine.pending is None + + assert not machine.busy + + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +def test_use_strategies_speedup_used( + ethereum_tester, + machine, + state_observer, + clock, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, ): machine.start() assert machine.current_state == machine._IDLE + old_max_fee = eip1559_transaction["maxFeePerGas"] + old_priority_fee = eip1559_transaction["maxPriorityFeePerGas"] + + broadcast_hook = mocker.Mock() atx = machine.queue_transaction( params=eip1559_transaction, signer=account, + on_broadcast=broadcast_hook, ) + update_spy = mocker.spy(machine._tx_tracker, "update_after_retry") + # advance to broadcast the transaction while machine.pending is None: yield clock.advance(1) + # ensure that hook is called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_hook.call_count == 1 + broadcast_hook.assert_called_with(atx), "called with correct parameter" + assert machine.current_state == machine._BUSY + original_params = dict(atx.params) + + # need some cycles while tx unmined for strategies to kick in + for i in range(2): + yield clock.advance(1) + + # ensure that hook is called + yield deferLater(reactor, 0.2, lambda: None) + assert ( + broadcast_hook.call_count > 1 + ), "additional calls to broadcast since tx retried/replaced" + broadcast_hook.assert_called_with(atx), "called with correct parameter" + + assert atx.params != original_params, "params changed" + assert update_spy.call_count > 0, "params updated" + while not machine.finalized: + yield ethereum_tester.mine_block() yield clock.advance(1) assert atx.final is True + # speed up strategy kicked in + assert atx.params["maxFeePerGas"] > old_max_fee + assert atx.params["maxPriorityFeePerGas"] > old_priority_fee + while len(machine.finalized) > 0: - yield chain.mine(1) + yield ethereum_tester.mine_block() yield clock.advance(1) assert len(machine.finalized) == 0 @@ -362,6 +746,368 @@ def test_follow( machine.stop() +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +def test_use_strategies_timeout_used( + ethereum_tester, + w3, + machine, + state_observer, + clock, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + fault_hook = mocker.Mock() + + machine.start() + assert machine.current_state == machine._IDLE + + atx = machine.queue_transaction( + params=eip1559_transaction, signer=account, on_fault=fault_hook + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + + # don't mine transaction at all; wait for hook to be called when timeout occurs + + # need some cycles while tx unmined for strategies to kick in + num_cycles = 4 + for i in range(num_cycles): + # reduce creation time by timeout to force timeout + atx.created -= math.ceil(TimeoutStrategy._TIMEOUT / (num_cycles - 1)) + yield clock.advance(1) + + # ensure switch back to IDLE + yield clock.advance(1) + + # ensure that hook is called + yield deferLater(reactor, 0.2, lambda: None) + + assert fault_hook.call_count == 1 + fault_hook.assert_called_with(atx) + + assert len(machine.queued) == 0 + assert machine.pending is None + + assert not machine.busy + assert not atx.final + + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +def test_use_strategies_that_dont_make_updates( + ethereum_tester, + w3, + machine, + state_observer, + clock, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + # TODO consider whether this should just be provided to constructor - #23 + machine._strategies.clear() + + # strategies that don't make updates + strategy_1 = mocker.Mock(spec=AsyncTxStrategy) + strategy_1.execute.return_value = None + strategy_2 = mocker.Mock(spec=AsyncTxStrategy) + strategy_2.execute.return_value = None + + machine._strategies = [strategy_1, strategy_2] + + update_spy = mocker.spy(machine._tx_tracker, "update_after_retry") + + machine.start() + assert machine.current_state == machine._IDLE + + broadcast_hook = mocker.Mock() + atx = machine.queue_transaction( + params=eip1559_transaction, signer=account, on_broadcast=broadcast_hook + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + # ensure that hook is called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_hook.call_count == 1 + broadcast_hook.assert_called_with(atx), "called with correct parameter" + + original_params = dict(atx.params) + + assert machine.current_state == machine._BUSY + + # need some cycles while tx unmined for strategies to kick in + num_cycles = 4 + for i in range(num_cycles): + yield clock.advance(1) + # params remained unchanged since strategies don't make updates + assert atx.params == original_params, "params remain unchanged" + + assert strategy_1.execute.call_count > 0, "strategy #1 was called" + assert strategy_2.execute.call_count > 0, "strategy #2 was called" + + assert atx.params == original_params, "params remain unchanged" + assert update_spy.call_count == 0, "update never called because no retry" + + # mine tx + ethereum_tester.mine_block() + yield clock.advance(1) + + # ensure switch back to IDLE + yield clock.advance(1) + + assert len(machine.queued) == 0 + assert machine.pending is None + + assert not machine.busy + assert atx.final + + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +@pytest.mark.parametrize( + "retry_error", + [ + ValidationError, + ValueError, + Web3Exception, + TooManyRequests, + ProviderConnectionError, + TimeExhausted, + ], +) +def test_retry_with_errors_but_recovers( + retry_error, + ethereum_tester, + w3, + machine, + state_observer, + clock, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + # need more freedom with redo attempts for test + mocker.patch.object(machine, "_MAX_REDO_ATTEMPTS", 10) + + # TODO consider whether this should just be provided to constructor - #23 + machine._strategies.clear() + + # strategies that don't make updates + strategy_1 = mocker.Mock(spec=AsyncTxStrategy) + strategy_1.name = "mock_strategy" + # return non-None so retry is attempted + strategy_1.execute.return_value = dict(eip1559_transaction) + + machine._strategies = [strategy_1] + + update_spy = mocker.spy(machine._tx_tracker, "update_after_retry") + + machine.start() + assert machine.current_state == machine._IDLE + + broadcast_hook = mocker.Mock() + fault_hook = mocker.Mock() + atx = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + on_fault=fault_hook, + on_broadcast=broadcast_hook, + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + # ensure that hook is called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_hook.call_count == 1 + + assert machine.current_state == machine._BUSY + + real_method = w3.eth.send_raw_transaction + + # make firing of retry transaction fail with non-recoverable error + error = retry_error("retry error") + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=error) + + # run a cycle while tx unmined for strategies to kick in + for i in range(5): + yield clock.advance(1) + assert ( + machine.pending + ), "tx is being retried but encounters retry error and remains pending" + assert atx.retries >= i + + assert strategy_1.execute.call_count > 0, "strategy #1 was called" + # retries failed, so params shouldn't have been updated + assert update_spy.call_count == 0, "update never called because each retry failed" + + # call real method from now on + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=real_method) + + while not machine.finalized: + yield ethereum_tester.mine_block() + yield clock.advance(1) + + yield clock.advance(1) + assert atx.final is True + + # ensure that hook is not called + yield deferLater(reactor, 0.2, lambda: None) + assert fault_hook.call_count == 0 + + # ensure switch back to IDLE + yield clock.advance(1) + + assert len(machine.queued) == 0 + assert machine.pending is None + + assert not machine.busy + assert atx.final + assert isinstance(atx, FinalizedTx) + + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + +@pytest_twisted.inlineCallbacks +@pytest.mark.usefixtures("disable_auto_mining") +@pytest.mark.parametrize( + "retry_error", + [ + ValidationError, + ValueError, + Web3Exception, + TooManyRequests, + ProviderConnectionError, + TimeExhausted, + ], +) +def test_retry_with_errors_retries_exceeded( + retry_error, + ethereum_tester, + w3, + machine, + state_observer, + clock, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, +): + # TODO consider whether this should just be provided to constructor - #23 + machine._strategies.clear() + + # strategies that don't make updates + strategy_1 = mocker.Mock(spec=AsyncTxStrategy) + strategy_1.name = "mock_strategy" + # return non-None so retry is attempted + strategy_1.execute.return_value = dict(eip1559_transaction) + + machine._strategies = [strategy_1] + + update_spy = mocker.spy(machine._tx_tracker, "update_after_retry") + + machine.start() + assert machine.current_state == machine._IDLE + + broadcast_hook = mocker.Mock() + fault_hook = mocker.Mock() + atx = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + on_fault=fault_hook, + on_broadcast=broadcast_hook, + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + # ensure that hook is called + yield deferLater(reactor, 0.2, lambda: None) + assert broadcast_hook.call_count == 1 + + assert machine.current_state == machine._BUSY + + # make firing of retry transaction fail with non-recoverable error + error = retry_error("retry error") + mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=error) + + # retry max attempts + for i in range(machine._MAX_REDO_ATTEMPTS): + assert machine.pending is not None + yield clock.advance(1) + assert atx.retries >= i + + # push over retry limit + yield clock.advance(1) + + assert strategy_1.execute.call_count > 0, "strategy #1 was called" + # retries failed, so params shouldn't have been updated + assert update_spy.call_count == 0, "update never called because each retry failed" + + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) + assert fault_hook.call_count == 1 + fault_hook.assert_called_with(atx) + + assert atx.retries == machine._MAX_REDO_ATTEMPTS + + assert len(machine.queued) == 0 + assert atx.final is False + assert isinstance(atx, FaultedTx) + + # ensure switch back to IDLE + yield clock.advance(1) + + assert len(machine.queued) == 0 + assert machine.pending is None + + assert not machine.busy + + assert machine.current_state == machine._IDLE + + assert len(state_observer.transitions) == 2 + assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + + machine.stop() + + @pytest_twisted.inlineCallbacks def test_pause_when_idle(clock, machine, mocker): machine.start() @@ -431,8 +1177,9 @@ def test_pause_when_busy(clock, machine, eip1559_transaction, account, mocker): machine.stop() +@pytest.mark.usefixtures("disable_auto_mining") def test_simple_state_transitions( - chain, machine, eip1559_transaction, account, mock_wake_sleep + ethereum_tester, machine, eip1559_transaction, account, mock_wake_sleep ): assert machine.current_state == machine._IDLE @@ -468,6 +1215,8 @@ def test_simple_state_transitions( assert machine.current_state == machine._BUSY assert machine.busy + ethereum_tester.mine_block() + # busy -> pause machine.pause() assert machine.current_state == machine._PAUSED @@ -481,9 +1230,10 @@ def test_simple_state_transitions( # finalize tx while machine.busy: - chain.mine(1) + ethereum_tester.mine_block() machine._cycle() + ethereum_tester.mine_block() assert atx.final is True # transition to idle diff --git a/tests/test_strategy.py b/tests/test_strategy.py index 1966314..53c1c2a 100644 --- a/tests/test_strategy.py +++ b/tests/test_strategy.py @@ -1,4 +1,7 @@ +import math +import random from datetime import datetime, timedelta +from unittest.mock import PropertyMock import pytest from hexbytes import HexBytes @@ -6,20 +9,33 @@ from web3.types import TxParams from atxm.exceptions import Fault, TransactionFaulted -from atxm.strategies import TimeoutStrategy +from atxm.strategies import ExponentialSpeedupStrategy, TimeoutStrategy from atxm.tx import PendingTx def test_timeout_strategy(w3, mocker): - TIMEOUT = 600 # 10 mins + TIMEOUT = 900 # 15 mins # default timeout timeout_strategy = TimeoutStrategy(w3) assert timeout_strategy.timeout == TimeoutStrategy._TIMEOUT + assert ( + timeout_strategy._warn_threshold + == TimeoutStrategy._TIMEOUT * TimeoutStrategy._WARN_FACTOR + ) + + # specific timeout - low timeout + low_timeout = 60 # 60s + timeout_strategy = TimeoutStrategy(w3, timeout=low_timeout) + assert timeout_strategy.timeout == low_timeout + assert ( + timeout_strategy._warn_threshold == 30 + ) # timeout so low that max warn threshold hit # specific timeout timeout_strategy = TimeoutStrategy(w3, timeout=TIMEOUT) assert timeout_strategy.timeout == TIMEOUT + assert timeout_strategy._warn_threshold == TIMEOUT * TimeoutStrategy._WARN_FACTOR assert timeout_strategy.name == timeout_strategy._NAME @@ -39,7 +55,7 @@ def test_timeout_strategy(w3, mocker): # 1) tx just created a does not time out for i in range(3): - assert timeout_strategy.execute(pending_tx) == params + assert timeout_strategy.execute(pending_tx) is None # no change to params # 2) remaining time is < warn factor; still doesn't time out but we warn about it pending_tx.created = ( @@ -53,19 +69,16 @@ def warning_trapper(event): warnings.append(event) globalLogPublisher.addObserver(warning_trapper) - assert timeout_strategy.execute(pending_tx) == params + assert timeout_strategy.execute(pending_tx) is None # no change to params globalLogPublisher.removeObserver(warning_trapper) assert len(warnings) == 1 warning = warnings[0]["log_format"] - assert ( - f"[pending_timeout] Transaction {pending_tx.txhash.hex()} will timeout in" - in warning - ) + assert f"[timeout] Transaction {pending_tx.txhash.hex()} will timeout in" in warning # 3) close to timeout but not quite (5s short) pending_tx.created = (now - timedelta(seconds=(TIMEOUT - 5))).timestamp() - assert timeout_strategy.execute(pending_tx) == params + assert timeout_strategy.execute(pending_tx) is None # no change to params # 4) timeout pending_tx.created = (now - timedelta(seconds=(TIMEOUT + 1))).timestamp() @@ -75,3 +88,342 @@ def warning_trapper(event): e.tx = pending_tx e.fault = Fault.TIMEOUT e.message = "Transaction has timed out" + + +def test_speedup_strategy_constructor(w3): + # invalid increase percentage + for speedup_perc in [-1, -0.24, 0, 1.01, 1.1]: + with pytest.raises(ValueError): + _ = ExponentialSpeedupStrategy( + w3=w3, speedup_increase_percentage=speedup_perc + ) + + # invalid max tip + for max_tip in [-1, 0, 0.5, 0.9, 1]: + with pytest.raises(ValueError): + _ = ExponentialSpeedupStrategy(w3=w3, max_tip_factor=max_tip) + + # defaults + speedup_strategy = ExponentialSpeedupStrategy(w3=w3) + assert speedup_strategy.speedup_factor == ( + 1 + ExponentialSpeedupStrategy._SPEEDUP_INCREASE_PERCENTAGE + ) + assert speedup_strategy.max_tip_factor == ExponentialSpeedupStrategy._MAX_TIP_FACTOR + + # other values + speedup_increase = 0.223 + max_tip_factor = 4 + speedup_strategy = ExponentialSpeedupStrategy( + w3=w3, + speedup_increase_percentage=speedup_increase, + max_tip_factor=max_tip_factor, + ) + assert speedup_strategy.speedup_factor == (1 + speedup_increase) + assert speedup_strategy.max_tip_factor == max_tip_factor + + assert speedup_strategy.name == "speedup" + + +def test_speedup_strategy_legacy_tx(w3, legacy_transaction, mocker): + speedup_percentage = 0.112 # 11.2% + speedup_strategy = ExponentialSpeedupStrategy( + w3, speedup_increase_percentage=speedup_percentage + ) + + pending_tx = mocker.Mock(spec=PendingTx) + pending_tx.id = 1 + + # generated gas price < existing tx gas price + generated_gas_price = legacy_transaction["gasPrice"] - 1 # < what is in tx + mocker.patch.object(w3.eth, "generate_gas_price", return_value=generated_gas_price) + + assert legacy_transaction["gasPrice"] + tx_params = dict(legacy_transaction) + for i in range(3): + pending_tx.params = tx_params + old_gas_price = tx_params["gasPrice"] + old_nonce = tx_params["nonce"] + tx_params = speedup_strategy.execute(pending_tx) + + current_gas_price = tx_params["gasPrice"] + assert current_gas_price != old_gas_price + assert current_gas_price == math.ceil(old_gas_price * (1 + speedup_percentage)) + assert tx_params["nonce"] == old_nonce + + # generated gas price is None - same results as before + mocker.patch.object(w3.eth, "generate_gas_price", return_value=None) # set to None + for i in range(3): + tx_params = dict(legacy_transaction) + pending_tx.params = tx_params + old_gas_price = tx_params["gasPrice"] + old_nonce = tx_params["nonce"] + tx_params = speedup_strategy.execute(pending_tx) + + current_gas_price = tx_params["gasPrice"] + assert current_gas_price != old_gas_price + assert current_gas_price == math.ceil(old_gas_price * (1 + speedup_percentage)) + assert tx_params["nonce"] == old_nonce + + # increase generated gas price more than existing gas price in legacy tx + tx_params = dict(legacy_transaction) + pending_tx.params = tx_params + generated_gas_price = tx_params["gasPrice"] * 2 # > what is in tx + mocker.patch.object(w3.eth, "generate_gas_price", return_value=generated_gas_price) + + old_gas_price = tx_params["gasPrice"] + old_nonce = tx_params["nonce"] + updated_tx_params = speedup_strategy.execute(pending_tx) + + current_gas_price = updated_tx_params["gasPrice"] + assert current_gas_price != old_gas_price + assert current_gas_price == math.ceil( + generated_gas_price * (1 + speedup_percentage) + ) + assert updated_tx_params["nonce"] == old_nonce + + +def test_speedup_strategy_eip1559_tx_no_blockchain_change( + w3, eip1559_transaction, mocker +): + # blockchain conditions have not changed since + ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + ) = eip1559_setup(mocker, w3) + + old_max_priority_fee_per_gas = eip1559_transaction["maxPriorityFeePerGas"] + old_max_fee_per_gas = eip1559_transaction["maxFeePerGas"] + + pending_tx.params = dict(eip1559_transaction) + tx_params = speedup_strategy.execute(pending_tx) + + updated_max_priority_fee_per_gas = tx_params["maxPriorityFeePerGas"] + assert updated_max_priority_fee_per_gas > old_max_priority_fee_per_gas + assert updated_max_priority_fee_per_gas == math.ceil( + old_max_priority_fee_per_gas * (1 + speedup_percentage) + ) + + updated_max_fee_per_gas = tx_params["maxFeePerGas"] + assert updated_max_fee_per_gas > old_max_fee_per_gas + assert updated_max_fee_per_gas == math.ceil( + old_max_fee_per_gas * (1 + speedup_percentage) + ) + assert updated_max_fee_per_gas >= ( + current_base_fee + updated_max_priority_fee_per_gas + ) + + +def test_speedup_strategy_eip1559_tx_base_fee_decreased( + w3, eip1559_transaction, mocker +): + # 2) suppose current base fee decreased; calc remains simple + ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + ) = eip1559_setup(mocker, w3) + + old_max_priority_fee_per_gas = eip1559_transaction["maxPriorityFeePerGas"] + old_max_fee_per_gas = eip1559_transaction["maxFeePerGas"] + + new_current_base_fee = math.ceil(current_base_fee * 0.95) + mocker.patch.object( + w3.eth, "get_block", return_value={"baseFeePerGas": new_current_base_fee} + ) + + pending_tx.params = dict(eip1559_transaction) + tx_params = speedup_strategy.execute(pending_tx) + + updated_max_priority_fee_per_gas = tx_params["maxPriorityFeePerGas"] + assert updated_max_priority_fee_per_gas > old_max_priority_fee_per_gas + assert updated_max_priority_fee_per_gas == math.ceil( + old_max_priority_fee_per_gas * (1 + speedup_percentage) + ) + + updated_max_fee_per_gas = tx_params["maxFeePerGas"] + assert updated_max_fee_per_gas > old_max_fee_per_gas + assert updated_max_fee_per_gas == math.ceil( + old_max_fee_per_gas * (1 + speedup_percentage) + ) + assert updated_max_fee_per_gas >= ( + new_current_base_fee + updated_max_priority_fee_per_gas + ) + + +def test_speedup_strategy_eip1559_tx_base_fee_increased( + w3, eip1559_transaction, mocker +): + # suppose current base fee increase; it gets used instead of prior value + ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + ) = eip1559_setup(mocker, w3) + + old_max_priority_fee_per_gas = eip1559_transaction["maxPriorityFeePerGas"] + old_max_fee_per_gas = eip1559_transaction["maxFeePerGas"] + + new_current_base_fee = math.ceil(current_base_fee * 1.1) + mocker.patch.object( + w3.eth, "get_block", return_value={"baseFeePerGas": new_current_base_fee} + ) + + pending_tx.params = dict(eip1559_transaction) + tx_params = speedup_strategy.execute(pending_tx) + + updated_max_priority_fee_per_gas = tx_params["maxPriorityFeePerGas"] + assert updated_max_priority_fee_per_gas > old_max_priority_fee_per_gas + assert updated_max_priority_fee_per_gas == math.ceil( + old_max_priority_fee_per_gas * (1 + speedup_percentage) + ) + + updated_max_fee_per_gas = tx_params["maxFeePerGas"] + assert updated_max_fee_per_gas > old_max_fee_per_gas + assert updated_max_fee_per_gas > math.ceil( + old_max_fee_per_gas * (1 + speedup_percentage) + ) + assert updated_max_fee_per_gas == math.ceil( + new_current_base_fee * (1 + speedup_percentage) + + updated_max_priority_fee_per_gas + ) + + +def test_speedup_strategy_eip1559_tx_no_max_fee(w3, eip1559_transaction, mocker): + # suppose no maxFeePerGas specified - use default calc (same as web3py) + ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + ) = eip1559_setup(mocker, w3) + + old_max_priority_fee_per_gas = eip1559_transaction["maxPriorityFeePerGas"] + + mocker.patch.object( + w3.eth, "get_block", return_value={"baseFeePerGas": current_base_fee} + ) + tx_params = dict(eip1559_transaction) + del tx_params["maxFeePerGas"] + + pending_tx.params = tx_params + tx_params = speedup_strategy.execute(pending_tx) + + updated_max_priority_fee_per_gas = tx_params["maxPriorityFeePerGas"] + assert updated_max_priority_fee_per_gas == math.ceil( + old_max_priority_fee_per_gas * (1 + speedup_percentage) + ) + + updated_max_fee_per_gas = tx_params["maxFeePerGas"] + assert updated_max_fee_per_gas == math.ceil( + (current_base_fee * 2) + updated_max_priority_fee_per_gas + ) + + +def test_speedup_strategy_eip1559_tx_no_max_priority_fee( + w3, eip1559_transaction, mocker +): + # suppose no maxPriorityFeePerGas specified - use suggested tip + ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + ) = eip1559_setup(mocker, w3) + + mocker.patch.object( + w3.eth, "get_block", return_value={"baseFeePerGas": current_base_fee} + ) + tx_params = dict(eip1559_transaction) + del tx_params["maxPriorityFeePerGas"] + + pending_tx.params = tx_params + tx_params = speedup_strategy.execute(pending_tx) + + updated_max_priority_fee_per_gas = tx_params["maxPriorityFeePerGas"] + assert updated_max_priority_fee_per_gas == math.ceil( + suggested_tip * (1 + speedup_percentage) + ) + + updated_max_fee_per_gas = tx_params["maxFeePerGas"] + assert updated_max_fee_per_gas == math.ceil( + current_base_fee * (1 + speedup_percentage) + updated_max_priority_fee_per_gas + ) + + +def test_speedup_strategy_eip1559_tx_hit_max_tip(w3, eip1559_transaction, mocker): + ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + ) = eip1559_setup(mocker, w3) + + pending_tx.params = dict(eip1559_transaction) + expected_num_iterations_before_hitting_max_tip = math.ceil( + math.log(max_tip_factor) / math.log(1 + speedup_percentage) + ) + + # do one less iteration than expected + for i in range(expected_num_iterations_before_hitting_max_tip - 1): + tx_params = speedup_strategy.execute(pending_tx) + assert tx_params is not None + assert tx_params["maxPriorityFeePerGas"] <= (suggested_tip * max_tip_factor) + # update params + pending_tx.params = tx_params + + # next attempt should cause tip to exceed max tip factor + assert (pending_tx.params["maxPriorityFeePerGas"] * (1 + speedup_percentage)) > ( + suggested_tip * max_tip_factor + ) + tx_params = speedup_strategy.execute(pending_tx) + # hit max factor so no more changes - return None + assert ( + tx_params is None + ), f"no updates after {expected_num_iterations_before_hitting_max_tip} iterations" + + +def eip1559_setup(mocker, w3): + speedup_percentage = round(random.randint(110, 230) / 1000, 3) # [11%, 23%] + max_tip_factor = random.randint(2, 5) + speedup_strategy = ExponentialSpeedupStrategy( + w3, + speedup_increase_percentage=speedup_percentage, + max_tip_factor=max_tip_factor, + ) + pending_tx = mocker.Mock(spec=PendingTx) + pending_tx.id = 1 + pending_tx.txhash = HexBytes("0xdeadbeef") + + # use consistent mocked values + current_base_fee = w3.eth.get_block("latest")["baseFeePerGas"] + suggested_tip = w3.eth.max_priority_fee + mocker.patch.object( + w3.eth, "get_block", return_value={"baseFeePerGas": current_base_fee} + ) + mocker.patch( + "web3.eth.eth.Eth.max_priority_fee", PropertyMock(return_value=suggested_tip) + ) + return ( + current_base_fee, + max_tip_factor, + pending_tx, + speedup_percentage, + speedup_strategy, + suggested_tip, + )