Skip to content

Commit

Permalink
Merge pull request #30 from derekpierre/additional-testing
Browse files Browse the repository at this point in the history
Additional testing for tracker and tx modules - mostly around serialization/deserialization
  • Loading branch information
KPrasch authored Mar 11, 2024
2 parents 6248d7d + 3245c0f commit 667030f
Show file tree
Hide file tree
Showing 11 changed files with 2,105 additions and 223 deletions.
14 changes: 2 additions & 12 deletions atxm/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum

from web3.types import PendingTx, RPCError, TxReceipt
from web3.types import PendingTx, TxReceipt


class Fault(Enum):
Expand All @@ -13,9 +13,6 @@ class Fault(Enum):
# Strategy has been running for too long
TIMEOUT = "timeout"

# Transaction has been capped and subsequently timed out
PAUSE = "pause"

# Transaction reverted
REVERT = "revert"

Expand All @@ -26,17 +23,10 @@ class Fault(Enum):
INSUFFICIENT_FUNDS = "insufficient_funds"


class InsufficientFunds(RPCError):
class InsufficientFunds(Exception):
"""raised when a transaction exceeds the spending cap"""


class Wait(Exception):
"""
Raised when a strategy exceeds a limitation.
Used to mark a pending transaction as "wait, don't retry".
"""


class TransactionFaulted(Exception):
"""Raised when a transaction has been faulted."""

Expand Down
193 changes: 144 additions & 49 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
from copy import deepcopy
from typing import List, Optional, Type
from copy import copy, deepcopy
from typing import List, Optional

from eth_account.signers.local import LocalAccount
from eth_utils import ValidationError
from statemachine import State, StateMachine
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.exceptions import Web3Exception
from web3.types import TxParams

from atxm.exceptions import TransactionFaulted, TransactionReverted, Wait
from atxm.strategies import (
AsyncTxStrategy,
FixedRateSpeedUp,
InsufficientFundsPause,
TimeoutStrategy,
from atxm.exceptions import (
Fault,
InsufficientFunds,
TransactionFaulted,
TransactionReverted,
)
from atxm.strategies import AsyncTxStrategy, TimeoutStrategy
from atxm.tracker import _TxTracker
from atxm.tx import (
AsyncTx,
FutureTx,
PendingTx,
TxHash,
Expand All @@ -26,7 +29,7 @@
_get_average_blocktime,
_get_confirmations,
_get_receipt,
_handle_rpc_error,
_is_recoverable_send_tx_error,
_make_tx_params,
fire_hook,
)
Expand Down Expand Up @@ -89,11 +92,8 @@ class _Machine(StateMachine):
_BLOCK_INTERVAL = 20 # ~20 blocks
_BLOCK_SAMPLE_SIZE = 10_000 # blocks

STRATEGIES: List[Type[AsyncTxStrategy]] = [
InsufficientFundsPause,
TimeoutStrategy,
FixedRateSpeedUp,
]
# max requeues/retries
_MAX_REDO_ATTEMPTS = 3

class LogObserver:
"""StateMachine observer for logging information about state/transitions."""
Expand All @@ -108,14 +108,16 @@ def on_transition(self, source, target):
def __init__(
self,
w3: Web3,
tx_exec_timeout: int = TimeoutStrategy.TIMEOUT,
strategies: Optional[List[AsyncTxStrategy]] = None,
disk_cache: bool = False,
):
# public
self.w3 = w3
self.signers = {}
self.log = log
self._strategies = [s(w3) for s in self.STRATEGIES]
# default TimeoutStrategy using provided timeout - guardrail for users
self._strategies = [TimeoutStrategy(w3, timeout=tx_exec_timeout)]
if strategies:
self._strategies.extend(list(strategies))

Expand Down Expand Up @@ -240,7 +242,7 @@ def _sleep(self) -> None:
# Lifecycle
#

def __handle_active_transaction(self) -> bool:
def __handle_active_transaction(self) -> None:
"""
Handles the currently tracked pending transaction.
Expand All @@ -249,7 +251,7 @@ def __handle_active_transaction(self) -> bool:
1. paused
2. reverted (fault)
3. finalized
4. strategize: retry, wait, or fault
4. strategize: retry, do nothing and wait, or fault
Returns True if the next queued transaction can be broadcasted right now.
"""
Expand All @@ -262,7 +264,7 @@ def __handle_active_transaction(self) -> bool:
# Outcome 2: the pending transaction was reverted (final error)
except TransactionReverted as e:
self._tx_tracker.fault(fault_error=e)
return True
return

# Outcome 3: pending transaction is finalized (final success)
if receipt:
Expand All @@ -273,14 +275,13 @@ def __handle_active_transaction(self) -> bool:
f"with {confirmations} confirmation(s) txhash: {final_txhash.hex()}"
)
self._tx_tracker.finalize_active_tx(receipt=receipt)
return True
return

# Outcome 4: re-strategize the pending transaction
pending_tx = self.__strategize()
return pending_tx is not None
self.__strategize()

#
# Broadcast
# Broadcast tx
#

def __get_signer(self, address: str) -> LocalAccount:
Expand All @@ -290,7 +291,7 @@ def __get_signer(self, address: str) -> LocalAccount:
raise ValueError(f"Signer {address} not found")
return signer

def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
def __fire(self, tx: AsyncTx, msg: str) -> TxHash:
"""
Signs and broadcasts a transaction, handling RPC errors
and internal state changes.
Expand All @@ -301,70 +302,156 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
Morphs a `FutureTx` into a `PendingTx` and advances it
into the active transaction slot if broadcast is successful.
"""
signer: LocalAccount = self.__get_signer(tx._from)
signer: LocalAccount = self.__get_signer(tx.params["from"])
try:
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
except ValueError as e:
_handle_rpc_error(e, tx=tx)
return
except ValidationError as e:
# special case for insufficient funds
if "Sender does not have enough" in str(e):
# TODO raised exception should be handled in some way #13.
raise InsufficientFunds

