Skip to content

Commit

Permalink
Merge pull request #3631 from lbryio/bootstrap_node
Browse files Browse the repository at this point in the history
Add all peers when running as a bootstrap node
  • Loading branch information
shyba authored Aug 29, 2022
2 parents 01cd95f + e3ee389 commit a334a93
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 164 deletions.
4 changes: 4 additions & 0 deletions lbry/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ class Config(CLIConfig):
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
"use.", 2
)
is_bootstrap_node = Toggle(
"When running as a bootstrap node, disable all logic related to balancing the routing table, so we can "
"add as many peers as possible and better help first-runs.", False
)

# protocol timeouts
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
Expand Down
21 changes: 6 additions & 15 deletions lbry/dht/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ class Node:
)
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False,
storage: typing.Optional['SQLiteStorage'] = None):
self.loop = loop
self.internal_udp_port = internal_udp_port
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
split_buckets_under_index)
split_buckets_under_index, is_bootstrap_node)
self.listening_port: asyncio.DatagramTransport = None
self.joined = asyncio.Event(loop=self.loop)
self._join_task: asyncio.Task = None
Expand Down Expand Up @@ -70,13 +70,6 @@ async def refresh_node(self, force_once=False):

# get ids falling in the midpoint of each bucket that hasn't been recently updated
node_ids = self.protocol.routing_table.get_refresh_list(0, True)
# if we have 3 or fewer populated buckets get two random ids in the range of each to try and
# populate/split the buckets further
buckets_with_contacts = self.protocol.routing_table.buckets_with_contacts()
if buckets_with_contacts <= 3:
for i in range(buckets_with_contacts):
node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))

if self.protocol.routing_table.get_peers():
# if we have node ids to look up, perform the iterative search until we have k results
Expand Down Expand Up @@ -203,15 +196,13 @@ def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typ

def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
max_results: int = constants.K) -> IterativeNodeFinder:

return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, max_results, None, shortlist)
shortlist = shortlist or self.protocol.routing_table.find_close_peers(key)
return IterativeNodeFinder(self.loop, self.protocol, key, max_results, shortlist)

def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
max_results: int = -1) -> IterativeValueFinder:

return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, max_results, None, shortlist)
shortlist = shortlist or self.protocol.routing_table.find_close_peers(key)
return IterativeValueFinder(self.loop, self.protocol, key, max_results, shortlist)

async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
Expand Down
45 changes: 10 additions & 35 deletions lbry/dht/protocol/iterative_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from lbry.dht.serialization.datagram import PAGE_KEY

if TYPE_CHECKING:
from lbry.dht.protocol.routing_table import TreeRoutingTable
from lbry.dht.protocol.protocol import KademliaProtocol
from lbry.dht.peer import PeerManager, KademliaPeer

Expand Down Expand Up @@ -57,37 +56,19 @@ def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
return [(node_id, address.decode(), port) for node_id, address, port in self.close_triples]


def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
shortlist: typing.Optional[typing.List['KademliaPeer']]) -> typing.List['KademliaPeer']:
"""
If not provided, initialize the shortlist of peers to probe to the (up to) k closest peers in the routing table
:param routing_table: a TreeRoutingTable
:param key: a 48 byte hash
:param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no
peers in the routing table. During bootstrap the shortlist is set to be the seed nodes.
"""
if len(key) != constants.HASH_LENGTH:
raise ValueError("invalid key length: %i" % len(key))
return shortlist or routing_table.find_close_peers(key)


class IterativeFinder(AsyncIterator):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
def __init__(self, loop: asyncio.AbstractEventLoop,
protocol: 'KademliaProtocol', key: bytes,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH:
raise ValueError("invalid key length: %i" % len(key))
self.loop = loop
self.peer_manager = peer_manager
self.routing_table = routing_table
self.peer_manager = protocol.peer_manager
self.protocol = protocol

self.key = key
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []

self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
Expand All @@ -99,7 +80,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.iteration_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
for peer in get_shortlist(routing_table, key, shortlist):
for peer in shortlist:
if peer.node_id:
self._add_active(peer, force=True)
else:
Expand Down Expand Up @@ -198,8 +179,6 @@ def _search_round(self):
if index > (constants.K + len(self.running_probes)):
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
continue
if peer.node_id == self.protocol.node_id:
continue
if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
Expand Down Expand Up @@ -277,13 +256,11 @@ async def aclose(self):
type(self).__name__, id(self), self.key.hex()[:8])

