This repository has been archived by the owner on Jul 18, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathapp.py
129 lines (102 loc) · 3.52 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import subprocess
import shlex
import re
import os
from IPy import IP
from flask import Flask, make_response, abort, current_app, render_template
from utils import ListConverter, abort_when_error
from config import configs
# Longest ipset name accepted by check_valid_ipset().
IPSET_NAME_MAX = 20
# Largest number of proxies the /pac endpoint accepts in one request.
PAC_MAX_CHAIN_PROXIES = 20
app = Flask(__name__)
# The configuration object is selected by the `g2wconf` environment variable,
# falling back to the 'dev' entry of `configs` when it is unset.
app.config.from_object(configs[os.environ.get('g2wconf', 'dev')])
# Register the custom converter so routes can use the `<list:...>` URL syntax.
app.url_map.converters['list'] = ListConverter
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/ipset/<path:args>')
@abort_when_error
def ipset(args):
    """Return the generator's output for a named ipset as a download.

    ``args`` is expected in the form ``<name>,<ip>:<port>``.  The request is
    answered with 404 when either part is empty or fails
    ``check_valid_ipset``.  On success the attachment is named
    ``<name>_ipset.conf``.
    """
    name, addr = args.split(',')
    request_ok = name and addr and check_valid_ipset(name, addr)
    if not request_ok:
        abort(404)
    addr_parts = addr.split(':')
    cmd = create_cmd(*addr_parts, name=name)
    return make_resp_with_cmd_stdout(cmd, name + '_ipset.conf')
def check_valid_ipset(name, addr):
    """Return True when *name* and *addr* are acceptable for the ipset route.

    *name* must consist only of ASCII letters, digits and underscores and be
    at most ``IPSET_NAME_MAX`` characters long.  *addr* must be exactly one
    ``ip:port`` pair that passes ``check_addr``.

    Always returns a plain ``bool``; malformed input yields ``False`` instead
    of raising.
    """
    # ``\Z`` anchors at the very end of the string.  ``$`` would also match
    # just before a trailing newline, letting a name such as "abc\n" through.
    if not re.match(r"[a-zA-Z0-9_]*\Z", name):
        return False
    if len(name) > IPSET_NAME_MAX:
        return False
    # Anything other than exactly one ':' cannot be an ip:port pair.
    addr_parts = addr.split(':')
    if len(addr_parts) != 2:
        return False
    return bool(check_addr(*addr_parts))
@app.route('/dnsq/<path:args>')
@abort_when_error
def dnsq(args):
    """Return the generator's output (no ipset name) as a download.

    ``args`` is expected in the form ``<ip>:<port>``.  The request is
    answered with 404 when the pair fails ``check_addr``.  On success the
    attachment is named ``gfwlist_dnsmasq.conf``.
    """
    ip, port = args.split(':')
    if not check_addr(ip, port):
        abort(404)
    cmd = create_cmd(ip, port)
    return make_resp_with_cmd_stdout(cmd, 'gfwlist_dnsmasq.conf')
def make_resp_with_cmd_stdout(cmd, filename):
    """Run *cmd* and wrap its standard output in a download response.

    :param cmd: argument list handed to ``subprocess.check_output``.
    :param filename: name placed in the ``Content-Disposition`` header.
    :returns: a Flask response whose body is the command's stdout.
    :raises subprocess.CalledProcessError: if the command exits non-zero.
    """
    stdout = subprocess.check_output(cmd)
    response = make_response(stdout)
    disposition = 'attachment; filename=' + filename
    response.headers["Content-Disposition"] = disposition
    return response
def create_cmd(ip, port, name=None):
    """Build the argument list for the external list-generator command.

    The executable comes from the ``G2D_PTAH`` config entry and the list file
    from ``TXTLIST_PTAH``.  The flags are passed exactly as before
    (``-l``, ``-d``, ``-p``, ``-o -``, ``-i``); their meaning is defined by
    the external tool.

    :param ip: address passed after ``-d``.
    :param port: port (as a string) passed after ``-p``.
    :param name: value passed after ``-i``; ``-`` is used when it is empty
        or ``None``.
    :returns: a list of strings suitable for ``subprocess.check_output``.
    """
    config = current_app.config
    # Only the operator-controlled config values are tokenised with shlex (so
    # an entry such as "python /path/tool.py" still expands to two
    # arguments).  Request-derived values are appended as single argv items
    # and never re-parsed, so they cannot be split into extra arguments.
    command = shlex.split(config['G2D_PTAH'])
    command.append('-l')
    command.extend(shlex.split(config['TXTLIST_PTAH']))
    command.extend(['-d', ip, '-p', port, '-o', '-'])
    # '-' stands in for a missing ipset name.
    command.extend(['-i', name or '-'])
    return command
