Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling of FaultedTxs after tx broadcasted #17

Merged
merged 25 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9935080
Rename module from tacker -> tracker.
derekpierre Feb 28, 2024
17829a7
Clean up logging.
derekpierre Feb 28, 2024
b71b110
Raise ValueError if tx type unrecognized. Ensures that we always hand…
derekpierre Feb 28, 2024
da297a3
Remove the use of on_pause hook - pauses are user-controlled so no us…
derekpierre Feb 28, 2024
7660740
Strategy raising Wait should not cause pause, but rather simply skip …
derekpierre Feb 28, 2024
887c589
Pause can be activated from either Busy or Idle states.
derekpierre Feb 28, 2024
ea62ab6
Wake should immediately run the looping call - either we are idle and…
derekpierre Feb 28, 2024
c29c9d4
Improved testing of machine.
derekpierre Feb 28, 2024
03b49dd
Wait for hook to be called.
derekpierre Feb 28, 2024
31267f5
Add test that calls pause while already paused, and resume when not p…
derekpierre Feb 28, 2024
cb7f893
Don't calll _wake if previous state was busy/pause after queueing a tx.
derekpierre Feb 28, 2024
96e1435
Force a move to paused state instead of waiting for next iteration.
derekpierre Feb 29, 2024
5540405
Improve testing around pausing/waking given that pause now forces a m…
derekpierre Feb 29, 2024
7a98fe6
Don't unpause based on strategies - pauses are explicitly user-directed.
derekpierre Feb 29, 2024
4fc1845
Rename FaultyTx to FaultedTx.
derekpierre Feb 29, 2024
4108ada
Remove tracking of faulted txs.
derekpierre Feb 29, 2024
9fd6729
Change logging annotation from [state] -> [tracker] for clarity.
derekpierre Feb 29, 2024
a73a662
Always clear a faulted tx - don't give the option not to clear it.
derekpierre Feb 29, 2024
2fac537
Add typehints for hooks provided when queuing a transaction.
derekpierre Feb 29, 2024
836cae2
TransactionReverted is really just a subclass of TransactionFaulted; …
derekpierre Feb 29, 2024
299940a
fault() now takes a TransactionFaulted exception and constructs the F…
derekpierre Feb 29, 2024
25606cd
Add typehints for on_fault callback.
derekpierre Feb 29, 2024
65c2148
Improve logging when fault occurs.
derekpierre Feb 29, 2024
4bb93a9
Make strategies a protected member for safety.
derekpierre Feb 29, 2024
14a66e2
Add tests for faulted tx conditions - revert or faulted from strategy.
derekpierre Feb 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Hooks are fired in a dedicated thread for lifecycle events.

- `on_broadcast`: When a transaction is broadcasted.
- `on_finalized`: When a transaction is finalized.
- `on_pause`: When a transaction is halted by the strategy.
- `on_fault`: When a transaction reverted or another error occurred.


Expand Down
17 changes: 10 additions & 7 deletions atxm/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum

from web3.types import RPCError
from web3.types import PendingTx, RPCError, TxReceipt