class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
def __init__(self, loop: asyncio.AbstractEventLoop,
protocol: 'KademliaProtocol', key: bytes,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
super().__init__(loop, protocol, key, max_results, shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()

async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
Expand Down Expand Up @@ -319,13 +296,11 @@ def check_result_ready(self, response: FindNodeResponse):


class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
def __init__(self, loop: asyncio.AbstractEventLoop,
protocol: 'KademliaProtocol', key: bytes,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
super().__init__(loop, protocol, key, max_results, shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
Expand Down
87 changes: 14 additions & 73 deletions lbry/dht/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, protocol: 'KademliaProtocol'
def running(self):
return self._running

@property
def busy(self):
return self._running and (any(self._running_pings) or any(self._pending_contacts))

def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
delay = delay if delay is not None else self._default_delay
now = self._loop.time()
Expand All @@ -229,7 +233,7 @@ def maybe_ping(self, peer: 'KademliaPeer'):
async def ping_task():
try:
if self._protocol.peer_manager.peer_is_good(peer):
if peer not in self._protocol.routing_table.get_peers():
if not self._protocol.routing_table.get_peer(peer.node_id):
self._protocol.add_peer(peer)
return
await self._protocol.get_rpc_peer(peer).ping()
Expand Down Expand Up @@ -294,7 +298,7 @@ class KademliaProtocol(DatagramProtocol):

def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_boostrap_node: bool = False):
self.peer_manager = peer_manager
self.loop = loop
self.node_id = node_id
Expand All @@ -309,7 +313,8 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.transport: DatagramTransport = None
self.old_token_secret = constants.generate_id()
self.token_secret = constants.generate_id()
self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index)
self.routing_table = TreeRoutingTable(
self.loop, self.peer_manager, self.node_id, split_buckets_under_index, is_bootstrap_node=is_boostrap_node)
self.data_store = DictDataStore(self.loop, self.peer_manager)
self.ping_queue = PingQueue(self.loop, self)
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
Expand Down Expand Up @@ -356,72 +361,10 @@ def _migrate_incoming_rpc_args(peer: 'KademliaPeer', method: bytes, *args) -> ty
return args, {}

async def _add_peer(self, peer: 'KademliaPeer'):
if not peer.node_id:
log.warning("Tried adding a peer with no node id!")
return False
for my_peer in self.routing_table.get_peers():
if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
self.routing_table.remove_peer(my_peer)
self.routing_table.join_buckets()
bucket_index = self.routing_table.kbucket_index(peer.node_id)
if self.routing_table.buckets[bucket_index].add_peer(peer):
return True

# The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
if self.routing_table.should_split(bucket_index, peer.node_id):
self.routing_table.split_bucket(bucket_index)
# Retry the insertion attempt
result = await self._add_peer(peer)
self.routing_table.join_buckets()
return result
else:
# We can't split the k-bucket
#
# The 13 page kademlia paper specifies that the least recently contacted node in the bucket
# shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
# the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
#
# A reasonable extension to this is BEP 0005, which extends the above:
#
# Not all nodes that we learn about are equal. Some are "good" and some are not.
# Many nodes using the DHT are able to send queries and receive responses,
# but are not able to respond to queries from other nodes. It is important that
# each node's routing table must contain only known good nodes. A good node is
# a node has responded to one of our queries within the last 15 minutes. A node
# is also good if it has ever responded to one of our queries and has sent us a
# query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
# questionable. Nodes become bad when they fail to respond to multiple queries
# in a row. Nodes that we know are good are given priority over nodes with unknown status.
#
# When there are bad or questionable nodes in the bucket, the least recent is selected for
# potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
# contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
# is ignored if the pinged node replies.

not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
not_recently_replied = []
for my_peer in not_good_contacts:
last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
if not last_replied or last_replied + 60 < self.loop.time():
not_recently_replied.append(my_peer)
if not_recently_replied:
to_replace = not_recently_replied[0]
else:
to_replace = self.routing_table.buckets[bucket_index].peers[0]
last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
if last_replied and last_replied + 60 > self.loop.time():
return False
log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
try:
to_replace_rpc = self.get_rpc_peer(to_replace)
await to_replace_rpc.ping()
return False
except asyncio.TimeoutError:
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
if to_replace in self.routing_table.buckets[bucket_index]:
self.routing_table.buckets[bucket_index].remove_peer(to_replace)
return await self._add_peer(peer)
async def probe(some_peer: 'KademliaPeer'):
rpc_peer = self.get_rpc_peer(some_peer)
await rpc_peer.ping()
return await self.routing_table.add_peer(peer, probe)

def add_peer(self, peer: 'KademliaPeer'):
if peer.node_id == self.node_id:
Expand All @@ -439,7 +382,6 @@ async def routing_table_task(self):
async with self._split_lock:
peer = self._to_remove.pop()
self.routing_table.remove_peer(peer)
self.routing_table.join_buckets()
while self._to_add:
async with self._split_lock:
await self._add_peer(self._to_add.pop())
Expand Down Expand Up @@ -482,9 +424,8 @@ def handle_request_datagram(self, address: typing.Tuple[str, int], request_datag
# This is an RPC method request
self.received_request_metric.labels(method=request_datagram.method).inc()
self.peer_manager.report_last_requested(address[0], address[1])
try:
peer = self.routing_table.get_peer(request_datagram.node_id)
except IndexError:
peer = self.routing_table.get_peer(request_datagram.node_id)
if not peer:
try:
peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
except ValueError as err:
Expand Down
Loading

0 comments on commit a334a93

Please sign in to comment.