@app.route('/pac/<list:proxies>')
@abort_when_error
def pac(proxies):
    """Return a PAC file produced by ``genpac`` for the requested proxies.

    Each element of *proxies* is expected in the form ``<type>,<ip>:<port>``
    and is validated by ``check_and_parse_pac_args``.  The request is
    answered with 404 when there are more than ``PAC_MAX_CHAIN_PROXIES``
    entries, when any entry is invalid, or when no entry is given.  On
    success the attachment is named ``gfwlist.pac``.
    """
    # The limit is whatever PAC_MAX_CHAIN_PROXIES says.
    if len(proxies) > PAC_MAX_CHAIN_PROXIES:
        abort(404)

    all_args = []
    for p in proxies:
        pass_check, arg = check_and_parse_pac_args(p.split(','))
        if not pass_check:
            abort(404)
        all_args.append(arg)

    if not all_args:
        abort(404)

    # Build argv directly.  The proxy string is one argument by construction,
    # so it needs no manual quoting and is never re-parsed.  Only the
    # operator-controlled config path still goes through shlex.
    command = ['genpac', '--gfwlist-url=-']
    command.extend(
        shlex.split('--gfwlist-local=' + current_app.config['TXTLIST_PTAH']))
    command.extend(['-p', '; '.join(all_args), '--user-rule=||naver.jp'])
    return make_resp_with_cmd_stdout(command, 'gfwlist.pac')
def check_addr(ip, port):
    """Return True when *ip* and *port* form an acceptable address.

    :param ip: address string; it must be accepted by IPy's ``IP``
        constructor.
    :param port: port as a string of 1-5 ASCII digits whose value lies in
        1..65535.
    :returns: ``True`` or ``False``; invalid input never raises.
    """
    if not ip or not port:
        return False
    # Plain int() would raise ValueError on junk and would accept padded or
    # signed forms (" 80", "+80", "8_0", non-ASCII digits).  The port is
    # later copied verbatim into command lines and PAC output, so only bare
    # ASCII digits are accepted.
    if not re.match(r"[0-9]{1,5}\Z", port):
        return False
    if not 0 < int(port) < 65536:
        return False
    try:
        IP(ip)
    except ValueError:
        return False
    return True
def check_and_parse_pac_args(type_addr):
    """Validate one ``[proxy_type, 'ip:port']`` pair and build its PAC text.

    :param type_addr: sequence produced by splitting one proxy spec on ``,``.
    :returns: ``(True, proxy_string)`` when the pair is valid, otherwise
        ``(False, None)``.  Malformed input never raises.
    """
    if len(type_addr) != 2:
        return False, None
    proxy_type, addr = type_addr
    if proxy_type not in ('h', 's'):
        return False, None
    # An address without exactly one ':' cannot be unpacked into (ip, port);
    # reject it here instead of letting check_addr() fail on its arity.
    host_port = addr.split(':')
    if len(host_port) != 2 or not check_addr(*host_port):
        return False, None
    return True, create_proxy_arg(proxy_type, addr)
def create_proxy_arg(proxy_type, addr):
    """Return the PAC proxy directive for *addr*.

    ``'s'`` yields a ``SOCKS5`` entry followed by a ``SOCKS`` entry for the
    same address; ``'h'`` yields a single ``PROXY`` entry.

    :raises KeyError: if *proxy_type* is neither ``'s'`` nor ``'h'``.
    """
    if proxy_type == 's':
        return 'SOCKS5 ' + addr + '; ' + 'SOCKS ' + addr
    if proxy_type == 'h':
        return 'PROXY ' + addr
    raise KeyError(proxy_type)
# Start Flask's built-in server only when this module is executed directly.
if __name__ == '__main__':
    app.run()