-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwebhook_handler.py
219 lines (190 loc) · 5.78 KB
/
webhook_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import hashlib
import hmac
import json
import logging
import os
import subprocess
import sys
from logging.handlers import RotatingFileHandler

import pyotp
from flask import Flask
from flask import request
# Path of the JSON configuration file, read from the third command-line
# argument. This lookup runs at import time, so importing the module with
# fewer than three arguments raises IndexError.
CONFIG_PATH=sys.argv[3]
# Flask application object on which the route handlers below are registered.
app=Flask(__name__)
def log(payload):
    """Write a one-line summary of a push payload to the root logger.

    Args:
        payload: Decoded webhook body. It must provide ``commits`` (each
            entry carrying ``added``, ``removed`` and ``modified`` lists),
            ``ref``, and ``sender`` (with ``login`` and ``site_admin``).

    Raises:
        KeyError: If any of those keys is missing from the payload.
    """
    added_files = []
    removed_files = []
    modified_files = []
    for commit in payload['commits']:
        added_files += commit['added']
        removed_files += commit['removed']
        modified_files += commit['modified']

    sender = payload['sender']
    # Only the last path component of the ref is reported, e.g.
    # 'refs/heads/master' -> 'master'.
    branch = payload['ref'].split('/')[-1]

    # The message is built from adjacent literals rather than a backslash
    # continuation inside the string, which fused "Removed files" with
    # "Sender" (or embedded source indentation in the log line).
    logging.info(
        'Branch:%s Added files:%s Modified files:%s Removed files:%s '
        'Sender:%s isSiteAdmin:%s',
        branch,
        added_files,
        modified_files,
        removed_files,
        sender['login'],
        sender['site_admin'],
    )
def load_config(path=CONFIG_PATH):
    """Load the JSON config file and publish its settings as module globals.

    On success the globals ``ALLOW_BRANCHES``, ``DEPLOY_PATHS``,
    ``CHECK_FILES``, ``GIT_ADDRESS``, ``TOTP_SEED`` and ``GITHUB_SECRET``
    are replaced together. On any failure none of them is modified, so a
    broken config file can never leave a mix of old and new settings.

    Args:
        path: Location of the JSON config file.

    Returns:
        True if every setting was loaded, False otherwise (the reason is
        logged at CRITICAL level).
    """
    global DEPLOY_PATHS
    global CHECK_FILES
    global GIT_ADDRESS
    global TOTP_SEED
    global GITHUB_SECRET
    global ALLOW_BRANCHES

    try:
        with open(path) as f:
            config = json.load(f)
    except (OSError, ValueError, TypeError) as e:
        # OSError: unreadable file; ValueError: invalid JSON or encoding;
        # TypeError: path is not a usable file name.
        print(e)
        logging.critical("Failed to open config file '{}'.".format(path))
        return False

    # Read every key before touching the globals so that a missing key
    # cannot leave the module half-updated.
    try:
        allow_branches = config['allow_branches']
        deploy_paths = config['deploy_paths']
        check_files = config['check_files']
        git_address = config['git_address']
        totp_seed = config['totp_seed']
        github_secret = config['github_secret']
    except (KeyError, TypeError, IndexError):
        logging.critical("Failed to load config.")
        return False

    ALLOW_BRANCHES = allow_branches
    DEPLOY_PATHS = deploy_paths
    CHECK_FILES = check_files
    GIT_ADDRESS = git_address
    TOTP_SEED = totp_seed
    GITHUB_SECRET = github_secret
    return True
