Skip to content

Commit

Permalink
Convert connection tracking to a plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
pipermerriam committed Apr 25, 2019
1 parent c221409 commit 8d9d19f
Show file tree
Hide file tree
Showing 28 changed files with 999 additions and 449 deletions.
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ disallow_untyped_calls = True
warn_redundant_casts = True
warn_unused_configs = True
strict_equality = True
plugins = sqlmypy
38 changes: 26 additions & 12 deletions p2p/peer_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@
DiscoveryPeerBackend,
BootnodesPeerBackend,
)
from p2p.persistence import (
BasePeerInfo,
NoopPeerInfo,
)
from p2p.p2p_proto import (
DisconnectReason,
)
from p2p.service import (
BaseService,
)
from p2p.tracking.connection import (
BaseConnectionTracker,
NoopConnectionTracker,
)


TO_DISCOVERY_BROADCAST_CONFIG = BroadcastConfig(filter_endpoint=DISCOVERY_EVENTBUS_ENDPOINT)
Expand Down Expand Up @@ -108,17 +108,11 @@ def __init__(self,
privkey: datatypes.PrivateKey,
context: BasePeerContext,
max_peers: int = DEFAULT_MAX_PEERS,
peer_info: BasePeerInfo = None,
token: CancelToken = None,
event_bus: Endpoint = None,
) -> None:
super().__init__(token)

if peer_info is None:
peer_info = NoopPeerInfo()

self.peer_info = peer_info

self.privkey = privkey
self.max_peers = max_peers
self.context = context
Expand Down Expand Up @@ -146,6 +140,13 @@ def event_bus(self) -> Endpoint:
def has_event_bus(self) -> bool:
return self._event_bus is not None

def setup_connection_tracker(self) -> BaseConnectionTracker:
"""
Return an instance of `p2p.tracking.connection.BaseConnectionTracker`
which will be used to track peer connection failures.
"""
return NoopConnectionTracker()

def setup_peer_backends(self) -> Tuple[BasePeerBackend, ...]:
return (
DiscoveryPeerBackend(self.event_bus),
Expand Down Expand Up @@ -246,6 +247,9 @@ async def start_peer(self, peer: BasePeer) -> None:
self.logger.debug('Timout waiting for peer to boot: %s', err)
await peer.disconnect(DisconnectReason.timeout)
return
except HandshakeFailure as err:
self.connection_tracker.record_failure(peer.remote, err)
raise
else:
if peer.is_operational:
self._add_peer(peer, buffer.get_messages())
Expand Down Expand Up @@ -298,7 +302,17 @@ async def connect(self, remote: Node) -> BasePeer:
if remote in self.connected_nodes:
self.logger.debug2("Skipping %s; already connected to it", remote)
raise IneligiblePeer(f"Already connected to {remote}")
if not self.peer_info.should_connect_to(remote):

try:
should_connect = await self.wait(
self.connection_tracker.coro_should_connect_to(remote),
timeout=1,
)
except TimeoutError:
self.logger.warning("ConnectionTracker.coro_should_connect_to request timed out.")
raise

if not should_connect:
raise IneligiblePeer(f"Peer database rejected peer candidate: {remote}")

try:
Expand Down Expand Up @@ -329,7 +343,7 @@ async def connect(self, remote: Node) -> BasePeer:
raise
except HandshakeFailure as e:
self.logger.debug("Could not complete handshake with %r: %s", remote, repr(e))
self.peer_info.record_failure(remote, e)
self.connection_tracker.record_failure(remote, e)
raise
except COMMON_PEER_CONNECTION_EXCEPTIONS as e:
self.logger.debug("Could not complete handshake with %r: %s", remote, repr(e))
Expand Down
252 changes: 0 additions & 252 deletions p2p/persistence.py

This file was deleted.

Empty file added p2p/tracking/__init__.py
Empty file.
Loading

0 comments on commit 8d9d19f

Please sign in to comment.