class Fault(Enum):
Expand Down Expand Up @@ -37,16 +37,19 @@ class Wait(Exception):
"""


class TransactionFault(Exception):
"""raised when a transaction has been faulted"""
class TransactionFaulted(Exception):
"""Raised when a transaction has been faulted."""

def __init__(self, tx, fault: Fault, clear: bool, message: str):
def __init__(self, tx: PendingTx, fault: Fault, message: str):
self.tx = tx
self.fault = fault
self.message = message
self.clear = clear
super().__init__(message)


class TransactionReverted(Exception):
"""raised when a transaction has been reverted"""
class TransactionReverted(TransactionFaulted):
"""Raised when a transaction has been reverted."""

def __init__(self, tx: PendingTx, receipt: TxReceipt, message: str):
self.receipt = receipt
super().__init__(tx=tx, fault=Fault.REVERT, message=message)
164 changes: 86 additions & 78 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
from copy import deepcopy
from typing import Optional, List, Type
from typing import List, Optional, Type

from eth_account.signers.local import LocalAccount
from statemachine import StateMachine, State
from statemachine import State, StateMachine
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.types import TxParams

from atxm.exceptions import (
Wait,
TransactionReverted,
TransactionFault,
Fault,
)
from atxm.tacker import _TxTracker
from atxm.exceptions import TransactionFaulted, TransactionReverted, Wait
from atxm.strategies import (
AsyncTxStrategy,
FixedRateSpeedUp,
InsufficientFundsPause,
TimeoutPause,
FixedRateSpeedUp,
)
from atxm.tracker import _TxTracker
from atxm.tx import (
FutureTx,
PendingTx,
Expand All @@ -32,8 +27,8 @@
_get_confirmations,
_get_receipt,
_handle_rpc_error,
fire_hook,
_make_tx_params,
fire_hook,
)
from .logging import log

Expand All @@ -48,33 +43,39 @@ class _Machine(StateMachine):
#
# State Machine:
#
# Idle <---> Busy <---> Paused
# | ^ | ^ | ^
# V | V | V |
# _ _ _
# Pause
# ^ ^
# / \
# V v
# Idle <---> Busy
# | ^ | ^
# V | V |
# _ _
#
_BUSY = State("Busy")
_IDLE = State("Idle", initial=True)
_PAUSED = State("Paused")

# - State Transitions -
_transition_to_idle = _BUSY.to(_IDLE, unless="_busy") # Busy -> Idle
_transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") # Busy -> Pause
_transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") | _IDLE.to(
_PAUSED, cond="_pause"
) # Busy/Idle -> Pause
_transition_to_idle = _BUSY.to(_IDLE, unless=["_busy", "_pause"]) | _PAUSED.to(
_IDLE, unless=["_busy", "_pause"]
) # Busy/Paused -> Idle
_transition_to_busy = _IDLE.to(_BUSY, cond="_busy") | _PAUSED.to(
_BUSY, unless="_pause"
) # Idle/Pause -> Busy
# self transitions i.e. remain in same state
_remain_busy = _BUSY.to.itself(cond="_busy", unless="_pause")
_remain_idle = _IDLE.to.itself(unless="_busy")
_remain_paused = _PAUSED.to.itself(cond="_pause")
_remain_idle = _IDLE.to.itself(unless=["_busy", "_pause"])

_cycle_state = (
_transition_to_idle
| _transition_to_paused
| _transition_to_busy
| _remain_busy
| _remain_idle
| _remain_paused
)

# internal
Expand Down Expand Up @@ -114,9 +115,9 @@ def __init__(
self.w3 = w3
self.signers = {}
self.log = log
self.strategies = [s(w3) for s in self.STRATEGIES]
self._strategies = [s(w3) for s in self.STRATEGIES]
if strategies:
self.strategies.extend(list(strategies))
self._strategies.extend(list(strategies))

# state
self._pause = False
Expand Down Expand Up @@ -157,30 +158,17 @@ def _handle_errors(self, *args, **kwargs):
@_transition_to_paused.before
def _enter_pause_mode(self):
self.log.info("[pause] pause mode activated")
return

@_PAUSED.enter
def _process_paused(self):
# TODO what action should be taken to check if we leave the pause state?
# Perhaps a call to self.__strategize()
return

@_IDLE.enter
def _process_idle(self):
"""Return to idle mode if not already there (slow down)"""
if self._task.interval != self._IDLE_INTERVAL:
# TODO does changing the interval value actually update the LoopingCall?
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[done] returning to idle mode with "
f"{self._task.interval} second interval"
)
self._sleep()

# TODO
# 1. don't always sleep (idle for some number of cycles?)
# 2. careful sleeping - potential concurrency concerns
# 3. there is currently no difference between sleep/idle ...
self._sleep()
@_transition_to_idle.before
def _enter_idle_mode(self):
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[idle] returning to idle mode with {self._task.interval} second interval"
)

@_transition_to_busy.before
def _enter_busy_mode(self):
Expand All @@ -191,7 +179,7 @@ def _enter_busy_mode(self):
self._task.interval = max(
round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL
)
self.log.info(f"[working] cycle interval is {self._task.interval} seconds")
self.log.info(f"[working] cycle interval is now {self._task.interval} seconds")

@_BUSY.enter
def _process_busy(self):
Expand Down Expand Up @@ -234,9 +222,14 @@ def _stop(self):
self._task.stop()

def _wake(self) -> None:
if not self._task.running:
log.info("[wake] waking up")
self._start(now=True)
"""Runs the looping call immediately."""
log.info("[wake] running looping call now.")
if self._task.running:
# TODO instead of stopping/starting, can you set interval to 0
# and call reset to have looping call immediately?
self._stop()

self._start(now=True)

def _sleep(self) -> None:
if self._task.running:
Expand Down Expand Up @@ -267,12 +260,8 @@ def __handle_active_transaction(self) -> bool:
receipt = _get_receipt(w3=self.w3, pending_tx=pending_tx)

# Outcome 2: the pending transaction was reverted (final error)
except TransactionReverted:
self._tx_tracker.fault(
error=pending_tx.txhash.hex(),
fault=Fault.REVERT,
clear_active=True,
)
except TransactionReverted as e:
self._tx_tracker.fault(fault_error=e)
return True

# Outcome 3: pending transaction is finalized (final success)
Expand Down Expand Up @@ -328,50 +317,29 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx

def pause(self) -> None:
self._pause = True
self.log.warn(
f"[pause] pending transaction {self._tx_tracker.pending.txhash.hex()} has been paused."
)
hook = self._tx_tracker.pending.on_pause
if hook:
fire_hook(hook=hook, tx=self._tx_tracker.pending)

def resume(self) -> None:
self.log.info("[pause] pause lifted by strategy")
self._pause = False # resume

def __strategize(self) -> Optional[PendingTx]:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
raise RuntimeError("No active transaction to strategize")

_active_copy = deepcopy(self._tx_tracker.pending)
for strategy in self.strategies:
for strategy in self._strategies:
try:
params = strategy.execute(pending=_active_copy)
except Wait as e:
log.info(f"[pause] strategy {strategy.__class__} signalled pause: {e}")
self.pause()
log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}")
return
except TransactionFault as e:
self._tx_tracker.fault(
error=self._tx_tracker.pending.txhash.hex(),
fault=e.fault,
clear_active=e.clear,
)
except TransactionFaulted as e:
self._tx_tracker.fault(fault_error=e)
return
if params:
# in case the strategy accidentally returns None
# keep the parameters as they are.
_active_copy.params.update(params)

if self._pause:
self.resume()

# (!) retry the transaction with the new parameters
retry_params = TxParams(_active_copy.params)
_names = " -> ".join(s.name for s in self.strategies)
_names = " -> ".join(s.name for s in self._strategies)
pending_tx = self.__fire(tx=retry_params, msg=_names)
self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted")

Expand Down Expand Up @@ -417,3 +385,43 @@ def __monitor_finalized(self) -> None:
self.log.info(
f"[monitor] transaction {tx.txhash.hex()} has {confirmations} confirmations"
)

#
# Exposed functions
#
def pause(self) -> None:
"""
Pause the machine's tx processing loop; no txs are processed until unpaused (resume()).
"""
if not self._pause:
self._pause = True
self.log.info("[pause] pause mode requested")
self._cycle_state() # force a move to PAUSED state (don't wait for next iteration)

def resume(self) -> None:
"""Unpauses (resumes) the machine's tx processing loop."""
if self._pause:
self._pause = False
self.log.info("[pause] pause mode deactivated")
self._wake()

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
previously_busy_or_paused = self._busy or self._pause

if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not previously_busy_or_paused:
self._wake()

return tx
23 changes: 0 additions & 23 deletions atxm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
FinalizedTx,
FutureTx,
PendingTx,
AsyncTx,
)


Expand Down Expand Up @@ -52,28 +51,6 @@ def finalized(self) -> Set[FinalizedTx]:
"""Return a set of finalized transactions."""
return set(self._tx_tracker.finalized)

@property
def faults(self) -> List[AsyncTx]:
"""Return a set of faulted transactions."""
return list(self._tx_tracker.faulty)

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
if signer.address not in self.signers:
self.signers[signer.address] = signer
tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not self._task.running:
self._wake()
return tx

def queue_transactions(
self, params: List[TxParams], signer: LocalAccount, *args, **kwargs
) -> List[FutureTx]:
Expand Down
Loading
Loading