def deploy(deploy_path, branch):
    """Check out *branch* inside *deploy_path* and pull it from ``origin``.

    The git commands are run without a shell, with *deploy_path* as their
    working directory. The sequence stops at the first command that cannot
    be started or that exits with a non-zero status, and success is logged
    only when both commands succeeded.

    Args:
        deploy_path: Directory holding the Git working tree to update.
        branch: Name of the branch to check out and pull.
    """
    steps = (
        ['git', 'checkout', branch],
        ['git', 'pull', 'origin', branch],
    )
    for argv in steps:
        try:
            # Argument lists (no shell) keep a branch or path containing
            # shell metacharacters from being interpreted as commands.
            proc = subprocess.run(
                argv,
                cwd=deploy_path,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
        except OSError as err:
            # Raised when deploy_path is missing or git is not installed.
            logging.error("Could not run '%s' in '%s': %s",
                          ' '.join(argv), deploy_path, err)
            logging.critical('Deployed failed.')
            return
        print(proc.stdout)
        if proc.returncode != 0:
            logging.error("'%s' exited with status %s",
                          ' '.join(argv), proc.returncode)
            logging.critical('Deployed failed.')
            return
    logging.info('Deployed succeed.')
def check_signature(signature, data):
    """Verify the HMAC-SHA1 signature sent with a webhook request.

    The config is reloaded first so the current ``GITHUB_SECRET`` is used.
    Verification fails closed: a missing signature, an unloadable config or
    any error while computing the digest all yield False.

    Args:
        signature: Value of the signature header; the text after the last
            ``=`` is taken as the hex digest. May be None.
        data: Raw request body (bytes) that was signed.

    Returns:
        True if the digest of *data* matches *signature*, otherwise False.
    """
    if not signature:
        logging.warning("Some thing wrong while checking signature.")
        return False
    if not load_config():
        # Never verify against a stale or undefined secret.
        logging.warning("Some thing wrong while checking signature.")
        return False
    try:
        received = str(signature).split('=')[-1]
        expected = hmac.new(GITHUB_SECRET.encode(), msg=data,
                            digestmod=hashlib.sha1).hexdigest()
        matched = hmac.compare_digest(expected, received)
    except Exception:
        # Boundary handler: malformed input must reject the request rather
        # than crash the endpoint.
        logging.warning("Some thing wrong while checking signature.",
                        exc_info=True)
        return False
    if matched:
        return True
    # The locally computed digest is deliberately not logged: it is the
    # valid signature for this payload.
    logging.warning('Signatures didn\'t match! {}'.format(received))
    return False
def check_permission(payload):
    """Decide whether the push described by *payload* may be deployed.

    Args:
        payload: Decoded webhook body providing ``ref`` and ``commits``
            (each commit carrying ``added``, ``removed`` and ``modified``
            lists of file paths).

    Returns:
        0 if the config could not be loaded,
        1 if deployment is blocked pending TOTP confirmation (either the
          ``check_totp_flag`` file is already non-empty, or this push
          touches a file listed in ``CHECK_FILES``, in which case the flag
          file is set),
        2 if deployment is allowed,
        3 if the branch is not listed in ``ALLOW_BRANCHES``.

    Raises:
        KeyError: If the payload lacks one of the keys named above.
    """
    with open('check_totp_flag', 'r') as f:
        if f.read():
            logging.warning(
                'Refuse deploy. Because the check_totp_flag is True.')
            return 1
    if not load_config():
        return 0

    # Only the last path component of the ref is compared, e.g.
    # 'refs/heads/master' -> 'master'.
    branch = payload['ref'].split('/')[-1]
    if branch not in ALLOW_BRANCHES:
        logging.info('Refuse deploy. Because the branch doesn\'t in the allow branches.')
        return 3

    # Collect every touched path once instead of rebuilding the combined
    # list for each entry of CHECK_FILES.
    touched_files = set()
    for commit in payload['commits']:
        touched_files.update(commit['added'])
        touched_files.update(commit['removed'])
        touched_files.update(commit['modified'])

    if any(item in touched_files for item in CHECK_FILES):
        with open('check_totp_flag', 'w') as f:
            f.write('1')
        logging.warning(
            'Refuse deploy. '
            'Because there are some files needed to be checked by TOTP.')
        return 1
    return 2
@app.route('/deploy',methods=['POST'])
def webhook_handle():
    """Handle a push webhook: verify, log, check permission, then deploy.

    Returns:
        A Flask response. A rejected signature yields 403, an undecodable
        or incomplete payload 400, and configuration problems 500 (body
        '500'). Policy refusals and successful deploys keep their original
        bodies with the default 200 status.
    """
    data = request.data
    signature = request.headers.get('X-Hub-Signature')
    if not check_signature(signature, data=data):
        return 'Are you crazzzzzzzzzy!', 403

    # The body is parsed only after the signature has been verified.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return 'Invalid JSON payload.', 400

    if not load_config():
        return '500', 500

    try:
        log(payload)
        result = check_permission(payload)
    except (KeyError, TypeError):
        # Payloads without the push fields (ref, commits, sender) cannot be
        # evaluated.
        logging.warning('Payload is missing the fields of a push event.')
        return 'Invalid JSON payload.', 400

    if result == 1:
        return ('Sorry. Can\'t deploy right now. '
                'Because there are some files needed to be checked by admin.')
    if result == 3:
        return 'Sorry. Can\'t deploy. Because the branch doesn\'t in the allow branches.'
    if result == 2:
        branch = payload['ref'].split('/')[-1]
        try:
            deploy_path = DEPLOY_PATHS[branch]
        except KeyError:
            logging.critical(
                "No deploy path configured for branch '{}'.".format(branch))
            return '500', 500
        deploy(deploy_path, branch)
        return 'Deployed'
    # result == 0 (config failure) or an unexpected code.
    return '500', 500
@app.route('/auth/<totp>',methods=['GET'])
def check_totp_handle(totp):
    """Confirm a pending deployment with a TOTP code.

    When *totp* verifies against ``TOTP_SEED``, every branch that has an
    entry in ``DEPLOY_PATHS`` and is listed in ``ALLOW_BRANCHES`` is
    deployed, and the ``check_totp_flag`` file is then emptied so later
    pushes are no longer blocked.

    Args:
        totp: One-time code taken from the URL path.

    Returns:
        "Deployed" on success, otherwise the rejection text.
    """
    if not load_config():
        return "Are you crazzzzzzzy!"
    if not pyotp.TOTP(TOTP_SEED).verify(totp):
        logging.warning('TOTP Authentication failed: {}'.format(totp))
        return "Are you crazzzzzzzy!"
    logging.info('TOTP Authentication succeed: {}'.format(totp))

    # deploy() requires a path and a branch. The flag file does not record
    # which push was held back, so every configured, allowed branch is
    # brought up to date.
    # NOTE(review): confirm that deploying all configured branches is the
    # intended behaviour after a TOTP confirmation.
    for branch in DEPLOY_PATHS:
        if branch in ALLOW_BRANCHES:
            deploy(DEPLOY_PATHS[branch], branch)

    with open('check_totp_flag', 'w') as f:
        f.write('')
    return "Deployed"
def init_logging():
    """Configure root logging and attach a rotating file handler.

    Records at INFO and above go through ``logging.basicConfig`` and are
    also written to ``webhook_handler_log``, which rotates at 1 MiB and
    keeps two backups.
    """
    logging.basicConfig(
        datefmt='%d %b %Y %H:%M:%S',
        format='%(asctime)s %(levelname)-8s [+]%(message)s',
        level=logging.INFO,
    )

    file_handler = RotatingFileHandler(
        'webhook_handler_log',
        maxBytes=1024 * 1024,
        backupCount=2,
    )
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)-8s %(message)s'))

    root_logger = logging.getLogger('')
    root_logger.addHandler(file_handler)
