-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy paths3-mp-upload.py
executable file
·175 lines (153 loc) · 6.65 KB
/
s3-mp-upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python
import argparse
from cStringIO import StringIO
import logging
from math import ceil
from multiprocessing import Pool
import time
import urlparse
import boto
from boto.s3.connection import OrdinaryCallingFormat
parser = argparse.ArgumentParser(description="Transfer large files to S3",
prog="s3-mp-upload")
parser.add_argument("src", type=file, help="The file to transfer")
parser.add_argument("dest", help="The S3 destination object")
parser.add_argument("-np", "--num-processes", help="Number of processors to use",
type=int, default=2)
parser.add_argument("-f", "--force", help="Overwrite an existing S3 key",
action="store_true")
parser.add_argument("-s", "--split", help="Split size, in Mb", type=int, default=50)
parser.add_argument("-rrs", "--reduced-redundancy", help="Use reduced redundancy storage. Default is standard.", default=False, action="store_true")
parser.add_argument("--insecure", dest='secure', help="Use HTTP for connection",
default=True, action="store_false")
parser.add_argument("-t", "--max-tries", help="Max allowed retries for http timeout", type=int, default=5)
parser.add_argument("-v", "--verbose", help="Be more verbose", default=False, action="store_true")
parser.add_argument("-q", "--quiet", help="Be less verbose (for use in cron jobs)", default=False, action="store_true")
logger = logging.getLogger("s3-mp-upload")
def do_part_upload(args):
"""
Upload a part of a MultiPartUpload
Open the target file and read in a chunk. Since we can't pickle
S3Connection or MultiPartUpload objects, we have to reconnect and lookup
the MPU object with each part upload.
:type args: tuple of (string, string, string, int, int, int)
:param args: The actual arguments of this method. Due to lameness of
multiprocessing, we have to extract these outside of the
function definition.
The arguments are: S3 Bucket name, MultiPartUpload id, file
name, the part number, part offset, part size
"""
# Multiprocessing args lameness
bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
logger.debug("do_part_upload got args: %s" % (args,))
# Connect to S3, get the MultiPartUpload
s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
s3.is_secure = secure
bucket = s3.lookup(bucket_name)
mpu = None
for mp in bucket.list_multipart_uploads():
if mp.id == mpu_id:
mpu = mp
break
if mpu is None:
raise Exception("Could not find MultiPartUpload %s" % mpu_id)
# Read the chunk from the file
fp = open(fname, 'rb')
fp.seek(start)
data = fp.read(size)
fp.close()
if not data:
raise Exception("Unexpectedly tried to read an empty chunk")
def progress(x,y):
logger.debug("Part %d: %0.2f%%" % (i+1, 100.*x/y))
try:
# Do the upload
t1 = time.time()
mpu.upload_part_from_file(StringIO(data), i+1, cb=progress)
# Print some timings
t2 = time.time() - t1
s = len(data)/1024./1024.
logger.info("Uploaded part %s (%0.2fM) in %0.2fs at %0.2fMBps" % (i+1, s, t2, s/t2))
except Exception, err:
logger.debug("Retry request %d of max %d times" % (current_tries, max_tries))
if (current_tries > max_tries):
logger.error(err)
else:
time.sleep(3)
current_tries += 1
do_part_download(bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries)
def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False, quiet=False, secure=True, max_tries=5):
# Check that dest is a valid S3 url
split_rs = urlparse.urlsplit(dest)
if split_rs.scheme != "s3":
raise ValueError("'%s' is not an S3 url" % dest)
s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
s3.is_secure = secure
bucket = s3.lookup(split_rs.netloc)
if bucket == None:
raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)
key = bucket.get_key(split_rs.path)
# See if we're overwriting an existing key
if key is not None:
if not force:
raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)
# Determine the splits
part_size = max(5*1024*1024, 1024*1024*split)
src.seek(0,2)
size = src.tell()
num_parts = int(ceil(size / part_size))
# If file is less than 5M, just upload it directly
if size < 5*1024*1024:
src.seek(0)
t1 = time.time()
k = boto.s3.key.Key(bucket,split_rs.path)
k.set_contents_from_file(src)
t2 = time.time() - t1
s = size/1024./1024.
logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
return
# Create the multi-part upload object
mpu = bucket.initiate_multipart_upload(split_rs.path, reduced_redundancy=reduced_redundancy)
logger.info("Initialized upload: %s" % mpu.id)
# Generate arguments for invocations of do_part_upload
def gen_args(num_parts, fold_last):
for i in range(num_parts+1):
part_start = part_size*i
if i == (num_parts-1) and fold_last is True:
yield (bucket.name, mpu.id, src.name, i, part_start, part_size*2, secure, max_tries, 0)
break
else:
yield (bucket.name, mpu.id, src.name, i, part_start, part_size, secure, max_tries, 0)
# If the last part is less than 5M, just fold it into the previous part
fold_last = ((size % part_size) < 5*1024*1024)
# Do the thing
try:
# Create a pool of workers
pool = Pool(processes=num_processes)
t1 = time.time()
pool.map_async(do_part_upload, gen_args(num_parts, fold_last)).get(9999999)
# Print out some timings
t2 = time.time() - t1
s = size/1024./1024.
# Finalize
src.close()
mpu.complete_upload()
logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
except KeyboardInterrupt:
logger.warn("Received KeyboardInterrupt, canceling upload")
pool.terminate()
mpu.cancel_upload()
except Exception, err:
logger.error("Encountered an error, canceling upload")
logger.error(err)
mpu.cancel_upload()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
args = parser.parse_args()
arg_dict = vars(args)
if arg_dict['quiet'] == True:
logger.setLevel(logging.WARNING)
if arg_dict['verbose'] == True:
logger.setLevel(logging.DEBUG)
logger.debug("CLI args: %s" % args)
main(**arg_dict)