Skip to content

Commit

Permalink
drop more events
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout committed Nov 29, 2023
1 parent c7dd6d7 commit 8be53d2
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions src/dipdup/datasources/evm_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import time
from collections import defaultdict
from collections.abc import Awaitable
from collections.abc import Callable
from dataclasses import dataclass
Expand Down Expand Up @@ -63,7 +62,7 @@ def __init__(self, config: EvmNodeDatasourceConfig, merge_subscriptions: bool =
self._realtime: asyncio.Event = asyncio.Event()
self._requests: dict[str, tuple[asyncio.Event, Any]] = {}
self._subscription_ids: dict[str, EvmNodeSubscription] = {}
self._heads: defaultdict[int, NodeHead] = defaultdict(NodeHead)
self._heads: dict[int, EvmNodeHeadData] = {}

self._on_connected_callbacks: set[EmptyCallback] = set()
self._on_disconnected_callbacks: set[EmptyCallback] = set()
Expand Down Expand Up @@ -222,9 +221,15 @@ async def _jsonrpc_request(

message = WebsocketMessage(request)
client = self._get_ws_client()
await client.send(message)

await event.wait()
async def _request() -> None:
await client.send(message)
await event.wait()

await asyncio.wait_for(
_request(),
timeout=self._http_config.request_timeout,
)
data = self._requests[request_id][1]
del self._requests[request_id]

Expand Down Expand Up @@ -279,25 +284,22 @@ async def _handle_subscription(self, subscription: EvmNodeSubscription, data: An
head = EvmNodeHeadData.from_json(data)
level = head.number

known_hash = self._heads[level].hash
if known_hash not in (head.hash, None):
await self.emit_rollback(
MessageType(),
from_level=max(self._heads.keys() or (level,)),
to_level=level - 1,
)
if level in self._heads:
known_hash = self._heads[level].hash
if known_hash != head.hash:
await self.emit_rollback(
MessageType(),
from_level=max(self._heads.keys() or (level,)),
to_level=level - 1,
)

self._heads[level].hash = head.hash
self._heads[level].timestamp = head.timestamp
self._heads[level] = head
await self.emit_head(head)
self._heads[level].event.set()

elif isinstance(subscription, EvmNodeLogsSubscription):
level = int(data['blockNumber'], 16)
await self._heads[level].event.wait()
# NOTE: We expect head to arrive before other data. Is it true for all kinds of nodes/subscriptions?
timestamp = self._heads[level].timestamp
if timestamp is None:
raise FrameworkException('Head cached but timestamp is None')
logs = EvmNodeLogData.from_json(data, timestamp)
await self.emit_logs(logs)

Expand Down

0 comments on commit 8be53d2

Please sign in to comment.