Skip to content

Commit

Permalink
Remove the use of on_pause hook - pauses are user-controlled so no us…
Browse files Browse the repository at this point in the history
…e in providing a callback to alert the user about pausing.

Move queue_transaction to _Machine instead - more applicable there since internals of _Machine are used.
Move exposed functions to their own section in _Machine.
  • Loading branch information
derekpierre committed Feb 28, 2024
1 parent b71b110 commit da297a3
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 34 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Hooks are fired in a dedicated thread for lifecycle events.

- `on_broadcast`: When a transaction is broadcasted.
- `on_finalized`: When a transaction is finalized.
- `on_pause`: When a transaction is halted by the strategy.
- `on_fault`: When a transaction reverted or another error occurred.


Expand Down
51 changes: 38 additions & 13 deletions atxm/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,19 +328,6 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx

def pause(self) -> None:
self._pause = True
self.log.warn(
f"[pause] pending transaction {self._tx_tracker.pending.txhash.hex()} has been paused."
)
hook = self._tx_tracker.pending.on_pause
if hook:
fire_hook(hook=hook, tx=self._tx_tracker.pending)

def resume(self) -> None:
self.log.info("[pause] pause lifted by strategy")
self._pause = False # resume

def __strategize(self) -> Optional[PendingTx]:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
Expand Down Expand Up @@ -417,3 +404,41 @@ def __monitor_finalized(self) -> None:
self.log.info(
f"[monitor] transaction {tx.txhash.hex()} has {confirmations} confirmations"
)

#
# Exposed functions
#
def pause(self) -> None:
"""
Pause the machine's tx processing loop; no txs are processed until unpaused (resume()).
"""
self._pause = True
self.log.info("[pause] pause mode requested")

def resume(self) -> None:
"""Unpauses (resumes) the machine's tx processing loop."""
if self._pause:
self._pause = False
self.log.info("[pause] pause mode deactivated")
self._wake()

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
previously_busy = self._busy

if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not previously_busy:
self._wake()

return tx
17 changes: 0 additions & 17 deletions atxm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,6 @@ def faults(self) -> List[AsyncTx]:
"""Return a set of faulted transactions."""
return list(self._tx_tracker.faulty)

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
if signer.address not in self.signers:
self.signers[signer.address] = signer
tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not self._task.running:
self._wake()
return tx

def queue_transactions(
self, params: List[TxParams], signer: LocalAccount, *args, **kwargs
) -> List[FutureTx]:
Expand Down
2 changes: 0 additions & 2 deletions atxm/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def _queue(
info: Dict[str, str] = None,
on_broadcast: Optional[Callable] = None,
on_finalized: Optional[Callable] = None,
on_pause: Optional[Callable] = None,
on_fault: Optional[Callable] = None,
) -> FutureTx:
"""Queue a new transaction for broadcast and subsequent tracking."""
Expand All @@ -213,7 +212,6 @@ def _queue(
# configure hooks
tx.on_broadcast = on_broadcast
tx.on_finalized = on_finalized
tx.on_pause = on_pause
tx.on_fault = on_fault

self.__queue.append(tx)
Expand Down
1 change: 0 additions & 1 deletion atxm/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class AsyncTx(ABC):
on_finalized: Optional[Callable[["FinalizedTx"], None]] = field(
default=None, init=False
)
on_pause: Optional[Callable[[PendingTx], None]] = field(default=None, init=False)
on_fault: Optional[Callable] = field(default=None, init=False)

def __repr__(self):
Expand Down

0 comments on commit da297a3

Please sign in to comment.