-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathzip_util.py
82 lines (74 loc) · 2.39 KB
/
zip_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import gzip
import os
import shutil
import signal
import subprocess
from subprocess import Popen, PIPE, check_call
class LineReader(object):
    """Iterate over the lines of a plain or gzip-compressed text file.

    Paths ending in ``.gz`` are decompressed by an external ``gunzip -c``
    child process whose standard output is read line by line; any other
    path is opened directly with ``open(fname, 'r')``.  Instances can be
    used as context managers and iterated over directly.
    """

    def __init__(self, fname):
        """Open *fname* for line-by-line reading.

        Raises:
            IOError: if *fname* ends in ``.gz`` but is not an existing file.
                For other paths, whatever ``open`` raises propagates.
        """
        if fname.endswith('.gz'):
            if not os.path.isfile(fname):
                raise IOError(fname)
            # universal_newlines=True puts the pipe in text mode, so the
            # compressed branch yields the same string type as the
            # open(fname, 'r') branch instead of raw bytes on Python 3.
            self.f = Popen(
                ['gunzip', '-c', fname],
                stdout=PIPE,
                stderr=PIPE,
                universal_newlines=True
            )
            self.zipped = True
        else:
            self.f = open(fname, 'r')
            self.zipped = False

    def readlines(self):
        """Yield the lines of the file one at a time, in order."""
        # Iterate the underlying stream lazily in both modes rather than
        # materialising the whole file in memory first.
        source = self.f.stdout if self.zipped else self.f
        for line in source:
            yield line

    def close(self):
        """Release the underlying file or child process.

        For compressed input the ``gunzip`` child is stopped if it is still
        running, both of its pipes are closed and the process is waited on
        so that no zombie process or open descriptor is left behind.
        Calling ``close`` more than once is harmless.
        """
        if not self.zipped:
            self.f.close()
            return

        proc = self.f
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                # The child exited between poll() and terminate().
                pass
        # Close the pipes before waiting so a child blocked on a full pipe
        # cannot keep wait() from returning.
        for pipe in (proc.stdout, proc.stderr):
            if pipe is not None and not pipe.closed:
                pipe.close()
        proc.wait()

    def __enter__(self):
        """Return the reader itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the reader on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def __iter__(self):
        """Return a generator over the lines, same as ``readlines()``."""
        return self.readlines()
class CachedLineReader(object):
    """Read every line of a file once and serve the lines from memory.

    The file is read through ``LineReader`` at construction time; the
    reader is closed again before ``__init__`` returns.
    """

    def __init__(self, fname):
        """Load all lines of *fname* into ``self.cached_lines``."""
        self.line_reader = LineReader(fname)
        try:
            self.cached_lines = list(self.line_reader.readlines())
        finally:
            # Close the reader even when reading fails part-way through,
            # so the underlying file or child process is not leaked.
            self.line_reader.close()

    def readlines(self):
        """Return the list of cached lines (the same list object each call)."""
        return self.cached_lines
def zip_file(file_path):
    """Gzip-compress *file_path* into ``file_path + '.gz'`` and delete the original.

    Returns:
        The path of the compressed file, or ``None`` when *file_path* is
        not an existing file (in which case nothing is done).
    """
    if not os.path.isfile(file_path):
        return None

    f_out_name = file_path + '.gz'
    # The context managers close both handles even if the copy fails, and
    # the original is only removed once the archive has been fully written.
    with open(file_path, 'rb') as f_in:
        with gzip.open(f_out_name, 'wb') as f_out:
            # Chunked copy: binary data need not contain newlines, so a
            # line-based copy could pull the whole file into memory.
            shutil.copyfileobj(f_in, f_out)
    os.remove(file_path)
    return f_out_name
def zip_file_with_gzip(file_path):
    """Compress *file_path* in place with the external ``gzip`` program.

    A path that already ends in ``.gz`` is returned unchanged without
    running anything.  Otherwise ``gzip`` is invoked on the file and the
    resulting ``file_path + '.gz'`` is returned.

    Raises:
        subprocess.CalledProcessError: if ``gzip`` exits with a non-zero status.
    """
    if not file_path.endswith('.gz'):
        zipped_path = file_path + '.gz'
        subprocess.check_call(['gzip', file_path])
        # Sanity check that gzip produced the expected output file.
        assert os.path.isfile(zipped_path)
        return zipped_path
    return file_path
def unzip_file(file_path):
    """Decompress a ``.gz`` file next to itself and delete the archive.

    The output path is *file_path* with its trailing ``.gz`` removed.

    Returns:
        The path of the decompressed file, or ``None`` when *file_path* is
        not an existing file whose name ends in ``.gz`` (nothing is done).
    """
    if not (os.path.isfile(file_path) and file_path.endswith('.gz')):
        return None

    f_out_name = file_path[:-3]  # strip the '.gz' suffix
    # The context managers close both handles even if decompression fails,
    # and the archive is only removed after the output is fully written.
    with gzip.open(file_path, 'rb') as f_in:
        with open(f_out_name, 'wb') as f_out:
            # Chunked copy: the payload may be binary without newlines, so
            # a line-based copy could pull the whole file into memory.
            shutil.copyfileobj(f_in, f_out)
    os.remove(file_path)
    return f_out_name
def recursive_gunzip(dir_path):
    """Run the external ``gunzip`` on every ``.gz`` file under *dir_path*.

    Sub-directories are descended into recursively; entries that are
    neither directories nor files ending in ``.gz`` are left alone.

    Raises:
        subprocess.CalledProcessError: if ``gunzip`` exits with a non-zero status.
    """
    for entry in os.listdir(dir_path):
        path = os.path.join(dir_path, entry)
        if os.path.isdir(path):
            recursive_gunzip(path)
            continue
        if path.endswith('.gz') and os.path.isfile(path):
            subprocess.check_call(['gunzip', path])