-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathtools.py
131 lines (98 loc) · 3.06 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import base64
import hashlib
import os
import yaml
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Hash import SHA1
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Util.Padding import pad, unpad
# Location of the YAML config file that read_yaml / write_yaml / updata_yaml
# open; the path is relative to the current working directory.
yaml_path = "./config.yaml"
def get_pssh(inia):
    """Extract the last ``pssh`` box from init data and return it as base64 text.

    Args:
        inia: The init data, either raw bytes or a base64-encoded string.

    Returns:
        The whole box (4-byte size field, ``pssh`` tag and payload),
        base64-encoded and decoded to a ``str``.

    Raises:
        ValueError: If no ``pssh`` tag preceded by a 4-byte size field exists.
    """
    raw = base64.b64decode(inia) if isinstance(inia, str) else inia
    offset = raw.rfind(b"pssh")
    # The size field occupies the four bytes right before the tag, so a valid
    # tag can never sit at an index below 4 (and -1 means "not found").
    if offset < 4:
        raise ValueError("no pssh box found in init data")
    box_start = offset - 4
    # Read the full 32-bit big-endian size; looking only at the last byte
    # would truncate any box of 256 bytes or more.
    box_size = int.from_bytes(raw[box_start:offset], "big")
    box = raw[box_start:box_start + box_size]
    return base64.b64encode(box).decode("utf-8")
def is_dir(path):
    """Make sure something exists at *path*, creating a directory if not.

    Missing parent directories are created as well. A path that already
    exists (directory or not) is left untouched.

    Args:
        path: Directory path to ensure.
    """
    if not os.path.exists(path):
        # exist_ok covers the window in which another process may create the
        # directory between the existence check and this call.
        os.makedirs(path, exist_ok=True)
def check_file():
    """Ensure the two working directories used by this tool exist."""
    for directory in ("./chache", "./download"):
        is_dir(directory)
def write_yaml(r):
    """Serialize *r* as YAML into the config file, replacing its contents.

    Args:
        r: The object to dump (the callers in this module pass a dict).
    """
    with open(yaml_path, mode="w", encoding="utf-8") as stream:
        yaml.dump(r, stream)
def read_yaml():
    """Load and return the parsed contents of the YAML config file.

    Raises:
        FileNotFoundError: If the config file does not exist.
    """
    with open(yaml_path, mode="r", encoding="utf-8") as stream:
        # NOTE(review): FullLoader is used here; the file is assumed to be a
        # locally written config, not untrusted input -- confirm.
        parsed = yaml.load(stream, Loader=yaml.FullLoader)
    return parsed
def updata_yaml(k, v):
    """Set a single top-level key in the YAML config file and save it.

    Args:
        k: Key to set.
        v: Value to store under *k*.

    Raises:
        FileNotFoundError: If the config file does not exist yet.
    """
    config = read_yaml()
    # An empty YAML file parses to None; start from an empty mapping instead
    # of failing on item assignment.
    if config is None:
        config = {}
    config[k] = v
    # Reuse the module's writer rather than duplicating the open/dump logic.
    write_yaml(config)
def get_config():
    """Return the stored config, creating it interactively on first run.

    If the config file is missing, the user is prompted on stdin for three
    cookie strings (Tencent, Youku, iQiyi, per the prompt texts); the answers
    are saved to the config file and returned.

    Returns:
        Whatever ``read_yaml`` yields, or the freshly entered dict with the
        keys ``txck``, ``yk`` and ``aqy``.
    """
    try:
        return read_yaml()
    except FileNotFoundError:
        pass

    # Dict values are evaluated in order, so the prompts appear in the same
    # sequence: Tencent, Youku, iQiyi.
    config = {
        "txck": input("请输入腾讯ck:"),
        "yk": input("请输入优酷ck:"),
        "aqy": input("请输入爱奇艺ck:"),
    }
    write_yaml(config)
    return config
def b64decode(data: str) -> bytes:
    """Decode base64 text whose trailing ``=`` padding may be missing.

    Args:
        data: Base64 text, with or without padding.

    Returns:
        The decoded bytes.

    Raises:
        binascii.Error: If *data* is not valid base64 even after padding.
    """
    # -len % 4 is 0 for already aligned input, so no superfluous "====" is
    # appended (4 - len % 4 would add four pad characters in that case).
    missing = -len(data) % 4
    return base64.b64decode(data + "=" * missing)
