Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Post-extrication code quality improvements #4

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions atxm/_task.py

This file was deleted.

209 changes: 114 additions & 95 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import time
from copy import deepcopy
from logging import Logger
from typing import Optional, Union, List, Type

from eth_account.signers.local import LocalAccount
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.exceptions import TransactionNotFound
from web3.types import TxReceipt, TxParams
Expand All @@ -26,37 +28,36 @@
FutureTx,
PendingTx,
TxHash,
_make_tx_params,
)
from atxm.utils import (
_get_average_blocktime,
_get_receipt,
_handle_rpc_error,
fire_hook,
_make_tx_params,
)
from ._task import SimpleTask
from .logging import log


class _Machine(SimpleTask):
"""Do not import this - use the public `AutomaticTxMachine` instead."""
class _Machine:
"""
Do not import this - it will not work.
Please import the publicly exposed `AutomaticTxMachine` instead.
Thanks! :)
"""

# internal
__CLOCK = reactor # twisted reactor

# tweaks
_TRACKING_CONFIRMATIONS = 300 # blocks until clearing a finalized transaction
_RPC_THROTTLE = 1 # min. seconds between RPC calls (>1 recommended)
_MIN_INTERVAL = (
1 # absolute min. async loop interval (edge case of low block count)
)

# idle (slower)
INTERVAL = 60 * 5 # seconds
IDLE_INTERVAL = INTERVAL # renames above constant
_MIN_INTERVAL = 1
_IDLE_INTERVAL = 60 * 5 # seconds
_BLOCK_INTERVAL = 20 # ~20 blocks
_BLOCK_SAMPLE_SIZE = 10_000 # blocks

# work (faster)
BLOCK_INTERVAL = 20 # ~20 blocks
BLOCK_SAMPLE_SIZE = 10_000 # blocks