raise e

self.log.info(
f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}"
)
pending_tx = self._tx_tracker.morph(tx=tx, txhash=txhash)
if tx.on_broadcast:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx
return txhash

def __strategize(self) -> Optional[PendingTx]:
def __strategize(self) -> None:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
raise RuntimeError("No active transaction to strategize")

_active_copy = deepcopy(self._tx_tracker.pending)
params_updated = False
for strategy in self._strategies:
try:
params = strategy.execute(pending=_active_copy)
except Wait as e:
log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}")
return
except TransactionFaulted as e:
self._tx_tracker.fault(fault_error=e)
return
if params:
# in case the strategy accidentally returns None
# keep the parameters as they are.
_active_copy.params.update(params)
params_updated = True

if not params_updated:
# mandatory default timeout strategy prevents this from being a forever wait
log.info(
f"[wait] strategies made no suggested updates to "
f"pending tx #{_active_copy.id} - skipping retry round"
)
return

# (!) retry the transaction with the new parameters
retry_params = TxParams(_active_copy.params)
_names = " -> ".join(s.name for s in self._strategies)
pending_tx = self.__fire(tx=retry_params, msg=_names)

try:
txhash = self.__fire(tx=_active_copy, msg=_names)
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
# TODO should the following also be done?
# self._tx_tracker.update_failed_retry_attempt(_active_copy)
raise
except (ValidationError, Web3Exception, ValueError) as e:
self._tx_tracker.update_failed_retry_attempt(_active_copy)
self.__handle_retry_failure(_active_copy, e)
return

_active_copy.txhash = txhash
self._tx_tracker.update_after_retry(_active_copy)

pending_tx = self._tx_tracker.pending
self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted")
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

return pending_tx
def __handle_retry_failure(self, attempted_tx: PendingTx, e: Exception):
log.warn(
f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} "
f"failed with updated params - {str(e)}; retry again next round"
)

def __broadcast(self) -> Optional[TxHash]:
if attempted_tx.retries >= self._MAX_REDO_ATTEMPTS:
log.error(
f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} "
f"failed for { attempted_tx.retries} attempts; tx will no longer be retried"
)

fault_error = TransactionFaulted(
tx=attempted_tx,
fault=Fault.ERROR,
message=str(e),
)
self._tx_tracker.fault(fault_error=fault_error)

def __broadcast(self):
"""
Attempts to broadcast the next `FutureTx` in the queue.
If the broadcast is not successful, it is re-queued.
"""
future_tx = self._tx_tracker._pop() # popleft
future_tx = self._tx_tracker.pop() # popleft
future_tx.params = _make_tx_params(future_tx.params)

# update nonce as necessary
signer = self.__get_signer(future_tx._from)
nonce = self.w3.eth.get_transaction_count(signer.address, "latest")
if nonce > future_tx.params["nonce"]:
self.log.warn(
f"[broadcast] nonce {future_tx.params['nonce']} has been front-run "
f"by another transaction. Updating queued tx nonce {future_tx.params['nonce']} -> {nonce}"
f"by another transaction. Updating queued tx "
f"nonce {future_tx.params['nonce']} -> {nonce}"
)
future_tx.params["nonce"] = nonce
pending_tx = self.__fire(tx=future_tx, msg="broadcast")
if not pending_tx:
self._tx_tracker._requeue(future_tx)

try:
txhash = self.__fire(tx=future_tx, msg="broadcast")
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
raise
except (ValidationError, Web3Exception, ValueError) as e:
# either requeue OR fail and move on to subsequent txs
self.__handle_broadcast_failure(future_tx, e)
return
return pending_tx.txhash

self._tx_tracker.morph(tx=future_tx, txhash=txhash)
pending_tx = self._tx_tracker.pending
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception):
is_broadcast_failure = False
if _is_recoverable_send_tx_error(e):
if future_tx.requeues >= self._MAX_REDO_ATTEMPTS:
is_broadcast_failure = True
log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed for {future_tx.requeues} attempts; tx will not be requeued"
)
else:
log.warn(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed - {str(e)}; requeueing tx"
)
self._tx_tracker.requeue(future_tx)
else:
# non-recoverable
is_broadcast_failure = True
log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"has non-recoverable failure - {str(e)}; tx will not be requeued"
)

if is_broadcast_failure:
hook = future_tx.on_broadcast_failure
if hook:
fire_hook(hook, future_tx, e)

#
# Monitoring
Expand All @@ -379,7 +466,7 @@ def __monitor_finalized(self) -> None:
if tx in self._tx_tracker.finalized:
self._tx_tracker.finalized.remove(tx)
self.log.info(
f"[clear] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations"
f"[monitor] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations"
)
continue
self.log.info(
Expand Down Expand Up @@ -418,9 +505,17 @@ def queue_transaction(
if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
params_copy = copy(params)

from_param = params_copy.get("from")
if from_param is None:
params_copy["from"] = signer.address
if from_param and from_param != signer.address:
raise ValueError(
f"Mismatched 'from' value ({from_param}) and 'signer' account ({signer.address})"
)

tx = self._tx_tracker.queue_tx(params=params_copy, *args, **kwargs)
if not previously_busy_or_paused:
self._wake()

Expand Down
Loading

0 comments on commit 667030f

Please sign in to comment.