diff --git a/atxm/exceptions.py b/atxm/exceptions.py index 522b6b5..1e15e36 100644 --- a/atxm/exceptions.py +++ b/atxm/exceptions.py @@ -16,14 +16,18 @@ class Fault(Enum): # Something went wrong ERROR = "error" - # ... - INSUFFICIENT_FUNDS = "insufficient_funds" - class InsufficientFunds(Exception): """raised when a transaction exceeds the spending cap""" +class RPCException(Exception): + def __init__(self, error_code: int, error_message: str): + self.error_code = error_code + self.error_message = error_message + super().__init__(f"RPC Error [{error_code}]: {error_message}") + + class TransactionFaulted(Exception): """Raised when a transaction has been faulted.""" diff --git a/atxm/machine.py b/atxm/machine.py index 13e2a04..2a69f34 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -1,14 +1,12 @@ from copy import copy, deepcopy -from typing import List, Optional +from typing import List, Optional, Union from eth_account.signers.local import LocalAccount -from eth_utils import ValidationError from statemachine import State, StateMachine from twisted.internet import reactor from twisted.internet.defer import Deferred from twisted.internet.task import LoopingCall from web3 import Web3 -from web3.exceptions import Web3Exception from web3.types import TxParams from atxm.exceptions import ( @@ -19,7 +17,6 @@ from atxm.strategies import AsyncTxStrategy, TimeoutStrategy from atxm.tracker import _TxTracker from atxm.tx import ( - AsyncTx, FutureTx, PendingTx, TxHash, @@ -28,6 +25,7 @@ _get_average_blocktime, _get_confirmations, _get_receipt, + _process_send_raw_transaction_exception, _is_recoverable_send_tx_error, _make_tx_params, fire_hook, @@ -283,29 +281,18 @@ def __get_signer(self, address: str) -> LocalAccount: raise ValueError(f"Signer {address} not found") return signer - def __fire(self, tx: AsyncTx, msg: str) -> TxHash: + def __fire(self, tx: Union[FutureTx, PendingTx], msg: str) -> TxHash: """ Signs and broadcasts a transaction, handling RPC errors and internal state changes. - - On success, returns the `PendingTx` object. - On failure, returns None. - - Morphs a `FutureTx` into a `PendingTx` and advances it - into the active transaction slot if broadcast is successful. """ signer: LocalAccount = self.__get_signer(tx.params["from"]) try: txhash = self.w3.eth.send_raw_transaction( signer.sign_transaction(tx.params).rawTransaction ) - except ValidationError as e: - # special case for insufficient funds - if "Sender does not have enough" in str(e): - # TODO raised exception should be handled in some way #13. - raise InsufficientFunds - - raise e + except Exception as e: + raise _process_send_raw_transaction_exception(e) self.log.info( f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}" @@ -333,7 +320,7 @@ def __strategize(self) -> None: if not params_updated: # mandatory default timeout strategy prevents this from being a forever wait - log.info( + self.log.info( f"[wait] strategies made no suggested updates to " f"pending tx #{_active_copy.id} - skipping retry round" ) @@ -344,13 +331,17 @@ def __strategize(self) -> None: try: txhash = self.__fire(tx=_active_copy, msg=_names) - except InsufficientFunds: - # special case re-raise insufficient funds (for now) - # TODO #13 - # TODO should the following also be done? - # self._tx_tracker.update_failed_retry_attempt(_active_copy) - raise - except (ValidationError, Web3Exception, ValueError) as e: + except InsufficientFunds as e: + # special case + self.log.error( + f"[insufficient funds] transaction #atx-{_active_copy.id}|{_active_copy.params['nonce']} " + f"failed because of insufficient funds - {e}" + ) + # get hook from actual pending object (not a deep copy) + hook = self._tx_tracker.pending.on_insufficient_funds + fire_hook(hook, _active_copy, e) + return + except Exception as e: self._tx_tracker.update_active_after_failed_strategy_update(_active_copy) self.__handle_retry_failure(_active_copy, e) return @@ -366,13 +357,13 @@ def __strategize(self) -> None: fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx) def __handle_retry_failure(self, attempted_tx: PendingTx, e: Exception): - log.warn( + self.log.warn( f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} " f"failed with updated params - {str(e)}; retry again next round" ) if attempted_tx.retries >= self._MAX_RETRY_ATTEMPTS: - log.error( + self.log.error( f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} " f"failed for { attempted_tx.retries} attempts; tx will no longer be retried" ) @@ -405,11 +396,15 @@ def __broadcast(self): try: txhash = self.__fire(tx=future_tx, msg="broadcast") - except InsufficientFunds: - # special case re-raise insufficient funds (for now) - # TODO #13 - raise - except (ValidationError, Web3Exception, ValueError) as e: + except InsufficientFunds as e: + # special case + self.log.error( + f"[insufficient funds] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " + f"failed because of insufficient funds - {e}" + ) + fire_hook(future_tx.on_insufficient_funds, future_tx, e) + return + except Exception as e: # notify user of failure and have them decide self.__handle_broadcast_failure(future_tx, e) return @@ -424,22 +419,22 @@ def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception): if _is_recoverable_send_tx_error(e): if future_tx.retries >= self._MAX_RETRY_ATTEMPTS: is_broadcast_failure = True - log.error( + self.log.error( f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " f"failed after {future_tx.retries} retry attempts" ) else: - log.warn( + self.log.warn( f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " - f"failed - {str(e)}; tx will be retried" + f"failed - {e}; tx will be retried" ) self._tx_tracker.increment_broadcast_retries(future_tx) else: # non-recoverable is_broadcast_failure = True - log.error( + self.log.error( f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " - f"has non-recoverable failure - {str(e)}; no automatic retries" + f"has non-recoverable failure - {e}" ) if is_broadcast_failure: diff --git a/atxm/tracker.py b/atxm/tracker.py index 6cd5941..85710ec 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -4,11 +4,11 @@ from copy import copy from json import JSONDecodeError from pathlib import Path -from typing import Callable, Deque, Dict, Optional, Set, Tuple +from typing import Callable, Deque, Dict, Optional, Set, Tuple, Union from web3.types import TxParams, TxReceipt -from atxm.exceptions import TransactionFaulted +from atxm.exceptions import InsufficientFunds, TransactionFaulted from atxm.logging import log from atxm.tx import ( FinalizedTx, @@ -239,6 +239,9 @@ def queue_tx( on_broadcast_failure: Callable[[FutureTx, Exception], None], on_fault: Callable[[FaultedTx], None], on_finalized: Callable[[FinalizedTx], None], + on_insufficient_funds: Callable[ + [Union[FutureTx, PendingTx], InsufficientFunds], None + ], info: Dict[str, str] = None, on_broadcast: Optional[Callable[[PendingTx], None]] = None, ) -> FutureTx: @@ -254,6 +257,7 @@ def queue_tx( tx.on_fault = on_fault tx.on_finalized = on_finalized tx.on_broadcast = on_broadcast + tx.on_insufficient_funds = on_insufficient_funds self.__queue.append(tx) self.commit() diff --git a/atxm/tx.py b/atxm/tx.py index ef2b44d..22542f4 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -1,7 +1,7 @@ import json from abc import ABC, abstractmethod from dataclasses import dataclass, field -from typing import Callable, Dict, Optional +from typing import Callable, Dict, Optional, Union from eth_typing import ChecksumAddress from eth_utils import encode_hex @@ -18,16 +18,19 @@ class AsyncTx(ABC): id: int final: bool = field(default=None, init=False) fault: Optional[Fault] = field(default=None, init=False) - on_broadcast: Optional[Callable[["PendingTx"], None]] = field( + on_broadcast_failure: Optional[Callable[["FutureTx", Exception], None]] = field( default=None, init=False ) - on_broadcast_failure: Optional[Callable[["FutureTx", Exception], None]] = field( + on_broadcast: Optional[Callable[["PendingTx"], None]] = field( default=None, init=False ) + on_fault: Optional[Callable[["FaultedTx"], None]] = field(default=None, init=False) on_finalized: Optional[Callable[["FinalizedTx"], None]] = field( default=None, init=False ) - on_fault: Optional[Callable[["FaultedTx"], None]] = field(default=None, init=False) + on_insufficient_funds: Optional[ + Callable[[Union["FutureTx", "PendingTx"]], None] + ] = field(default=None, init=False) def __repr__(self): return f"<{self.__class__.__name__} id={self.id} final={self.final}>" diff --git a/atxm/utils.py b/atxm/utils.py index 627fc1a..05c407d 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -2,6 +2,7 @@ from typing import Callable, Optional from cytoolz import memoize +from eth_utils import ValidationError from twisted.internet import reactor from web3 import Web3 from web3.exceptions import ( @@ -10,9 +11,10 @@ TooManyRequests, TransactionNotFound, ) -from web3.types import TxData, TxParams +from web3.types import RPCError, TxData, TxParams from web3.types import TxReceipt, Wei +from atxm.exceptions import InsufficientFunds, RPCException from atxm.logging import log from atxm.tx import AsyncTx, PendingTx, TxHash @@ -151,3 +153,24 @@ def _make_tx_params(data: TxData) -> TxParams: def _is_recoverable_send_tx_error(e: Exception) -> bool: return isinstance(e, (TooManyRequests, ProviderConnectionError, TimeExhausted)) + + +def _process_send_raw_transaction_exception(e: Exception): + try: + error = RPCError(**e.args[0]) + except TypeError: + # not an RPCError + if isinstance( + e, ValidationError + ) and "Sender does not have enough balance" in str(e): + raise InsufficientFunds(str(e)) from e + + raise e + else: + error_code = error["code"] + error_message = error["message"] + if error_code == -32000: # IPC Error + if "insufficient funds" in error_message: + raise InsufficientFunds(error_message) + + raise RPCException(error_code, error_message) from e