From 9935080ff0e744f7f6b437855017ea90fb2131a8 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 09:35:31 -0500 Subject: [PATCH 01/25] Rename module from tacker -> tracker. --- atxm/machine.py | 2 +- atxm/{tacker.py => tracker.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename atxm/{tacker.py => tracker.py} (100%) diff --git a/atxm/machine.py b/atxm/machine.py index e7c3488..cc77640 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -15,7 +15,7 @@ TransactionFault, Fault, ) -from atxm.tacker import _TxTracker +from atxm.tracker import _TxTracker from atxm.strategies import ( AsyncTxStrategy, InsufficientFundsPause, diff --git a/atxm/tacker.py b/atxm/tracker.py similarity index 100% rename from atxm/tacker.py rename to atxm/tracker.py From 17829a7625bc25ac697ff618d7ebe79c700c318f Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 09:36:57 -0500 Subject: [PATCH 02/25] Clean up logging. --- atxm/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atxm/utils.py b/atxm/utils.py index 067dc8e..0a210a1 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -71,7 +71,7 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]: # If it is equals 0 the transaction was reverted by EVM. # https://web3py.readthedocs.io/en/stable/web3.eth.html#web3.eth.Eth.get_transaction_receipt log.warn( - f"Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}" + f"[reverted] Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}" ) raise TransactionReverted(receipt) From b71b1106ad86d60a116ac64736f6d54587c82deb Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 09:37:37 -0500 Subject: [PATCH 03/25] Raise ValueError if tx type unrecognized. Ensures that we always handling the different types of txs appropriately. --- atxm/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/atxm/utils.py b/atxm/utils.py index 0a210a1..ef4c0d3 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -159,4 +159,7 @@ def _make_tx_params(data: TxData) -> TxParams: params["type"] = "0x02" params["maxFeePerGas"] = data["maxFeePerGas"] params["maxPriorityFeePerGas"] = data["maxPriorityFeePerGas"] + else: + raise ValueError(f"unrecognized tx data: {data}") + return params From da297a3f9f5f91a403e0663471d9713890d8d34f Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:22:36 -0500 Subject: [PATCH 04/25] Remove the use of on_pause hook - pauses are user-controlled so no use in providing a callback to alert the user about pausing. Move queue_transaction to _Machine instead - more applicable there since internals of _Machine are used. Move exposed functions to their own section in _Machine. --- README.md | 1 - atxm/machine.py | 51 ++++++++++++++++++++++++++++++++++++------------- atxm/main.py | 17 ----------------- atxm/tracker.py | 2 -- atxm/tx.py | 1 - 5 files changed, 38 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index b2b4e39..fee30a9 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,6 @@ Hooks are fired in a dedicated thread for lifecycle events. - `on_broadcast`: When a transaction is broadcasted. - `on_finalized`: When a transaction is finalized. -- `on_pause`: When a transaction is halted by the strategy. - `on_fault`: When a transaction reverted or another error occurred. diff --git a/atxm/machine.py b/atxm/machine.py index cc77640..1276ddf 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -328,19 +328,6 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]: fire_hook(hook=tx.on_broadcast, tx=pending_tx) return pending_tx - def pause(self) -> None: - self._pause = True - self.log.warn( - f"[pause] pending transaction {self._tx_tracker.pending.txhash.hex()} has been paused." - ) - hook = self._tx_tracker.pending.on_pause - if hook: - fire_hook(hook=hook, tx=self._tx_tracker.pending) - - def resume(self) -> None: - self.log.info("[pause] pause lifted by strategy") - self._pause = False # resume - def __strategize(self) -> Optional[PendingTx]: """Retry the currently tracked pending transaction with the configured strategy.""" if not self._tx_tracker.pending: @@ -417,3 +404,41 @@ def __monitor_finalized(self) -> None: self.log.info( f"[monitor] transaction {tx.txhash.hex()} has {confirmations} confirmations" ) + + # + # Exposed functions + # + def pause(self) -> None: + """ + Pause the machine's tx processing loop; no txs are processed until unpaused (resume()). + """ + self._pause = True + self.log.info("[pause] pause mode requested") + + def resume(self) -> None: + """Unpauses (resumes) the machine's tx processing loop.""" + if self._pause: + self._pause = False + self.log.info("[pause] pause mode deactivated") + self._wake() + + def queue_transaction( + self, params: TxParams, signer: LocalAccount, *args, **kwargs + ) -> FutureTx: + """ + Queue a new transaction for broadcast and subsequent tracking. + Optionally provide a dictionary of additional string data + to log during the transaction's lifecycle for identification. + """ + previously_busy = self._busy + + if signer.address not in self.signers: + self.signers[signer.address] = signer + + tx = self._tx_tracker._queue( + _from=signer.address, params=params, *args, **kwargs + ) + if not previously_busy: + self._wake() + + return tx diff --git a/atxm/main.py b/atxm/main.py index ba79acf..b6eccd5 100644 --- a/atxm/main.py +++ b/atxm/main.py @@ -57,23 +57,6 @@ def faults(self) -> List[AsyncTx]: """Return a set of faulted transactions.""" return list(self._tx_tracker.faulty) - def queue_transaction( - self, params: TxParams, signer: LocalAccount, *args, **kwargs - ) -> FutureTx: - """ - Queue a new transaction for broadcast and subsequent tracking. - Optionally provide a dictionary of additional string data - to log during the transaction's lifecycle for identification. - """ - if signer.address not in self.signers: - self.signers[signer.address] = signer - tx = self._tx_tracker._queue( - _from=signer.address, params=params, *args, **kwargs - ) - if not self._task.running: - self._wake() - return tx - def queue_transactions( self, params: List[TxParams], signer: LocalAccount, *args, **kwargs ) -> List[FutureTx]: diff --git a/atxm/tracker.py b/atxm/tracker.py index 94cc7d4..2d71e85 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -199,7 +199,6 @@ def _queue( info: Dict[str, str] = None, on_broadcast: Optional[Callable] = None, on_finalized: Optional[Callable] = None, - on_pause: Optional[Callable] = None, on_fault: Optional[Callable] = None, ) -> FutureTx: """Queue a new transaction for broadcast and subsequent tracking.""" @@ -213,7 +212,6 @@ def _queue( # configure hooks tx.on_broadcast = on_broadcast tx.on_finalized = on_finalized - tx.on_pause = on_pause tx.on_fault = on_fault self.__queue.append(tx) diff --git a/atxm/tx.py b/atxm/tx.py index 7b6b4ce..a262dc4 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -23,7 +23,6 @@ class AsyncTx(ABC): on_finalized: Optional[Callable[["FinalizedTx"], None]] = field( default=None, init=False ) - on_pause: Optional[Callable[[PendingTx], None]] = field(default=None, init=False) on_fault: Optional[Callable] = field(default=None, init=False) def __repr__(self): From 766074096de4d91358f4181225cdb9f284382e88 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:25:06 -0500 Subject: [PATCH 05/25] Strategy raising Wait should not cause pause, but rather simply skip any processing for this execution round of the machine; reasses during next execution round. --- atxm/machine.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 1276ddf..e0256bb 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -338,8 +338,7 @@ def __strategize(self) -> Optional[PendingTx]: try: params = strategy.execute(pending=_active_copy) except Wait as e: - log.info(f"[pause] strategy {strategy.__class__} signalled pause: {e}") - self.pause() + log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}") return except TransactionFault as e: self._tx_tracker.fault( From 887c589a9c7aa235a541fcf5437a9d50b2eefbf7 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:27:32 -0500 Subject: [PATCH 06/25] Pause can be activated from either Busy or Idle states. Pause will cause the looping task to stop (sleep), and resume will cause the looping task to start (wake). Idle state itself does not need processing, just setting of the interval when it is transitioned into. --- atxm/machine.py | 51 +++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index e0256bb..03271fa 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -48,25 +48,32 @@ class _Machine(StateMachine): # # State Machine: # - # Idle <---> Busy <---> Paused - # | ^ | ^ | ^ - # V | V | V | - # _ _ _ + # Pause + # ^ ^ + # / \ + # V v + # Idle <---> Busy + # | ^ | ^ + # V | V | + # _ _ # _BUSY = State("Busy") _IDLE = State("Idle", initial=True) _PAUSED = State("Paused") # - State Transitions - - _transition_to_idle = _BUSY.to(_IDLE, unless="_busy") # Busy -> Idle - _transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") # Busy -> Pause + _transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") | _IDLE.to( + _PAUSED, cond="_pause" + ) # Busy/Idle -> Pause + _transition_to_idle = _BUSY.to(_IDLE, unless=["_busy", "_pause"]) | _PAUSED.to( + _IDLE, unless=["_busy", "_pause"] + ) # Busy/Paused -> Idle _transition_to_busy = _IDLE.to(_BUSY, cond="_busy") | _PAUSED.to( _BUSY, unless="_pause" ) # Idle/Pause -> Busy # self transitions i.e. remain in same state _remain_busy = _BUSY.to.itself(cond="_busy", unless="_pause") - _remain_idle = _IDLE.to.itself(unless="_busy") - _remain_paused = _PAUSED.to.itself(cond="_pause") + _remain_idle = _IDLE.to.itself(unless=["_busy", "_pause"]) _cycle_state = ( _transition_to_idle @@ -74,7 +81,6 @@ class _Machine(StateMachine): | _transition_to_busy | _remain_busy | _remain_idle - | _remain_paused ) # internal @@ -157,30 +163,17 @@ def _handle_errors(self, *args, **kwargs): @_transition_to_paused.before def _enter_pause_mode(self): self.log.info("[pause] pause mode activated") - return @_PAUSED.enter def _process_paused(self): - # TODO what action should be taken to check if we leave the pause state? - # Perhaps a call to self.__strategize() - return - - @_IDLE.enter - def _process_idle(self): - """Return to idle mode if not already there (slow down)""" - if self._task.interval != self._IDLE_INTERVAL: - # TODO does changing the interval value actually update the LoopingCall? - self._task.interval = self._IDLE_INTERVAL - self.log.info( - f"[done] returning to idle mode with " - f"{self._task.interval} second interval" - ) + self._sleep() - # TODO - # 1. don't always sleep (idle for some number of cycles?) - # 2. careful sleeping - potential concurrency concerns - # 3. there is currently no difference between sleep/idle ... - self._sleep() + @_transition_to_idle.before + def _enter_idle_mode(self): + self._task.interval = self._IDLE_INTERVAL + self.log.info( + f"[idle] returning to idle mode with {self._task.interval} second interval" + ) @_transition_to_busy.before def _enter_busy_mode(self): From ea62ab650edbabaaeba240f6a71d1051d139b8a9 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:29:32 -0500 Subject: [PATCH 07/25] Wake should immediately run the looping call - either we are idle and a tx got queued, OR we are in pause and need to be woken up. --- atxm/machine.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 03271fa..6f53ab5 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -184,7 +184,7 @@ def _enter_busy_mode(self): self._task.interval = max( round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL ) - self.log.info(f"[working] cycle interval is {self._task.interval} seconds") + self.log.info(f"[working] cycle interval is now {self._task.interval} seconds") @_BUSY.enter def _process_busy(self): @@ -227,9 +227,14 @@ def _stop(self): self._task.stop() def _wake(self) -> None: - if not self._task.running: - log.info("[wake] waking up") - self._start(now=True) + """Runs the looping call immediately.""" + log.info("[reset] running looping call now.") + if self._task.running: + # TODO instead of stopping/starting, can you set interval to 0 + # and call reset to have looping call immediately? + self._stop() + + self._start(now=True) def _sleep(self) -> None: if self._task.running: From c29c9d44dffffc5e0cf454d19a09bd7d2f82aa45 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:56:57 -0500 Subject: [PATCH 08/25] Improved testing of machine. --- tests/conftest.py | 4 ++- tests/test_machine.py | 83 +++++++++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index afea01e..27d7ae3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,7 +64,9 @@ def machine(w3): clock = Clock() _machine = AutomaticTxMachine(w3=w3) _machine._task.clock = clock - return _machine + yield _machine + + _machine.stop() @pytest.fixture diff --git a/tests/test_machine.py b/tests/test_machine.py index 3cd4d3c..c696385 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -1,8 +1,5 @@ import pytest - import pytest_twisted -from twisted.internet import reactor -from twisted.internet.task import deferLater from atxm.tx import FutureTx, PendingTx @@ -14,14 +11,15 @@ def rpc_spy(mocker, w3): @pytest_twisted.inlineCallbacks -def test_no_rpc_calls_when_idle(machine, state_observer, clock, rpc_spy): +def test_no_rpc_calls_when_idle(clock, machine, state_observer, rpc_spy): assert machine.current_state == machine._IDLE assert not machine.busy assert len(machine.queued) == 0 machine.start(now=True) - yield clock.advance(machine._task.interval * 3) - machine.stop() + for i in range(3): + yield clock.advance(1) + assert machine.current_state == machine._IDLE # Verify that no RPC calls were made assert rpc_spy.call_count == 0 @@ -32,11 +30,12 @@ def test_no_rpc_calls_when_idle(machine, state_observer, clock, rpc_spy): assert machine.current_state == machine._IDLE assert len(state_observer.transitions) == 0 # remained idle + machine.stop() + def test_queue( machine, state_observer, - clock, rpc_spy, account, eip1559_transaction, @@ -76,8 +75,16 @@ def test_queue( @pytest_twisted.inlineCallbacks def test_broadcast( - machine, state_observer, clock, eip1559_transaction, account, mocker + clock, + machine, + state_observer, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, ): + wake, _ = mock_wake_sleep + assert machine.current_state == machine._IDLE assert not machine.busy @@ -90,14 +97,16 @@ def test_broadcast( info={"message": "something wonderful is happening..."}, ) + assert wake.call_count == 1 + # There is one queued transaction assert len(machine.queued) == 1 - # distort the time-space continuum machine.start(now=True) while machine.pending is None: yield clock.advance(1) - machine.stop() + + assert machine.current_state == machine._BUSY # The transaction is no longer queued assert len(machine.queued) == 0 @@ -110,20 +119,21 @@ def test_broadcast( assert atx.txhash # wait for the hook to be called - yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 # tx only broadcasted and not finalized, so we are still busy assert machine.current_state == machine._BUSY + assert len(state_observer.transitions) == 1 assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) @pytest_twisted.inlineCallbacks def test_finalize( + chain, + clock, machine, state_observer, - clock, eip1559_transaction, account, mock_wake_sleep, @@ -142,16 +152,19 @@ def test_finalize( # There is one queued transaction assert len(machine.queued) == 1 - # advance time to broadcast the transaction machine.start(now=True) + + # advance to broadcast the transaction while machine.pending is None: yield clock.advance(1) - machine.stop() + + assert machine.current_state == machine._BUSY + assert machine.pending == atx - # advance time to finalize the transaction - machine.start(now=True) + # advance to finalize the transaction while machine.pending: + yield chain.mine(1) yield clock.advance(1) # The transaction is no longer pending @@ -166,8 +179,6 @@ def test_finalize( assert atx.final assert atx.receipt - # wait for the hook to be called - yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 yield clock.advance(1) @@ -195,16 +206,20 @@ def test_follow( signer=account, ) + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + while not machine.finalized: yield clock.advance(1) assert atx.final is True while len(machine.finalized) > 0: - yield clock.advance(1) yield chain.mine(1) - - machine.stop() + yield clock.advance(1) assert len(machine.finalized) == 0 assert len(machine.queued) == 0 @@ -213,16 +228,14 @@ def test_follow( assert not machine.busy - # wait for the hook to be called - yield deferLater(reactor, 0.2, lambda: None) - assert sleep.call_count == 1 - assert machine.current_state == machine._IDLE assert len(state_observer.transitions) == 2 assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + machine.stop() + def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, account): assert machine.current_state == machine._IDLE @@ -232,6 +245,19 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac # no change in state assert machine.current_state == machine._IDLE + # idle -> pause + machine.pause() + machine._cycle() + assert machine.current_state == machine._PAUSED + assert machine._pause + + # resume after pausing + machine.resume() + machine._cycle() + assert machine.current_state == machine._IDLE + assert not machine._pause + assert not machine.busy + atx = machine.queue_transaction( params=eip1559_transaction, signer=account, @@ -242,17 +268,12 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac assert machine.current_state == machine._BUSY assert machine.busy - # pause + # busy -> pause machine.pause() machine._cycle() assert machine.current_state == machine._PAUSED assert machine._pause - for i in range(3): - machine._cycle() - # no change in state - assert machine.current_state == machine._PAUSED - # resume after pausing machine.resume() machine._cycle() From 03b49ddcfacd5eaff101b1f4481d96630976504a Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 15:05:16 -0500 Subject: [PATCH 09/25] Wait for hook to be called. --- tests/test_machine.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_machine.py b/tests/test_machine.py index c696385..19fc036 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -1,5 +1,8 @@ import pytest + import pytest_twisted +from twisted.internet import reactor +from twisted.internet.task import deferLater from atxm.tx import FutureTx, PendingTx @@ -119,6 +122,7 @@ def test_broadcast( assert atx.txhash # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 # tx only broadcasted and not finalized, so we are still busy @@ -127,6 +131,8 @@ def test_broadcast( assert len(state_observer.transitions) == 1 assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + machine.stop() + @pytest_twisted.inlineCallbacks def test_finalize( @@ -179,6 +185,8 @@ def test_finalize( assert atx.final assert atx.receipt + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 yield clock.advance(1) From 31267f582a0d26e8047f8dbfa09a2955e8c13a7d Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 15:45:32 -0500 Subject: [PATCH 10/25] Add test that calls pause while already paused, and resume when not paused; all is still fine. --- tests/test_machine.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/test_machine.py b/tests/test_machine.py index 19fc036..01f3e0a 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -245,7 +245,7 @@ def test_follow( machine.stop() -def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, account): +def test_simple_state_transitions(chain, machine, eip1559_transaction, account): assert machine.current_state == machine._IDLE for i in range(3): @@ -257,13 +257,18 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac machine.pause() machine._cycle() assert machine.current_state == machine._PAUSED - assert machine._pause + assert machine.paused + + # calling pause has no effect if already paused + assert machine.paused + for i in range(3): + machine.pause() # resume after pausing machine.resume() machine._cycle() assert machine.current_state == machine._IDLE - assert not machine._pause + assert not machine.paused assert not machine.busy atx = machine.queue_transaction( @@ -280,13 +285,13 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac machine.pause() machine._cycle() assert machine.current_state == machine._PAUSED - assert machine._pause + assert machine.paused # resume after pausing machine.resume() machine._cycle() assert machine.current_state == machine._BUSY - assert not machine._pause + assert not machine.paused # finalize tx while machine.busy: @@ -298,3 +303,9 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac # transition to idle machine._cycle() assert machine.current_state == machine._IDLE + + # resume has no effect if not paused + assert not machine.paused + for i in range(3): + machine.resume() + assert machine.current_state == machine._IDLE From cb7f893c563a58373a5abf2f67964ea33283e6a8 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 16:32:11 -0500 Subject: [PATCH 11/25] Don't calll _wake if previous state was busy/pause after queueing a tx. Add tests for _wake. --- atxm/machine.py | 4 +- tests/conftest.py | 2 +- tests/test_api.py | 10 +++- tests/test_machine.py | 129 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 137 insertions(+), 8 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 6f53ab5..1d2a8f3 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -427,7 +427,7 @@ def queue_transaction( Optionally provide a dictionary of additional string data to log during the transaction's lifecycle for identification. """ - previously_busy = self._busy + previously_busy_or_paused = self._busy or self._pause if signer.address not in self.signers: self.signers[signer.address] = signer @@ -435,7 +435,7 @@ def queue_transaction( tx = self._tx_tracker._queue( _from=signer.address, params=params, *args, **kwargs ) - if not previously_busy: + if not previously_busy_or_paused: self._wake() return tx diff --git a/tests/conftest.py b/tests/conftest.py index 27d7ae3..2b8c43d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -79,7 +79,7 @@ def interval(machine): return 1 -@pytest.fixture(autouse=True) +@pytest.fixture def mock_wake_sleep(machine, mocker): wake = mocker.patch.object(machine, "_wake") sleep = mocker.patch.object(machine, "_sleep") diff --git a/tests/test_api.py b/tests/test_api.py index 3d18f69..f0489ac 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -4,7 +4,15 @@ @pytest_twisted.inlineCallbacks -def test_machine(account, w3, legacy_transaction, eip1559_transaction, machine, clock): +def test_machine( + account, + w3, + legacy_transaction, + eip1559_transaction, + machine, + clock, + mock_wake_sleep, +): assert not machine.busy async_txs = machine.queue_transactions( params=[legacy_transaction, eip1559_transaction], diff --git a/tests/test_machine.py b/tests/test_machine.py index 01f3e0a..09e936c 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -44,7 +44,7 @@ def test_queue( eip1559_transaction, mock_wake_sleep, ): - wake, sleep = mock_wake_sleep + wake, _ = mock_wake_sleep # The machine is idle assert machine.current_state == machine._IDLE @@ -76,6 +76,127 @@ def test_queue( assert len(state_observer.transitions) == 0 # nothing actually executed +def test_wake_after_queuing_when_idle_and_not_already_running( + machine, + eip1559_transaction, + account, + mocker, +): + assert machine.current_state == machine._IDLE + assert not machine.busy + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + assert not machine.running + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert stop_spy.call_count == 0, "no task to stop" + assert start_spy.call_count == 1, "task started" + + assert machine.running + assert machine.current_state == machine._BUSY + + machine.stop() + + +def test_wake_after_queuing_when_idle_and_already_running( + machine, + eip1559_transaction, + account, + mocker, +): + machine.start(now=True) + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + assert machine.current_state == machine._IDLE + assert not machine.busy + + assert machine.running + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert stop_spy.call_count == 1, "task stopped" + assert start_spy.call_count == 1, "task started" + + assert machine.running + assert machine.current_state == machine._BUSY + + machine.stop() + + +def test_wake_no_call_after_queuing_when_already_busy( + machine, + eip1559_transaction, + account, + mock_wake_sleep, +): + wake, _ = mock_wake_sleep + + assert machine.current_state == machine._IDLE + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 + + machine._cycle() + assert machine.current_state == machine._BUSY + + # Queue another transaction while busy + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 # remains unchanged + + +def test_wake_no_call_after_queuing_when_already_paused( + machine, + eip1559_transaction, + account, + mock_wake_sleep, +): + wake, sleep = mock_wake_sleep + + assert machine.current_state == machine._IDLE + + machine.pause() + machine._cycle() + assert machine.paused + assert machine.current_state == machine._PAUSED + + assert sleep.call_count == 1 + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 0 + + @pytest_twisted.inlineCallbacks def test_broadcast( clock, @@ -204,8 +325,6 @@ def test_finalize( def test_follow( chain, machine, state_observer, clock, eip1559_transaction, account, mock_wake_sleep ): - wake, sleep = mock_wake_sleep - machine.start() assert machine.current_state == machine._IDLE @@ -245,7 +364,9 @@ def test_follow( machine.stop() -def test_simple_state_transitions(chain, machine, eip1559_transaction, account): +def test_simple_state_transitions( + chain, machine, eip1559_transaction, account, mock_wake_sleep +): assert machine.current_state == machine._IDLE for i in range(3): From 96e1435146802d43e3a380614089a5561c4eae02 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 09:01:54 -0500 Subject: [PATCH 12/25] Force a move to paused state instead of waiting for next iteration. --- atxm/machine.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 1d2a8f3..b4dcf8d 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -228,7 +228,7 @@ def _stop(self): def _wake(self) -> None: """Runs the looping call immediately.""" - log.info("[reset] running looping call now.") + log.info("[wake] running looping call now.") if self._task.running: # TODO instead of stopping/starting, can you set interval to 0 # and call reset to have looping call immediately? @@ -409,8 +409,10 @@ def pause(self) -> None: """ Pause the machine's tx processing loop; no txs are processed until unpaused (resume()). """ - self._pause = True - self.log.info("[pause] pause mode requested") + if not self._pause: + self._pause = True + self.log.info("[pause] pause mode requested") + self._cycle_state() # force a move to PAUSED state (don't wait for next iteration) def resume(self) -> None: """Unpauses (resumes) the machine's tx processing loop.""" From 554040529fdf00ed3c313fd6d9006d0884676f10 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 09:02:46 -0500 Subject: [PATCH 13/25] Improve testing around pausing/waking given that pause now forces a move to the PAUSED state instead of waiting until next iteration of looping call. --- tests/test_machine.py | 82 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 5 deletions(-) diff --git a/tests/test_machine.py b/tests/test_machine.py index 09e936c..76bf4f3 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -181,7 +181,6 @@ def test_wake_no_call_after_queuing_when_already_paused( assert machine.current_state == machine._IDLE machine.pause() - machine._cycle() assert machine.paused assert machine.current_state == machine._PAUSED @@ -364,6 +363,75 @@ def test_follow( machine.stop() +@pytest_twisted.inlineCallbacks +def test_pause_when_idle(clock, machine, mocker): + machine.start() + assert machine.current_state == machine._IDLE + assert machine.running + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + machine.pause() + yield clock.advance(1) + + assert machine.current_state == machine._PAUSED + + assert not machine.running + + assert stop_spy.call_count == 1, "task stopped since paused" + assert start_spy.call_count == 0 + + machine.resume() + + assert stop_spy.call_count == 1 + assert start_spy.call_count == 1, "machine restarted" + + assert machine.current_state == machine._IDLE + assert machine.running + machine.stop() + + +@pytest_twisted.inlineCallbacks +def test_pause_when_busy(clock, machine, eip1559_transaction, account, mocker): + machine.start() + assert machine.current_state == machine._IDLE + assert machine.running + + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + machine.pause() + yield clock.advance(1) + + assert machine.current_state == machine._PAUSED + + assert not machine.running + + assert stop_spy.call_count == 1, "task stopped since paused" + assert start_spy.call_count == 0 + + machine.resume() + + assert stop_spy.call_count == 1 + assert start_spy.call_count == 1, "machine restarted" + + assert machine.current_state == machine._BUSY + assert machine.running + machine.stop() + + def test_simple_state_transitions( chain, machine, eip1559_transaction, account, mock_wake_sleep ): @@ -376,7 +444,6 @@ def test_simple_state_transitions( # idle -> pause machine.pause() - machine._cycle() assert machine.current_state == machine._PAUSED assert machine.paused @@ -387,7 +454,7 @@ def test_simple_state_transitions( # resume after pausing machine.resume() - machine._cycle() + machine._cycle() # wake doesn't do anything because mocked assert machine.current_state == machine._IDLE assert not machine.paused assert not machine.busy @@ -404,13 +471,12 @@ def test_simple_state_transitions( # busy -> pause machine.pause() - machine._cycle() assert machine.current_state == machine._PAUSED assert machine.paused # resume after pausing machine.resume() - machine._cycle() + machine._cycle() # wake doesn't do anything because mocked assert machine.current_state == machine._BUSY assert not machine.paused @@ -426,7 +492,13 @@ def test_simple_state_transitions( assert machine.current_state == machine._IDLE # resume has no effect if not paused + wake, sleep = mock_wake_sleep + wake_call_count = wake.call_count + sleep_call_count = sleep.call_count + assert not machine.paused for i in range(3): machine.resume() assert machine.current_state == machine._IDLE + assert wake.call_count == wake_call_count, "wake call count remains unchanged" + assert sleep.call_count == sleep_call_count, "wake call count remains unchanged" From 7a98fe64e7cb2c840679860a0819598ea474e0d0 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 09:18:58 -0500 Subject: [PATCH 14/25] Don't unpause based on strategies - pauses are explicitly user-directed. --- atxm/machine.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index b4dcf8d..1c1aca9 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -350,9 +350,6 @@ def __strategize(self) -> Optional[PendingTx]: # keep the parameters as they are. _active_copy.params.update(params) - if self._pause: - self.resume() - # (!) retry the transaction with the new parameters retry_params = TxParams(_active_copy.params) _names = " -> ".join(s.name for s in self.strategies) From 4fc18457791a405faad7325bf0ac52bb61cfb41d Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 10:46:19 -0500 Subject: [PATCH 15/25] Rename FaultyTx to FaultedTx. --- atxm/tracker.py | 6 +++--- atxm/tx.py | 2 +- tests/test_faults.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/atxm/tracker.py b/atxm/tracker.py index 2d71e85..4948f58 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -17,7 +17,7 @@ PendingTx, TxHash, AsyncTx, - FaultyTx, + FaultedTx, ) from atxm.utils import fire_hook @@ -130,8 +130,8 @@ def fault( tx = self.__active tx.fault = fault tx.error = error - tx.__class__ = FaultyTx - tx: FaultyTx + tx.__class__ = FaultedTx + tx: FaultedTx self.faulty.append(tx) log.warn( f"[state] tracked faulty transaction #atx-{tx.id} " diff --git a/atxm/tx.py b/atxm/tx.py index a262dc4..7fd71a0 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -124,7 +124,7 @@ def txhash(self) -> TxHash: @dataclass -class FaultyTx(AsyncTx): +class FaultedTx(AsyncTx): final: bool = field(default=False, init=False) fault: Fault error: Optional[str] = None diff --git a/tests/test_faults.py b/tests/test_faults.py index be25008..102f56f 100644 --- a/tests/test_faults.py +++ b/tests/test_faults.py @@ -5,7 +5,7 @@ from web3.exceptions import TransactionNotFound from atxm.exceptions import Fault -from atxm.tx import FaultyTx +from atxm.tx import FaultedTx @pytest.fixture @@ -47,7 +47,7 @@ def test_timeout( assert atx.final is False assert isinstance(atx.fault, Fault) - assert isinstance(atx, FaultyTx) + assert isinstance(atx, FaultedTx) # check async tx advanced through the state machine assert atx not in machine.queued From 4108ada7584132b7a486fc29ea8f2394f97058c9 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 10:54:52 -0500 Subject: [PATCH 16/25] Remove tracking of faulted txs. --- atxm/main.py | 6 ------ atxm/tracker.py | 9 ++------- tests/test_faults.py | 1 - tests/test_machine.py | 1 - 4 files changed, 2 insertions(+), 15 deletions(-) diff --git a/atxm/main.py b/atxm/main.py index b6eccd5..ee8e244 100644 --- a/atxm/main.py +++ b/atxm/main.py @@ -9,7 +9,6 @@ FinalizedTx, FutureTx, PendingTx, - AsyncTx, ) @@ -52,11 +51,6 @@ def finalized(self) -> Set[FinalizedTx]: """Return a set of finalized transactions.""" return set(self._tx_tracker.finalized) - @property - def faults(self) -> List[AsyncTx]: - """Return a set of faulted transactions.""" - return list(self._tx_tracker.faulty) - def queue_transactions( self, params: List[TxParams], signer: LocalAccount, *args, **kwargs ) -> List[FutureTx]: diff --git a/atxm/tracker.py b/atxm/tracker.py index 4948f58..101846f 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -16,7 +16,6 @@ FutureTx, PendingTx, TxHash, - AsyncTx, FaultedTx, ) from atxm.utils import fire_hook @@ -27,7 +26,6 @@ class _TxTracker: _FILEPATH = ".txs.json" __COUNTER = 0 # id generator - __FAULT_CACHE_SIZE = 10 def __init__(self, disk_cache: bool, filepath: Optional[Path] = None): self.__filepath = filepath or self._FILEPATH @@ -35,7 +33,6 @@ def __init__(self, disk_cache: bool, filepath: Optional[Path] = None): self.__queue: Deque[FutureTx] = deque() self.__active: Optional[PendingTx] = None self.finalized: Set[FinalizedTx] = set() - self.faulty: Deque[AsyncTx] = deque(maxlen=self.__FAULT_CACHE_SIZE) self.disk_cache = disk_cache if disk_cache: @@ -132,10 +129,8 @@ def fault( tx.error = error tx.__class__ = FaultedTx tx: FaultedTx - self.faulty.append(tx) log.warn( - f"[state] tracked faulty transaction #atx-{tx.id} " - f"{len(self.faulty)}/{self.faulty.maxlen} faults cached" + f"[state] transaction #atx-{tx.id} faulted {fault}{f' ({error})' if error else ''}" ) if clear_active: self.clear_active() @@ -218,6 +213,6 @@ def _queue( self.commit() self.__COUNTER += 1 log.info( - f"[state] queued transaction #atx-{tx.id} " f"priority {len(self.__queue)}" + f"[state] queued transaction #atx-{tx.id} priority {len(self.__queue)}" ) return tx diff --git a/tests/test_faults.py b/tests/test_faults.py index 102f56f..91707d3 100644 --- a/tests/test_faults.py +++ b/tests/test_faults.py @@ -52,7 +52,6 @@ def test_timeout( # check async tx advanced through the state machine assert atx not in machine.queued assert machine.pending is None - assert atx in machine.faults assert atx.final is False yield deferLater(reactor, 0.2, lambda: None) diff --git a/tests/test_machine.py b/tests/test_machine.py index 76bf4f3..d7f369e 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -349,7 +349,6 @@ def test_follow( assert len(machine.finalized) == 0 assert len(machine.queued) == 0 - assert len(machine.faults) == 0 assert machine.pending is None assert not machine.busy From 9fd6729e33875315beab7baa4dafc1d626352fa5 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 10:56:43 -0500 Subject: [PATCH 17/25] Change logging annotation from [state] -> [tracker] for clarity. --- atxm/tracker.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/atxm/tracker.py b/atxm/tracker.py index 101846f..d5d50da 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -53,7 +53,7 @@ def commit(self) -> None: with open(self.__filepath, "w+t") as file: data = json.dumps(self.to_dict()) file.write(data) - log.debug(f"[state] wrote transaction cache file {self.__filepath}") + log.debug(f"[tracker] wrote transaction cache file {self.__filepath}") def restore(self) -> bool: """ @@ -82,7 +82,7 @@ def restore(self) -> bool: self.__queue.extend(FutureTx.from_dict(tx) for tx in queue) self.finalized = {FinalizedTx.from_dict(tx) for tx in final} log.debug( - f"[state] restored {len(queue)} transactions from cache file {self.__filepath}" + f"[tracker] restored {len(queue)} transactions from cache file {self.__filepath}" ) return bool(data) @@ -96,10 +96,10 @@ def __set_active(self, tx: PendingTx) -> None: self.commit() if old: log.debug( - f"[state] updated active transaction {old.hex()} -> {tx.txhash.hex()}" + f"[tracker] updated active transaction {old.hex()} -> {tx.txhash.hex()}" ) return - log.debug(f"[state] tracked active transaction {tx.txhash.hex()}") + log.debug(f"[tracker] tracked active transaction {tx.txhash.hex()}") def morph(self, tx: FutureTx, txhash: TxHash) -> PendingTx: """ @@ -130,7 +130,7 @@ def fault( tx.__class__ = FaultedTx tx: FaultedTx log.warn( - f"[state] transaction #atx-{tx.id} faulted {fault}{f' ({error})' if error else ''}" + f"[tracker] transaction #atx-{tx.id} faulted {fault}{f' ({error})' if error else ''}" ) if clear_active: self.clear_active() @@ -149,7 +149,7 @@ def finalize_active_tx(self, receipt: TxReceipt) -> None: self.__active.__class__ = FinalizedTx tx = self.__active self.finalized.add(tx) # noqa - log.info(f"[state] #atx-{tx.id} pending -> finalized") + log.info(f"[tracker] #atx-{tx.id} pending -> finalized") self.clear_active() if hook: fire_hook(hook=hook, tx=tx) @@ -159,8 +159,8 @@ def clear_active(self) -> None: self.__active = None self.commit() log.debug( - f"[state] cleared 1 pending transaction \n" - f"[state] {len(self.queue)} queued " + f"[tracker] cleared 1 pending transaction \n" + f"[tracker] {len(self.queue)} queued " f"transaction{'s' if len(self.queue) != 1 else ''} remaining" ) @@ -183,7 +183,7 @@ def _requeue(self, tx: FutureTx) -> None: self.__queue.append(tx) self.commit() log.info( - f"[state] re-queued transaction #atx-{tx.id} " + f"[tracker] re-queued transaction #atx-{tx.id} " f"priority {len(self.__queue)}" ) @@ -213,6 +213,6 @@ def _queue( self.commit() self.__COUNTER += 1 log.info( - f"[state] queued transaction #atx-{tx.id} priority {len(self.__queue)}" + f"[tracker] queued transaction #atx-{tx.id} priority {len(self.__queue)}" ) return tx From a73a66251cb1f26a0c42c254ead97a25dbfb0152 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 11:18:16 -0500 Subject: [PATCH 18/25] Always clear a faulted tx - don't give the option not to clear it. --- atxm/exceptions.py | 3 +-- atxm/machine.py | 2 -- atxm/strategies.py | 2 -- atxm/tracker.py | 4 +--- 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/atxm/exceptions.py b/atxm/exceptions.py index a1649a1..d29abd4 100644 --- a/atxm/exceptions.py +++ b/atxm/exceptions.py @@ -40,11 +40,10 @@ class Wait(Exception): class TransactionFault(Exception): """raised when a transaction has been faulted""" - def __init__(self, tx, fault: Fault, clear: bool, message: str): + def __init__(self, tx, fault: Fault, message: str): self.tx = tx self.fault = fault self.message = message - self.clear = clear super().__init__(message) diff --git a/atxm/machine.py b/atxm/machine.py index 1c1aca9..d6c7305 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -269,7 +269,6 @@ def __handle_active_transaction(self) -> bool: self._tx_tracker.fault( error=pending_tx.txhash.hex(), fault=Fault.REVERT, - clear_active=True, ) return True @@ -342,7 +341,6 @@ def __strategize(self) -> Optional[PendingTx]: self._tx_tracker.fault( error=self._tx_tracker.pending.txhash.hex(), fault=e.fault, - clear_active=e.clear, ) return if params: diff --git a/atxm/strategies.py b/atxm/strategies.py index 49f3ab1..269e81c 100644 --- a/atxm/strategies.py +++ b/atxm/strategies.py @@ -78,7 +78,6 @@ def execute(self, pending: PendingTx) -> TxParams: tx=pending, fault=Fault.INSUFFICIENT_FUNDS, message="Insufficient funds", - clear=False, ) # log.warn(f"Insufficient funds for transaction #{pending.params['nonce']}") # raise Wait("Insufficient funds") @@ -129,7 +128,6 @@ def execute(self, pending: PendingTx) -> TxParams: tx=pending, fault=Fault.TIMEOUT, message="Transaction has timed out", - clear=True, # signal to clear the active transaction ) return pending.params diff --git a/atxm/tracker.py b/atxm/tracker.py index d5d50da..aa06a42 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -117,7 +117,6 @@ def morph(self, tx: FutureTx, txhash: TxHash) -> PendingTx: def fault( self, fault: Fault, - clear_active: bool, error: Optional[str] = None, ) -> None: """Fault the active transaction.""" @@ -132,8 +131,7 @@ def fault( log.warn( f"[tracker] transaction #atx-{tx.id} faulted {fault}{f' ({error})' if error else ''}" ) - if clear_active: - self.clear_active() + self.clear_active() if hook: fire_hook(hook=hook, tx=tx) From 2fac537cfca87b5deaf4a8c075153a9d6281856e Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 11:20:09 -0500 Subject: [PATCH 19/25] Add typehints for hooks provided when queuing a transaction. --- atxm/tracker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/atxm/tracker.py b/atxm/tracker.py index aa06a42..044f75a 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -190,8 +190,8 @@ def _queue( params: TxParams, _from: ChecksumAddress, info: Dict[str, str] = None, - on_broadcast: Optional[Callable] = None, - on_finalized: Optional[Callable] = None, + on_broadcast: Optional[Callable[[PendingTx], None]] = None, + on_finalized: Optional[Callable[[FinalizedTx], None]] = None, on_fault: Optional[Callable] = None, ) -> FutureTx: """Queue a new transaction for broadcast and subsequent tracking.""" From 836cae28111018e9fcb1c60f328cc574c40a66cd Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 13:15:17 -0500 Subject: [PATCH 20/25] TransactionReverted is really just a subclass of TransactionFaulted; a specialized case of it. TransactionFault -> TransactionFaulted --- atxm/exceptions.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/atxm/exceptions.py b/atxm/exceptions.py index d29abd4..d09142f 100644 --- a/atxm/exceptions.py +++ b/atxm/exceptions.py @@ -1,6 +1,6 @@ from enum import Enum -from web3.types import RPCError +from web3.types import PendingTx, RPCError, TxReceipt class Fault(Enum): @@ -37,15 +37,19 @@ class Wait(Exception): """ -class TransactionFault(Exception): - """raised when a transaction has been faulted""" +class TransactionFaulted(Exception): + """Raised when a transaction has been faulted.""" - def __init__(self, tx, fault: Fault, message: str): + def __init__(self, tx: PendingTx, fault: Fault, message: str): self.tx = tx self.fault = fault self.message = message super().__init__(message) -class TransactionReverted(Exception): - """raised when a transaction has been reverted""" +class TransactionReverted(TransactionFaulted): + """Raised when a transaction has been reverted.""" + + def __init__(self, tx: PendingTx, receipt: TxReceipt, message: str): + self.receipt = receipt + super().__init__(tx=tx, fault=Fault.REVERT, message=message) From 299940a29eb4fabe4d91eddf85610a266e4cf584 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 13:18:35 -0500 Subject: [PATCH 21/25] fault() now takes a TransactionFaulted exception and constructs the FaultedTx from that; simpler than taking values from exception the passing it along etc. Also ensures that faulted tx is indeed the same as the active tx as an added bit of validation. --- atxm/machine.py | 31 ++++++++++--------------------- atxm/strategies.py | 8 ++++---- atxm/tracker.py | 22 +++++++++++++++------- atxm/utils.py | 4 +++- 4 files changed, 32 insertions(+), 33 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index d6c7305..6ef57d4 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -1,27 +1,22 @@ from copy import deepcopy -from typing import Optional, List, Type +from typing import List, Optional, Type from eth_account.signers.local import LocalAccount -from statemachine import StateMachine, State +from statemachine import State, StateMachine from twisted.internet import reactor from twisted.internet.defer import Deferred from twisted.internet.task import LoopingCall from web3 import Web3 from web3.types import TxParams -from atxm.exceptions import ( - Wait, - TransactionReverted, - TransactionFault, - Fault, -) -from atxm.tracker import _TxTracker +from atxm.exceptions import TransactionFaulted, TransactionReverted, Wait from atxm.strategies import ( AsyncTxStrategy, + FixedRateSpeedUp, InsufficientFundsPause, TimeoutPause, - FixedRateSpeedUp, ) +from atxm.tracker import _TxTracker from atxm.tx import ( FutureTx, PendingTx, @@ -32,8 +27,8 @@ _get_confirmations, _get_receipt, _handle_rpc_error, - fire_hook, _make_tx_params, + fire_hook, ) from .logging import log @@ -265,11 +260,8 @@ def __handle_active_transaction(self) -> bool: receipt = _get_receipt(w3=self.w3, pending_tx=pending_tx) # Outcome 2: the pending transaction was reverted (final error) - except TransactionReverted: - self._tx_tracker.fault( - error=pending_tx.txhash.hex(), - fault=Fault.REVERT, - ) + except TransactionReverted as e: + self._tx_tracker.fault(fault_error=e) return True # Outcome 3: pending transaction is finalized (final success) @@ -337,11 +329,8 @@ def __strategize(self) -> Optional[PendingTx]: except Wait as e: log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}") return - except TransactionFault as e: - self._tx_tracker.fault( - error=self._tx_tracker.pending.txhash.hex(), - fault=e.fault, - ) + except TransactionFaulted as e: + self._tx_tracker.fault(fault_error=e) return if params: # in case the strategy accidentally returns None diff --git a/atxm/strategies.py b/atxm/strategies.py index 269e81c..505e783 100644 --- a/atxm/strategies.py +++ b/atxm/strategies.py @@ -6,9 +6,9 @@ from web3.types import Gwei, TxParams, Wei, PendingTx from atxm.exceptions import ( - Wait, - TransactionFault, Fault, + Wait, + TransactionFaulted, ) from atxm.logging import log from atxm.utils import ( @@ -74,7 +74,7 @@ def execute(self, pending: PendingTx) -> TxParams: self.log.warn( f"Insufficient funds for transaction #{pending.params['nonce']}" ) - raise TransactionFault( + raise TransactionFaulted( tx=pending, fault=Fault.INSUFFICIENT_FUNDS, message="Insufficient funds", @@ -124,7 +124,7 @@ def __active_timed_out(self, pending: PendingTx) -> bool: def execute(self, pending: PendingTx) -> TxParams: timeout = self.__active_timed_out(pending) if timeout: - raise TransactionFault( + raise TransactionFaulted( tx=pending, fault=Fault.TIMEOUT, message="Transaction has timed out", diff --git a/atxm/tracker.py b/atxm/tracker.py index 044f75a..5eb758c 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -9,7 +9,7 @@ from eth_typing import ChecksumAddress from web3.types import TxParams, TxReceipt -from atxm.exceptions import Fault +from atxm.exceptions import TransactionFaulted from atxm.logging import log from atxm.tx import ( FinalizedTx, @@ -116,20 +116,28 @@ def morph(self, tx: FutureTx, txhash: TxHash) -> PendingTx: def fault( self, - fault: Fault, - error: Optional[str] = None, + fault_error: TransactionFaulted, ) -> None: """Fault the active transaction.""" - hook = self.__active.on_fault if not self.__active: raise RuntimeError("No active transaction to fault") + if fault_error.tx.id != self.__active.id: + raise RuntimeError( + f"Mismatch between active tx ({self.__active.id}) and faulted tx ({fault_error.tx.id})" + ) + + hook = self.__active.on_fault tx = self.__active - tx.fault = fault - tx.error = error + txhash = tx.txhash + + tx.fault = fault_error.fault + tx.error = fault_error.message tx.__class__ = FaultedTx tx: FaultedTx + log.warn( - f"[tracker] transaction #atx-{tx.id} faulted {fault}{f' ({error})' if error else ''}" + f"[tracker] transaction #atx-{tx.id} faulted {tx.fault.value}; " + f"{txhash} {f'{fault_error.message}' if fault_error.message else ''}" ) self.clear_active() if hook: diff --git a/atxm/utils.py b/atxm/utils.py index ef4c0d3..8576e36 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -73,7 +73,9 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]: log.warn( f"[reverted] Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}" ) - raise TransactionReverted(receipt) + raise TransactionReverted( + tx=pending_tx, receipt=receipt, message=f"Reverted with EVM status {status}" + ) log.info( f"[accepted] Transaction {txdata['nonce']}|{txdata['hash'].hex()} " From 25606cdb72a1a0398b1eba5463c734db68fbf51b Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 14:16:32 -0500 Subject: [PATCH 22/25] Add typehints for on_fault callback. --- atxm/tracker.py | 2 +- atxm/tx.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/atxm/tracker.py b/atxm/tracker.py index 5eb758c..d8f700d 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -200,7 +200,7 @@ def _queue( info: Dict[str, str] = None, on_broadcast: Optional[Callable[[PendingTx], None]] = None, on_finalized: Optional[Callable[[FinalizedTx], None]] = None, - on_fault: Optional[Callable] = None, + on_fault: Optional[Callable[[FaultedTx], None]] = None, ) -> FutureTx: """Queue a new transaction for broadcast and subsequent tracking.""" tx = FutureTx( diff --git a/atxm/tx.py b/atxm/tx.py index 7fd71a0..3320aa7 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -23,7 +23,7 @@ class AsyncTx(ABC): on_finalized: Optional[Callable[["FinalizedTx"], None]] = field( default=None, init=False ) - on_fault: Optional[Callable] = field(default=None, init=False) + on_fault: Optional[Callable[["FaultedTx"], None]] = field(default=None, init=False) def __repr__(self): return f"<{self.__class__.__name__} id={self.id} final={self.final}>" From 65c214897613e9e270d47a2bd5db06b69ddba3f8 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 15:45:28 -0500 Subject: [PATCH 23/25] Improve logging when fault occurs. --- atxm/tracker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/atxm/tracker.py b/atxm/tracker.py index d8f700d..f5632be 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -128,7 +128,7 @@ def fault( hook = self.__active.on_fault tx = self.__active - txhash = tx.txhash + txhash = tx.txhash.hex() tx.fault = fault_error.fault tx.error = fault_error.message @@ -136,8 +136,8 @@ def fault( tx: FaultedTx log.warn( - f"[tracker] transaction #atx-{tx.id} faulted {tx.fault.value}; " - f"{txhash} {f'{fault_error.message}' if fault_error.message else ''}" + f"[tracker] transaction #atx-{tx.id} faulted with '{tx.fault.value}'; " + f"{txhash}{f' ({fault_error.message})' if fault_error.message else ''}" ) self.clear_active() if hook: From 4bb93a947e8846181e46d0fd6480ab5dffe28ded Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 15:47:32 -0500 Subject: [PATCH 24/25] Make strategies a protected member for safety. --- atxm/machine.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 6ef57d4..49a0dc1 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -115,9 +115,9 @@ def __init__( self.w3 = w3 self.signers = {} self.log = log - self.strategies = [s(w3) for s in self.STRATEGIES] + self._strategies = [s(w3) for s in self.STRATEGIES] if strategies: - self.strategies.extend(list(strategies)) + self._strategies.extend(list(strategies)) # state self._pause = False @@ -323,7 +323,7 @@ def __strategize(self) -> Optional[PendingTx]: raise RuntimeError("No active transaction to strategize") _active_copy = deepcopy(self._tx_tracker.pending) - for strategy in self.strategies: + for strategy in self._strategies: try: params = strategy.execute(pending=_active_copy) except Wait as e: @@ -339,7 +339,7 @@ def __strategize(self) -> Optional[PendingTx]: # (!) retry the transaction with the new parameters retry_params = TxParams(_active_copy.params) - _names = " -> ".join(s.name for s in self.strategies) + _names = " -> ".join(s.name for s in self._strategies) pending_tx = self.__fire(tx=retry_params, msg=_names) self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted") From 14a66e2e93451ec69fc1ebfe75a4b0ab0fae9869 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 15:49:45 -0500 Subject: [PATCH 25/25] Add tests for faulted tx conditions - revert or faulted from strategy. --- atxm/utils.py | 6 +-- tests/test_faults.py | 115 ++++++++++++++++++++++++++++++------------- 2 files changed, 85 insertions(+), 36 deletions(-) diff --git a/atxm/utils.py b/atxm/utils.py index 8576e36..b444068 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -37,7 +37,7 @@ def _log_gas_weather(base_fee: Wei, tip: Wei) -> None: log.info(f"Gas conditions: base {base_fee_gwei} gwei | tip {tip_gwei} gwei") -def __get_receipt_from_txhash(w3: Web3, txhash: TxHash) -> Optional[TxReceipt]: +def _get_receipt_from_txhash(w3: Web3, txhash: TxHash) -> Optional[TxReceipt]: try: receipt = w3.eth.get_transaction_receipt(txhash) except TransactionNotFound: @@ -61,7 +61,7 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]: log.error(f"[error] Transaction {pending_tx.txhash.hex()} not found") return - receipt = __get_receipt_from_txhash(w3=w3, txhash=txdata["hash"]) + receipt = _get_receipt_from_txhash(w3=w3, txhash=txdata["hash"]) if not receipt: return @@ -86,7 +86,7 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]: def _get_confirmations(w3: Web3, tx: Union[PendingTx, FinalizedTx]) -> int: current_block = w3.eth.block_number - tx_receipt = __get_receipt_from_txhash(w3=w3, txhash=tx.txhash) + tx_receipt = _get_receipt_from_txhash(w3=w3, txhash=tx.txhash) if not tx_receipt: log.info(f"Transaction {tx.txhash.hex()} is pending or unconfirmed") return 0 diff --git a/tests/test_faults.py b/tests/test_faults.py index 91707d3..0ffd3ef 100644 --- a/tests/test_faults.py +++ b/tests/test_faults.py @@ -1,58 +1,107 @@ -import pytest -import pytest_twisted -from twisted.internet import reactor -from twisted.internet.task import deferLater from web3.exceptions import TransactionNotFound +from web3.types import TxReceipt -from atxm.exceptions import Fault +from atxm.exceptions import Fault, TransactionFaulted +from atxm.strategies import AsyncTxStrategy from atxm.tx import FaultedTx +from atxm.utils import _get_receipt_from_txhash -@pytest.fixture -def mock_eth_get_transaction(mocker, w3): - return mocker.patch.object( - w3.eth, - "get_transaction", - side_effect=TransactionNotFound - ) - +def _broadcast_tx(machine, eip1559_transaction, account, mocker): + fault_hook = mocker.Mock() -@pytest_twisted.inlineCallbacks -def test_timeout( - machine, clock, eip1559_transaction, account, interval, - mock_wake_sleep, mocker, mock_eth_get_transaction -): - hook = mocker.Mock() atx = machine.queue_transaction( params=eip1559_transaction, signer=account, - on_fault=hook, + on_fault=fault_hook, ) - machine.start() - while not machine.pending: - yield clock.advance(interval) - machine.stop() - assert not machine.running + # broadcast tx + machine._cycle() assert machine.pending == atx assert atx.final is False assert atx.fault is None - atx.created -= 9999999999 - machine.start() - while machine.pending: - yield clock.advance(interval) - machine.stop() + return atx, fault_hook + + +def _verify_tx_faulted(machine, atx, fault_hook, expected_fault: Fault): + while fault_hook.call_count == 0: + # ensure tx processed + machine._cycle() assert atx.final is False - assert isinstance(atx.fault, Fault) assert isinstance(atx, FaultedTx) + assert isinstance(atx.fault, Fault) + assert atx.fault == expected_fault # check async tx advanced through the state machine assert atx not in machine.queued + assert machine.pending is None assert atx.final is False - yield deferLater(reactor, 0.2, lambda: None) - assert hook.call_count == 1 + assert fault_hook.call_count == 1 + fault_hook.assert_called_with(atx) + + +def test_revert( + chain, + w3, + machine, + clock, + eip1559_transaction, + account, + interval, + mock_wake_sleep, + mocker, +): + atx, fault_hook = _broadcast_tx(machine, eip1559_transaction, account, mocker) + + assert machine.pending + + chain.mine(1) + + # force receipt to symbolize a revert of the tx + receipt = _get_receipt_from_txhash(w3, atx.txhash) + revert_receipt = dict(receipt) + revert_receipt["status"] = 0 + + mocker.patch.object( + w3.eth, "get_transaction_receipt", return_value=TxReceipt(revert_receipt) + ) + + _verify_tx_faulted(machine, atx, fault_hook, expected_fault=Fault.REVERT) + + +def test_strategy_fault( + w3, machine, clock, eip1559_transaction, account, interval, mock_wake_sleep, mocker +): + faulty_strategy = mocker.Mock(spec=AsyncTxStrategy) + machine._strategies.insert(0, faulty_strategy) # add first + + atx, fault_hook = _broadcast_tx(machine, eip1559_transaction, account, mocker) + + faulty_message = "mocked fault" + faulty_strategy.execute.side_effect = TransactionFaulted( + tx=atx, fault=Fault.ERROR, message=faulty_message + ) + + mocker.patch.object( + w3.eth, "get_transaction_receipt", side_effect=TransactionNotFound + ) + + _verify_tx_faulted(machine, atx, fault_hook, expected_fault=Fault.ERROR) + assert atx.error == faulty_message + + +def test_timeout_strategy_fault( + w3, machine, clock, eip1559_transaction, account, interval, mock_wake_sleep, mocker +): + atx, fault_hook = _broadcast_tx(machine, eip1559_transaction, account, mocker) + + atx.created -= 9999999999 + mocker.patch.object(w3.eth, "get_transaction", side_effect=TransactionNotFound) + + _verify_tx_faulted(machine, atx, fault_hook, expected_fault=Fault.TIMEOUT)