__DEFAULT_STRATEGIES: List[Type[AsyncTxStrategy]] = [
STRATEGIES: List[Type[AsyncTxStrategy]] = [
InsufficientFundsPause,
TimeoutPause,
FixedRateSpeedUp,
Expand All @@ -71,48 +72,134 @@ def __init__(
# public
self.w3 = w3
self.signers = {}
self.strategies = [s(w3) for s in self.__DEFAULT_STRATEGIES]
self.log = Logger(self.__class__.__name__)
self.strategies = [s(w3) for s in self.STRATEGIES]
if strategies:
self.strategies.extend(list(strategies))

# internal
self._state = _State(disk_cache=disk_cache)
# state
self.__pause = False
super().__init__(interval=self.INTERVAL)
self._state = _State(disk_cache=disk_cache)

# async
self._task = LoopingCall(self._cycle)
self._task.clock = self.__CLOCK
self._task.interval = self._IDLE_INTERVAL

@property
def _clock(self):
return self.__CLOCK

@property
def _interval(self) -> Optional[float]:
return self._task.interval

@property
def _start_time(self) -> float:
return self._task.starttime

@property
def _busy(self) -> bool:
"""Returns True if the machine is busy."""
return bool(self._state.queue or self._state.pending)

#
# Async
#

def _handle_errors(self, *args, **kwargs):
"""Handles unexpected errors during task processing."""
self._state.commit()
self.log.warn(
"[recovery] error during transaction: {}".format(args[0].getTraceback())
)
if self._task.running:
return
self.log.warn("[recovery] restarting transaction machine!")
self._start(now=False)

def _cycle(self) -> None:
"""Execute one cycle"""

if self.__pause:
self.log.warn("[pause] paused")
return

self.__monitor_finalized()
if not self._busy:
self.log.info(f"[idle] cycle interval is {self._task.interval} seconds")
return

# sample block time and adjust the interval
# every work cycle.
self.__work_mode()
self.log.info(
f"[working] tracking {len(self._state.queue)} queued "
f"transaction{'s' if len(self._state.queue) > 1 else ''} "
f"{'and 1 pending transaction' if self._state.pending else ''}"
)

if self._state.pending:
# There is an active transaction
self.__handle_active_transaction()

elif self._state.queue:
# There is no active transaction
# and there are queued transactions
self.__broadcast()

# After one work cycle, return to idle mode
# if the machine is not busy.
if not self._busy:
self.__idle_mode()

#
# Throttle
#

def _start(self, now: bool = False) -> Deferred:
if self._task.running:
return self._task.deferred
when = "now" if now else f"in {self._task.interval} seconds"
self.log.info(f"[atxm] starting async transaction machine {when}")
d = self._task.start(interval=self._interval, now=now)
d.addErrback(self._handle_errors)
return d

def _stop(self):
"""Stop task."""
if self._task.running:
self._task.stop()

def __idle_mode(self) -> None:
"""Return to idle mode (slow down)"""
self._task.interval = self.IDLE_INTERVAL
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[done] returning to idle mode with "
f"{self._task.interval} second interval"
)
if not self.busy:
if not self._busy:
self._sleep()

def __work_mode(self) -> None:
"""Start work mode (speed up)"""
average_block_time = _get_average_blocktime(
w3=self.w3, sample_size=self.BLOCK_SAMPLE_SIZE
w3=self.w3, sample_size=self._BLOCK_SAMPLE_SIZE
)
self._task.interval = max(
round(average_block_time * self.BLOCK_INTERVAL), self._MIN_INTERVAL
round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL
)
self.log.info(f"[working] cycle interval is {self._task.interval} seconds")

def _wake(self) -> None:
if not self._task.running:
log.info("[wake] waking up")
self.start(now=True)
self._start(now=True)

def _sleep(self) -> None:
if self._task.running:
log.info("[sleep] sleeping")
self.stop()
self._stop()

#
# Lifecycle
Expand Down Expand Up @@ -344,71 +431,3 @@ def __monitor_finalized(self) -> None:
self.log.info(
f"[monitor] transaction {txhash.hex()} has {confirmations} confirmations"
)

#
# Async
#

def handle_errors(self, *args, **kwargs) -> None:
"""Handles unexpected errors during task processing."""
self.log.warn(
"[recovery] error during transaction: {}".format(args[0].getTraceback())
)
self._state.commit()
time.sleep(self._RPC_THROTTLE)
if not self._task.running:
self.log.warn("[recovery] restarting transaction machine!")
self.start(now=False) # take a breather

def run(self) -> None:
"""Execute one cycle"""

if self.__pause:
self.log.warn("[pause] paused")
return

self.__monitor_finalized()
if not self.busy:
self.log.info(f"[idle] cycle interval is {self._task.interval} seconds")
return

# sample block time and adjust the interval
# every work cycle.
self.__work_mode()
self.log.info(
f"[working] tracking {len(self._state.queue)} queued "
f"transaction{'s' if len(self._state.queue) > 1 else ''} "
f"{'and 1 pending transaction' if self._state.pending else ''}"
)

if self._state.pending:
# There is an active transaction
self.__handle_active_transaction()

elif self._state.queue:
# There is no active transaction
# and there are queued transactions
self.__broadcast()

# After one work cycle, return to idle mode
# if the machine is not busy.
if not self.busy:
self.__idle_mode()

@property
def busy(self) -> bool:
"""Returns True if the machine is busy."""
if self._state.pending:
return True
if len(self._state.queue) > 0:
return True
return False

def start(self, now: bool = True) -> Deferred:
if now:
self.log.info("[atxm] starting async transaction machine now")
else:
self.log.info(
f"[atxm] starting async transaction machine in {self.INTERVAL} seconds"
)
return super().start(now=now)
28 changes: 26 additions & 2 deletions atxm/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import copy
from typing import List, Set

from eth_account.signers.local import LocalAccount
Expand All @@ -13,6 +14,29 @@


class AutomaticTxMachine(_Machine):
def start(self, now: bool = False) -> None:
"""Start the machine. if now is True, start immediately."""
super()._start(now=now)

def stop(self) -> None:
"""Stop the machine."""
super()._stop()

@property
def running(self) -> bool:
"""Return True if the machine is running."""
return bool(self._task.running)

@property
def paused(self) -> bool:
"""Return True if the machine is paused."""
return bool(self.__pause)

@property
def busy(self) -> bool:
"""Returns True if the machine is busy."""
return super()._busy

@property
def queued(self) -> List[FutureTx]:
"""Return a list of queued transactions."""
Expand All @@ -21,12 +45,12 @@ def queued(self) -> List[FutureTx]:
@property
def pending(self) -> PendingTx:
"""Return the active transaction if there is one."""
return self._state.pending or None
return copy(self._state.pending or None)

@property
def finalized(self) -> Set[FinalizedTx]:
"""Return a set of finalized transactions."""
return self._state.finalized
return set(self._state.finalized)

@property
def faults(self) -> List[AsyncTx]:
Expand Down
Loading
Loading