-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathutils.py
94 lines (60 loc) · 2.23 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import binascii
import math
from collections import namedtuple
from heapq import nsmallest
from secrets import token_bytes, randbits
from socket import inet_ntoa, inet_aton
from chardet import detect
# Lightweight immutable records; the type names stay lowercase ("peer" / "node"),
# which is what shows up in their repr.
Peer = namedtuple("peer", "host port")
Node = namedtuple("node", "id host port")
def generate_id():
    """Return a fresh 2-byte random identifier from ``secrets.token_bytes``."""
    id_length = 2
    return token_bytes(id_length)
def generate_node_id():
    """Return a fresh 20-byte random identifier from ``secrets.token_bytes``."""
    node_id_length = 20
    return token_bytes(node_id_length)
def xor(node_one_id, node_two_id):
    """Return the XOR of two byte strings, each read as a big-endian unsigned integer.

    The result is a plain ``int``; the inputs need not have the same length.
    """
    first, second = (
        int.from_bytes(raw_id, "big") for raw_id in (node_one_id, node_two_id)
    )
    return first ^ second
def fetch_k_closest_nodes(nodes, target_id, k_value=8):
    """Return up to ``k_value`` entries of ``nodes`` with the smallest XOR distance to ``target_id``.

    Each entry must expose an ``id`` attribute; the distance is ``xor(entry.id, target_id)``.
    The result is a list ordered from closest to farthest.
    """

    def distance_to_target(candidate):
        return xor(candidate.id, target_id)

    return nsmallest(k_value, nodes, key=distance_to_target)
def get_rand_bool():
    """Return ``True`` or ``False`` according to a single random bit from ``secrets.randbits``."""
    # randbits(1) is either 0 or 1, so comparing against 1 yields the same bool.
    return randbits(1) == 1
def get_routing_table_index(distance):
    """Return ``floor(log2(|distance|))``, or 0 when ``distance`` is zero.

    For integer distances the result is computed exactly from the bit length.
    Going through floats (``math.fabs`` / ``math.log``) rounds large integers
    to the nearest representable double, so a value such as ``2**64 - 1``
    would be treated as ``2**64`` and land one index too high.

    Args:
        distance: A number; typically the integer XOR distance between two ids.

    Returns:
        The index of the highest set bit of ``|distance|`` for integers, or
        ``floor(log2(|distance|))`` for other numeric input; 0 for a falsy
        distance.
    """
    if not distance:
        return 0
    if isinstance(distance, int):
        # Exact for arbitrarily large ints: highest set bit position.
        return abs(distance).bit_length() - 1
    # Non-integer input keeps the floating-point path; log2 is more accurate
    # than log(x, 2.0), which divides two rounded natural logarithms.
    return int(math.floor(math.log2(math.fabs(distance))))
def decode_values(values):
    """Yield a ``Peer`` for each packed 6-byte entry in ``values``.

    Each value is read as 4 bytes of IPv4 address in network order followed by
    a 2-byte big-endian port. Only the first 6 bytes of a value are decoded.
    Entries whose port is below 1024 are skipped.

    Decoding stops at the first malformed value: one that is empty or whose
    length is not a multiple of 6. An empty value must be rejected explicitly
    because ``0 % 6 == 0`` would let it through to ``inet_ntoa``, which raises
    ``OSError`` on a packed address that is not exactly 4 bytes long.

    Args:
        values: Iterable of byte strings.

    Yields:
        ``Peer(host, port)`` tuples.
    """
    for value in values:
        if len(value) == 0 or len(value) % 6 != 0:
            return
        ip = inet_ntoa(value[0:4])  # from network order to dotted-quad IP address
        port = int.from_bytes(value[4:6], "big")
        if port >= 1024:
            yield Peer(ip, port)
def decode_nodes(nodes):
    """Yield a ``Node`` for each 26-byte record packed in ``nodes``.

    A record is 20 bytes of node id, 4 bytes of IPv4 address in network order,
    and a 2-byte big-endian port. Nothing is yielded when the total length is
    not a multiple of 26, and records with a port below 1024 are skipped.
    """
    record_size = 26
    total = len(nodes)
    if total % record_size:
        return
    offset = 0
    while offset < total:
        record = nodes[offset: offset + record_size]
        offset += record_size
        host = inet_ntoa(record[20:24])  # network order -> dotted-quad string
        port = int.from_bytes(record[24:26], "big")
        if port < 1024:
            continue
        yield Node(record[:20], host, port)
def encode_nodes(nodes):
    """Pack ``(node_id, ip, port)`` triples into one contiguous byte string.

    Each triple becomes ``node_id`` followed by the 4-byte packed IPv4 address
    (``inet_aton``) and the port as 2 big-endian bytes. Records are emitted in
    input order; an empty input yields ``b""``.

    Args:
        nodes: Iterable of ``(node_id, ip, port)`` triples, where ``node_id``
            is bytes, ``ip`` is a dotted-quad string and ``port`` is an int
            that fits in 16 bits.

    Returns:
        The concatenated records as ``bytes``.
    """
    # join builds the result in one pass instead of re-copying the growing
    # buffer on every iteration.
    return b"".join(
        node_id + inet_aton(ip) + port.to_bytes(2, "big")
        for node_id, ip, port in nodes
    )
def hexlify(info_hash):
    """Return the lowercase hexadecimal representation of ``info_hash`` as a ``str``."""
    hex_digits = binascii.hexlify(info_hash)
    return hex_digits.decode("utf-8")
def decode_bytes(byte_str):
    """Recursively convert ``bytes`` inside a nested structure to ``str``.

    Lists are converted element by element and dict values are converted
    recursively (dict keys are left as they are). ``bytes`` objects are decoded
    with the encoding guessed by ``chardet.detect``, falling back to UTF-8 when
    no encoding is reported. Any other object is returned unchanged.

    Args:
        byte_str: A ``bytes`` object, a list/dict containing such objects, or
            any other value.

    Returns:
        The same structure with every ``bytes`` leaf replaced by ``str``.

    Raises:
        UnicodeDecodeError: If the bytes are not valid in the chosen encoding.
    """
    if isinstance(byte_str, list):
        return [decode_bytes(item) for item in byte_str]
    if isinstance(byte_str, dict):
        return {key: decode_bytes(value) for key, value in byte_str.items()}
    if isinstance(byte_str, bytes):
        # detect() reports "encoding": None when it cannot guess (e.g. empty
        # input), so a .get() default is never used; `or` covers the None case
        # that would otherwise make str() raise TypeError.
        encoding = detect(byte_str).get("encoding") or "utf-8"
        return str(byte_str, encoding)
    return byte_str
def decode_bkeys(val_type, value):
    """Decode ``value`` as UTF-8 when ``val_type`` is ``"key"``; otherwise return it unchanged."""
    if val_type == "key":
        return str(value, "utf-8")
    return value