def djb2Hash(e):
    """Compute the djb2 hash of *e*, reduced to a non-negative 31-bit integer.

    Args:
        e: The string to hash.

    Returns:
        ``hash * 33 + ord(ch)`` folded over every character, starting from
        5381, masked with ``0x7FFFFFFF``.
    """
    t = 5381
    for ch in e:
        # Masking on every step gives the same result as masking once at the
        # end (the arithmetic is modular) but keeps the accumulator small
        # instead of letting it grow with the input length.
        t = (t * 33 + ord(ch)) & 0x7FFFFFFF
    return t
def aes_encrypt(key: bytes, data: bytes, iv: bytes = None):
    """Encrypt *data* with AES in CBC mode after padding it to the block size.

    Args:
        key: AES key.
        data: Plaintext bytes.
        iv: Initialization vector; 16 zero bytes are used when omitted.

    Returns:
        The ciphertext bytes.
    """
    init_vector = b"\x00" * 16 if iv is None else iv
    encryptor = AES.new(key, AES.MODE_CBC, init_vector)
    padded = pad(data, encryptor.block_size)
    return encryptor.encrypt(padded)
def aes_decrypt(key: bytes, data: bytes, iv: bytes = None):
    """Decrypt AES-CBC ciphertext and strip the block padding.

    Args:
        key: AES key.
        data: Ciphertext bytes.
        iv: Initialization vector; 16 zero bytes are used when omitted.

    Returns:
        The unpadded plaintext bytes.
    """
    init_vector = b"\x00" * 16 if iv is None else iv
    decryptor = AES.new(key, AES.MODE_CBC, init_vector)
    padded_plain = decryptor.decrypt(data)
    return unpad(padded_plain, decryptor.block_size)
def rsa_dec(prikey, data: bytes):
    """Decrypt RSA-OAEP ciphertext that consists of several key-sized chunks.

    Args:
        prikey: Private key in a format accepted by ``RSA.importKey``.
        data: Concatenated ciphertext chunks, each as long as the key modulus.

    Returns:
        The joined plaintext of all chunks, decoded to ``str``.
    """
    private_key = RSA.importKey(prikey)
    oaep = PKCS1_OAEP.new(private_key)
    # Each ciphertext chunk is exactly as long as the key modulus in bytes.
    chunk_len = private_key.size_in_bytes()
    pieces = [
        oaep.decrypt(data[start:start + chunk_len])
        for start in range(0, len(data), chunk_len)
    ]
    return b"".join(pieces).decode()
def sha1withrsa(prikey, data: bytes):
    """Sign *data* with RSA PKCS#1 v1.5 over its SHA-1 digest.

    Args:
        prikey: Private key in a format accepted by ``RSA.importKey``.
        data: The bytes to sign.

    Returns:
        The signature, base64-encoded as ``str``.
    """
    signer = pkcs1_15.new(RSA.importKey(prikey))
    digest = SHA1.new(data)
    raw_signature = signer.sign(digest)
    return base64.b64encode(raw_signature).decode()
def dealck(ck: str) -> dict:
    """Parse a ``name=value; name=value`` cookie string into a dict.

    Each ``;``-separated segment is stripped and split at its first ``=`` so
    values that themselves contain ``=`` are kept whole. Empty segments (for
    example from a trailing ``;``) are skipped, and a segment without ``=``
    maps its name to an empty string.

    Args:
        ck: The cookie header text.

    Returns:
        Mapping of cookie names to values; later duplicates overwrite earlier
        ones.
    """
    ckdict = {}
    for segment in ck.split(";"):
        segment = segment.strip()
        if not segment:
            continue
        # partition splits only at the first "=", unlike split("=").
        name, _, value = segment.partition("=")
        ckdict[name] = value
    return ckdict
def md5(data: str) -> str:
    """Return the hexadecimal MD5 digest of *data* (encoded with the default UTF-8)."""
    hasher = hashlib.md5()
    hasher.update(data.encode())
    return hasher.hexdigest()
def get_size(a):
    """Format a byte count as a human-readable size with two decimals.

    The value is divided by 1024 until it drops below 1024 or the ``GB`` unit
    has been passed; anything larger is reported in ``TB``.

    Args:
        a: Size in bytes.

    Returns:
        A string such as ``"1.50KB"``.
    """
    units = ("B", "KB", "MB", "GB")
    position = 0
    while position < len(units):
        if a < 1024:
            return f"{a:.2f}{units[position]}"
        a = a / 1024
        position += 1
    return f"{a:.2f}TB"