-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdecrypt_all.py
124 lines (96 loc) · 3.9 KB
/
decrypt_all.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
''' Decrypt all CMS files - extracted files are placed in the working directory
'''
import stat
from functools import partial
from os.path import join as pjoin
from StringIO import StringIO
from subprocess import PIPE
import logging # Exception to OCAP
logging.basicConfig(format='%(asctime)s (%(levelname)s) %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO)
log = logging.getLogger(__name__)
def main(mk_decrypt, mk_fops, get_enc_path, walk, getpass,
isfile, abspath, filter_prefix,
decrypt_args=' --overwrite --verbose'):
fops = mk_fops()
decrypt = mk_decrypt(password=getpass())
decrypt_count = 0
for root, dirs, files in walk(get_enc_path()):
for name in files:
full_path = abspath(pjoin(root, name))
if isfile(full_path):
if name.startswith(filter_prefix):
fops.chmod(full_path,
stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR)
log.info('Decrypting %s' % full_path)
ret = decrypt.decrypt(full_path, decrypt_args)
if ret:
raise RuntimeError('Return %d from "%s"' %
(ret, full_path))
decrypt_count += 1
else:
log.info('Skipped due to filter: %s' % name)
log.info('Decrypted %d files' % decrypt_count)
def mock_do_chmod(path, mode):
log.info('chmod: %s, %s' % (path, mode))
class Decrypt(object):
def __init__(self, popen, password, chk_path):
self.password = password
self.popen = popen
self.chk_path = chk_path
@classmethod
def make(cls, popen, password, chk_path):
return Decrypt(popen, password, chk_path)
def decrypt(self, path, decrypt_args=''):
self.chk_path(path)
proc = self.popen(path + decrypt_args, stdin=PIPE, shell=True)
proc.communicate(self.password + '\n')
# Warning, possible deadlock if more input is expected
return proc.wait()
class MockPopen(object):
def __init__(self, path, stdin=None, stdout=None, stderr=None, shell=True):
self.path = path
self.stdout = StringIO('Enter Passphrase: ')
self.stderr = StringIO()
def communicate(self, s):
log.info('MockPopen::communicate %s' % s)
def wait(self):
pass
class Fops(object):
def __init__(self, chk_path, chmod):
self.chk_path = chk_path
self.chmod = chmod
@classmethod
def make(cls, chk_path, chmod):
return Fops(chk_path, chmod)
def chmod(self, path, mode):
self.chk_path(path)
self.chmod(path, mode)
def mock_chmod(path, mode):
log.info('mock_chmod: %s, %s' % (path, mode))
if __name__ == '__main__':
def _tcb(filter_prefix='res000050354req'):
from os import walk, chmod, environ
from os.path import isfile, abspath
from subprocess import Popen
from sys import argv
# Path to where the delivered HD was copied. Recursively search
# for encrypted files matching the filter_prefix pattern.
def get_enc_path():
return abspath(argv[1])
def getpass():
return environ[argv[2]]
def chk_path(path):
if get_enc_path() not in path:
raise RuntimeError('%s not in %s' % (
get_enc_path(), path))
if '--dry-run' in argv:
mk_fops = partial(Fops.make, chk_path=chk_path, chmod=mock_chmod)
mk_decrypt = partial(Decrypt.make, popen=MockPopen,
chk_path=chk_path)
else:
mk_fops = partial(Fops.make, chk_path=chk_path, chmod=chmod)
mk_decrypt = partial(Decrypt.make, popen=Popen, chk_path=chk_path)
main(mk_decrypt, mk_fops, get_enc_path, walk, getpass,
isfile, abspath, filter_prefix)
_tcb()