Skip to content

Commit

Permalink
Merge pull request #4 from KPrasch/dev
Browse files Browse the repository at this point in the history
Post-extrication code quality improvements
  • Loading branch information
KPrasch authored Feb 7, 2024
2 parents 7655e4a + f5887e6 commit 64fc21f
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 176 deletions.
47 changes: 0 additions & 47 deletions atxm/_task.py

This file was deleted.

209 changes: 114 additions & 95 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import time
from copy import deepcopy
from logging import Logger
from typing import Optional, Union, List, Type

from eth_account.signers.local import LocalAccount
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.exceptions import TransactionNotFound
from web3.types import TxReceipt, TxParams
Expand All @@ -26,37 +28,36 @@
FutureTx,
PendingTx,
TxHash,
_make_tx_params,
)
from atxm.utils import (
_get_average_blocktime,
_get_receipt,
_handle_rpc_error,
fire_hook,
_make_tx_params,
)
from ._task import SimpleTask
from .logging import log


class _Machine(SimpleTask):
"""Do not import this - use the public `AutomaticTxMachine` instead."""
class _Machine:
"""
Do not import this - it will not work.
Please import the publicly exposed `AutomaticTxMachine` instead.
Thanks! :)
"""

# internal
__CLOCK = reactor # twisted reactor

# tweaks
_TRACKING_CONFIRMATIONS = 300 # blocks until clearing a finalized transaction
_RPC_THROTTLE = 1 # min. seconds between RPC calls (>1 recommended)
_MIN_INTERVAL = (
1 # absolute min. async loop interval (edge case of low block count)
)

# idle (slower)
INTERVAL = 60 * 5 # seconds
IDLE_INTERVAL = INTERVAL # renames above constant
_MIN_INTERVAL = 1
_IDLE_INTERVAL = 60 * 5 # seconds
_BLOCK_INTERVAL = 20 # ~20 blocks
_BLOCK_SAMPLE_SIZE = 10_000 # blocks

# work (faster)
BLOCK_INTERVAL = 20 # ~20 blocks
BLOCK_SAMPLE_SIZE = 10_000 # blocks