def check_deploy_dir(deploy_path):
    """Ensure *deploy_path* is a usable Git working tree.

    A non-empty directory must already contain a ``.git`` directory. An
    empty directory is populated by cloning ``GIT_ADDRESS`` into it.

    Args:
        deploy_path: Directory that deployments will be pulled into.

    Returns:
        True if the directory is (or has just become) a Git checkout,
        False on every failure; the reason is logged at CRITICAL level.
    """
    if not load_config():
        return False

    if not os.path.isdir(deploy_path):
        logging.critical('Directory {} doesn\'t exist.'.format(deploy_path))
        return False

    if os.listdir(deploy_path):
        if os.path.isdir(os.path.join(deploy_path, '.git')):
            return True
        logging.critical("{} is not a Git directory.".format(deploy_path))
        # Explicit False instead of falling off the end and returning None.
        return False

    # Empty directory: clone into it. An argument list (no shell) keeps
    # paths with spaces or shell metacharacters intact.
    try:
        status = subprocess.call(['git', 'clone', GIT_ADDRESS, deploy_path])
    except OSError:
        # git could not be started at all.
        status = None
    if status != 0:
        logging.critical('Failed to clone from {}'.format(GIT_ADDRESS))
        return False
    logging.info("Cloned from {}".format(GIT_ADDRESS))
    return True
def init():
    """Prepare the process for serving: flag file, logging, config, checkouts.

    Creates an empty ``check_totp_flag`` file if none exists, configures
    logging, loads the config and verifies every directory listed in
    ``DEPLOY_PATHS``.

    Raises:
        SystemExit: With status 1 if the config cannot be loaded or any
            deploy directory fails its check.
    """
    if not os.path.exists('check_totp_flag'):
        with open('check_totp_flag', 'w') as f:
            f.write('')
    init_logging()

    # Without a loaded config DEPLOY_PATHS is undefined; stop with a
    # failure status instead of crashing with NameError.
    if not load_config():
        sys.exit(1)

    print(DEPLOY_PATHS)
    for branch in DEPLOY_PATHS:
        if not check_deploy_dir(DEPLOY_PATHS[branch]):
            # A non-zero status reports the failure to the caller, and
            # sys.exit (unlike os._exit) lets logging flush its handlers.
            sys.exit(1)
if __name__ == '__main__':
    # Script entry point: the first command-line argument is the listen
    # host and the second is the listen port.
    init()
    listen_host = sys.argv[1]
    listen_port = sys.argv[2]
    app.run(host=listen_host, port=listen_port)