Skip to content

Commit

Permalink
Track num requeues and num retries directly on FutureTx and PendingTx…
Browse files Browse the repository at this point in the history
…. This removes the need for extra logic to clear out the dictionaries used by the original tracking logic.

Instead, requeues and retries can be tracked on the tx object itself. Requeues for FutureTx can easily be incremented via the requeue() call on the tracker. It's a little trickier with PendingTx, since the machine only deals with copies of the active tx, so a method was added to the tracker to make this easier.
Updated tests.
  • Loading branch information
derekpierre committed Mar 7, 2024
1 parent f820474 commit a2ea01c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 41 deletions.
41 changes: 9 additions & 32 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from copy import copy, deepcopy
from typing import Dict, List, Optional, Type
from typing import List, Optional, Type

from eth_account.signers.local import LocalAccount
from eth_utils import ValidationError
Expand Down Expand Up @@ -139,10 +139,6 @@ def __init__(
self._task.clock = self.__CLOCK
self._task.interval = self._IDLE_INTERVAL

# requeues/retries
self._requeue_counter: Dict[int, int] = dict()
self._retry_failure_counter: Dict[int, int] = dict()

super().__init__()

self.add_observer(_Machine.LogObserver())
Expand Down Expand Up @@ -274,17 +270,11 @@ def __handle_active_transaction(self) -> None:

# Outcome 2: the pending transaction was reverted (final error)
except TransactionReverted as e:
# clear entry if exists
self._retry_failure_counter.pop(pending_tx.id, "None")

self._tx_tracker.fault(fault_error=e)
return

# Outcome 3: pending transaction is finalized (final success)
if receipt:
# clear entry if exists
self._retry_failure_counter.pop(pending_tx.id, "None")

final_txhash = receipt["transactionHash"]
confirmations = _get_confirmations(w3=self.w3, tx=pending_tx)
self.log.info(
Expand Down Expand Up @@ -374,14 +364,14 @@ def __strategize(self) -> None:
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
# TODO should the following also be done?
# self._tx_tracker.update_failed_retry_attempt(_active_copy)
raise
except (ValidationError, Web3Exception, ValueError) as e:
self._tx_tracker.update_failed_retry_attempt(_active_copy)
self.__handle_retry_failure(_active_copy, e)
return

# clear failed retries since successful
self._retry_failure_counter.pop(_active_copy.id, None)

_active_copy.txhash = txhash
self._tx_tracker.update_after_retry(_active_copy)

Expand All @@ -395,25 +385,19 @@ def __handle_retry_failure(self, pending_tx: PendingTx, e: Exception):
f"[retry] transaction #atx-{pending_tx.id}|{pending_tx.params['nonce']} "
f"failed with updated params - {str(e)}; retry again next round"
)
num_failed_retries = self._retry_failure_counter.get(pending_tx.id, 0)
num_failed_retries += 1
if num_failed_retries > self._MAX_REDO_ATTEMPTS:

if pending_tx.retries >= self._MAX_REDO_ATTEMPTS:
log.error(
f"[retry] transaction #atx-{pending_tx.id}|{pending_tx.params['nonce']} "
f"failed for {num_failed_retries} attempts; tx will no longer be retried"
f"failed for { pending_tx.retries} attempts; tx will no longer be retried"
)

# remove entry since no longer needed
self._retry_failure_counter.pop(pending_tx.id, None)

fault_error = TransactionFaulted(
tx=pending_tx,
fault=Fault.ERROR,
message=str(e),
)
self._tx_tracker.fault(fault_error=fault_error)
else:
self._retry_failure_counter[pending_tx.id] = num_failed_retries

def __broadcast(self):
"""
Expand Down Expand Up @@ -445,9 +429,6 @@ def __broadcast(self):
self.__handle_broadcast_failure(future_tx, e)
return

# clear requeue counter since successful
self._requeue_counter.pop(future_tx.id, None)

self._tx_tracker.morph(tx=future_tx, txhash=txhash)
pending_tx = self._tx_tracker.pending
if pending_tx.on_broadcast:
Expand All @@ -456,20 +437,18 @@ def __broadcast(self):
def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception):
is_broadcast_failure = False
if _is_recoverable_send_tx_error(e):
num_requeues = self._requeue_counter.get(future_tx.id, 0)
if num_requeues >= self._MAX_REDO_ATTEMPTS:
if future_tx.requeues >= self._MAX_REDO_ATTEMPTS:
is_broadcast_failure = True
log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed for {num_requeues} attempts; tx will not be requeued"
f"failed for {future_tx.requeues} attempts; tx will not be requeued"
)
else:
log.warn(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed - {str(e)}; requeueing tx"
)
self._tx_tracker.requeue(future_tx)
self._requeue_counter[future_tx.id] = num_requeues + 1
else:
# non-recoverable
is_broadcast_failure = True
Expand All @@ -479,8 +458,6 @@ def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception):
)

if is_broadcast_failure:
# remove entry since no longer needed
self._requeue_counter.pop(future_tx.id, None)
hook = future_tx.on_broadcast_failure
if hook:
fire_hook(hook, future_tx, e)
Expand Down
11 changes: 11 additions & 0 deletions atxm/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ def update_after_retry(self, tx: PendingTx) -> PendingTx:

return self.pending

def update_failed_retry_attempt(self, tx: PendingTx):
    """
    Record one failed retry attempt for the currently active transaction.

    The retry counter on the tracker's own active tx is incremented. When
    the caller passes a different object with the same id (e.g. a copy of
    the active tx), that object's counter is incremented as well so both
    stay in step.

    Raises RuntimeError if ``tx.id`` does not match the id of the active tx.
    """
    active = self.__active
    if tx.id != active.id:
        raise RuntimeError(
            f"Trying to update unexpected active tx: from {active.id} to {tx.id}"
        )

    active.retries += 1
    if tx is active:
        # same object - already counted above, avoid double increment
        return

    # caller holds a separate object for the same tx; mirror the count on it
    tx.retries += 1

def morph(self, tx: FutureTx, txhash: TxHash) -> PendingTx:
"""
Morphs a future transaction into a pending transaction.
Expand Down Expand Up @@ -196,6 +206,7 @@ def pop(self) -> FutureTx:

def requeue(self, tx: FutureTx) -> None:
"""Re-queue a transaction for broadcast and subsequent tracking."""
tx.requeues += 1
self.__queue.append(tx)
self.commit()
log.info(
Expand Down
2 changes: 2 additions & 0 deletions atxm/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def from_dict(cls, data: Dict):

@dataclass
class FutureTx(AsyncTx):
requeues: int = field(default=0, init=False)
final: bool = field(default=False, init=False)
info: Optional[Dict] = None

Expand Down Expand Up @@ -80,6 +81,7 @@ def from_dict(cls, data: Dict):

@dataclass
class PendingTx(AsyncTx):
retries: int = field(default=0, init=False)
final: bool = field(default=False, init=False)
txhash: TxHash
created: int
Expand Down
17 changes: 8 additions & 9 deletions tests/test_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_broadcast(
yield deferLater(reactor, 0.2, lambda: None)
assert hook.call_count == 1

assert machine._requeue_counter.get(atx.id) is None # not tracked
assert atx.retries == 0

# tx only broadcasted and not finalized, so we are still busy
assert machine.current_state == machine._BUSY
Expand Down Expand Up @@ -371,7 +371,7 @@ def test_broadcast_non_recoverable_error(

assert broadcast_hook.call_count == 0

assert machine._requeue_counter.get(atx.id) is None # not tracked
assert atx.requeues == 0

# tx failed and not requeued
assert machine.current_state == machine._IDLE
Expand Down Expand Up @@ -434,6 +434,7 @@ def test_broadcast_recoverable_error(
for i in range(5):
yield clock.advance(1)
assert len(machine.queued) == 1 # remains in queue and not broadcasted
assert atx.requeues >= i

# call real method from now on
mocker.patch.object(w3.eth, "send_raw_transaction", side_effect=real_method)
Expand All @@ -457,8 +458,6 @@ def test_broadcast_recoverable_error(
assert broadcast_hook.call_count == 1
assert broadcast_failure_hook.call_count == 0

assert machine._requeue_counter.get(atx.id) is None # no longer tracked

# tx only broadcasted and not finalized, so we are still busy
assert machine.current_state == machine._BUSY

Expand Down Expand Up @@ -516,7 +515,7 @@ def test_broadcast_recoverable_error_requeues_exceeded(
for i in range(machine._MAX_REDO_ATTEMPTS - 1):
assert len(machine.queued) == 1 # remains in queue and not broadcasted
yield clock.advance(1)
assert machine._requeue_counter.get(atx.id, 0) >= i
assert atx.requeues >= i

# push over the retry limit
yield clock.advance(1)
Expand All @@ -526,7 +525,7 @@ def test_broadcast_recoverable_error_requeues_exceeded(
assert broadcast_failure_hook.call_count == 1
broadcast_failure_hook.assert_called_with(atx, error)

assert machine._requeue_counter.get(atx.id) is None # no longer tracked
assert atx.requeues == machine._MAX_REDO_ATTEMPTS

# The transaction failed and is not requeued
assert len(machine.queued) == 0
Expand Down Expand Up @@ -965,6 +964,7 @@ def test_retry_with_errors_but_recovers(
assert (
machine.pending
), "tx is being retried but encounters retry error and remains pending"
assert atx.retries >= i

assert strategy_1.execute.call_count > 0, "strategy #1 was called"
# retries failed, so params shouldn't have been updated
Expand All @@ -989,7 +989,6 @@ def test_retry_with_errors_but_recovers(

assert len(machine.queued) == 0
assert machine.pending is None
assert machine._retry_failure_counter.get(atx.id) is None # no longer tracked

assert not machine.busy
assert atx.final
Expand Down Expand Up @@ -1072,7 +1071,7 @@ def test_retry_with_errors_retries_exceeded(
for i in range(machine._MAX_REDO_ATTEMPTS):
assert machine.pending is not None
yield clock.advance(1)
assert machine._retry_failure_counter.get(atx.id, 0) >= i
assert atx.retries >= i

# push over retry limit
yield clock.advance(1)
Expand All @@ -1086,7 +1085,7 @@ def test_retry_with_errors_retries_exceeded(
assert fault_hook.call_count == 1
fault_hook.assert_called_with(atx)

assert machine._retry_failure_counter.get(atx.id) is None # no longer tracked
assert atx.retries == machine._MAX_REDO_ATTEMPTS

assert len(machine.queued) == 0
assert atx.final is False
Expand Down

0 comments on commit a2ea01c

Please sign in to comment.