__DEFAULT_STRATEGIES: List[Type[AsyncTxStrategy]] = [
STRATEGIES: List[Type[AsyncTxStrategy]] = [
InsufficientFundsPause,
TimeoutPause,
FixedRateSpeedUp,
Expand All @@ -71,48 +72,134 @@ def __init__(
# public
self.w3 = w3
self.signers = {}
self.strategies = [s(w3) for s in self.__DEFAULT_STRATEGIES]
self.log = Logger(self.__class__.__name__)
self.strategies = [s(w3) for s in self.STRATEGIES]
if strategies:
self.strategies.extend(list(strategies))

# internal
self._state = _State(disk_cache=disk_cache)
# state
self.__pause = False
super().__init__(interval=self.INTERVAL)
self._state = _State(disk_cache=disk_cache)

# async
self._task = LoopingCall(self._cycle)
self._task.clock = self.__CLOCK
self._task.interval = self._IDLE_INTERVAL

@property
def _clock(self):
return self.__CLOCK

@property
def _interval(self) -> Optional[float]:
return self._task.interval

@property
def _start_time(self) -> float:
return self._task.starttime

@property
def _busy(self) -> bool:
"""Returns True if the machine is busy."""
return bool(self._state.queue or self._state.pending)

#
# Async
#

def _handle_errors(self, *args, **kwargs):
"""Handles unexpected errors during task processing."""
self._state.commit()
self.log.warn(
"[recovery] error during transaction: {}".format(args[0].getTraceback())
)
if self._task.running:
return
self.log.warn("[recovery] restarting transaction machine!")
self._start(now=False)

def _cycle(self) -> None:
"""Execute one cycle"""

if self.__pause:
self.log.warn("[pause] paused")
return

self.__monitor_finalized()
if not self._busy:
self.log.info(f"[idle] cycle interval is {self._task.interval} seconds")
return

# sample block time and adjust the interval
# every work cycle.
self.__work_mode()
self.log.info(
f"[working] tracking {len(self._state.queue)} queued "
f"transaction{'s' if len(self._state.queue) > 1 else ''} "
f"{'and 1 pending transaction' if self._state.pending else ''}"
)

if self._state.pending:
# There is an active transaction
self.__handle_active_transaction()

elif self._state.queue:
# There is no active transaction
# and there are queued transactions
self.__broadcast()

# After one work cycle, return to idle mode
# if the machine is not busy.
if not self._busy:
self.__idle_mode()

#
# Throttle
#

def _start(self, now: bool = False) -> Deferred:
if self._task.running:
return self._task.deferred
when = "now" if now else f"in {self._task.interval} seconds"
self.log.info(f"[atxm] starting async transaction machine {when}")
d = self._task.start(interval=self._interval, now=now)
d.addErrback(self._handle_errors)
return d

def _stop(self):
"""Stop task."""
if self._task.running:
self._task.stop()

def __idle_mode(self) -> None:
"""Return to idle mode (slow down)"""
self._task.interval = self.IDLE_INTERVAL
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[done] returning to idle mode with "
f"{self._task.interval} second interval"
)
if not self.busy:
if not self._busy:
self._sleep()

def __work_mode(self) -> None:
"""Start work mode (speed up)"""
average_block_time = _get_average_blocktime(
w3=self.w3, sample_size=self.BLOCK_SAMPLE_SIZE
w3=self.w3, sample_size=self._BLOCK_SAMPLE_SIZE
)
self._task.interval = max(
round(average_block_time * self.BLOCK_INTERVAL), self._MIN_INTERVAL
round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL
)
self.log.info(f"[working] cycle interval is {self._task.interval} seconds")

def _wake(self) -> None:
if not self._task.running:
log.info("[wake] waking up")
self.start(now=True)
self._start(now=True)

def _sleep(self) -> None:
if self._task.running:
log.info("[sleep] sleeping")
self.stop()
self._stop()

#
# Lifecycle
Expand Down Expand Up @@ -344,71 +431,3 @@ def __monitor_finalized(self) -> None:
self.log.info(
f"[monitor] transaction {txhash.hex()} has {confirmations} confirmations"
)

#
# Async
#

def handle_errors(self, *args, **kwargs) -> None:
"""Handles unexpected errors during task processing."""
self.log.warn(
"[recovery] error during transaction: {}".format(args[0].getTraceback())
)
self._state.commit()
time.sleep(self._RPC_THROTTLE)
if not self._task.running:
self.log.warn("[recovery] restarting transaction machine!")
self.start(now=False) # take a breather

def run(self) -> None:
"""Execute one cycle"""

if self.__pause:
self.log.warn("[pause] paused")
return

self.__monitor_finalized()
if not self.busy:
self.log.info(f"[idle] cycle interval is {self._task.interval} seconds")
return

# sample block time and adjust the interval
# every work cycle.
self.__work_mode()
self.log.info(
f"[working] tracking {len(self._state.queue)} queued "
f"transaction{'s' if len(self._state.queue) > 1 else ''} "
f"{'and 1 pending transaction' if self._state.pending else ''}"
)

if self._state.pending:
# There is an active transaction
self.__handle_active_transaction()

elif self._state.queue:
# There is no active transaction
# and there are queued transactions
self.__broadcast()

# After one work cycle, return to idle mode
# if the machine is not busy.
if not self.busy:
self.__idle_mode()

@property
def busy(self) -> bool:
"""Returns True if the machine is busy."""
if self._state.pending:
return True
if len(self._state.queue) > 0:
return True
return False

def start(self, now: bool = True) -> Deferred:
if now:
self.log.info("[atxm] starting async transaction machine now")
else:
self.log.info(
f"[atxm] starting async transaction machine in {self.INTERVAL} seconds"
)
return super().start(now=now)
28 changes: 26 additions & 2 deletions atxm/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import copy
from typing import List, Set

from eth_account.signers.local import LocalAccount
Expand All @@ -13,6 +14,29 @@


class AutomaticTxMachine(_Machine):
def start(self, now: bool = False) -> None:
"""Start the machine. if now is True, start immediately."""
super()._start(now=now)

def stop(self) -> None:
"""Stop the machine."""
super()._stop()

@property
def running(self) -> bool:
"""Return True if the machine is running."""
return bool(self._task.running)

@property
def paused(self) -> bool:
"""Return True if the machine is paused."""
return bool(self.__pause)

@property
def busy(self) -> bool:
"""Returns True if the machine is busy."""
return super()._busy

@property
def queued(self) -> List[FutureTx]:
"""Return a list of queued transactions."""
Expand All @@ -21,12 +45,12 @@ def queued(self) -> List[FutureTx]:
@property
def pending(self) -> PendingTx:
"""Return the active transaction if there is one."""
return self._state.pending or None
return copy(self._state.pending or None)

@property
def finalized(self) -> Set[FinalizedTx]:
"""Return a set of finalized transactions."""
return self._state.finalized
return set(self._state.finalized)

@property
def faults(self) -> List[AsyncTx]:
Expand Down
Loading

0 comments on commit 64fc21f

Please sign in to comment.