-
-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Compressed output format (jsongz) for rethinkdb export/import #251
base: master
Are you sure you want to change the base?
Changes from 4 commits
5ec52e0
f6e3e40
a6d8978
a88f11c
1e61100
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ | |
import tempfile | ||
import time | ||
import traceback | ||
import zlib | ||
from multiprocessing.queues import SimpleQueue | ||
|
||
import six | ||
|
@@ -48,7 +49,7 @@ | |
|
||
usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR] | ||
[-e (DB | DB.TABLE)]... | ||
[--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] | ||
[--format (csv | json | ndjson | jsongz)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] | ||
[--clients NUM]""" | ||
help_description = ( | ||
"`rethinkdb export` exports data from a RethinkDB cluster into a directory" | ||
|
@@ -118,11 +119,11 @@ def parse_options(argv, prog=None): | |
parser.add_option( | ||
"--format", | ||
dest="format", | ||
metavar="json|csv|ndjson", | ||
metavar="json|csv|ndjson|jsongz", | ||
default="json", | ||
help="format to write (defaults to json. ndjson is newline delimited json.)", | ||
type="choice", | ||
choices=["json", "csv", "ndjson"], | ||
choices=["json", "csv", "ndjson", "jsongz"], | ||
) | ||
parser.add_option( | ||
"--clients", | ||
|
@@ -150,6 +151,17 @@ def parse_options(argv, prog=None): | |
) | ||
parser.add_option_group(csvGroup) | ||
|
||
jsongzGroup = optparse.OptionGroup(parser, "jsongz options") | ||
jsongzGroup.add_option( | ||
"--compression-level", | ||
dest="compression_level", | ||
metavar="NUM", | ||
default=None, | ||
help="compression level, an integer from 0 to 9 (defaults to -1 default zlib compression)", | ||
type="int", | ||
) | ||
parser.add_option_group(jsongzGroup) | ||
|
||
options, args = parser.parse_args(argv) | ||
|
||
# -- Check validity of arguments | ||
|
@@ -185,6 +197,15 @@ def parse_options(argv, prog=None): | |
if options.delimiter: | ||
parser.error("--delimiter option is only valid for CSV file formats") | ||
|
||
if options.format == "jsongz": | ||
if options.compression_level is None: | ||
options.compression_level = -1 | ||
elif options.compression_level < 0 or options.compression_level > 9: | ||
parser.error("--compression-level must be an integer from 0 and 9") | ||
else: | ||
if options.compression_level: | ||
parser.error("--compression-level option is only valid for jsongz file formats") | ||
|
||
# - | ||
|
||
return options | ||
|
@@ -226,6 +247,43 @@ def json_writer(filename, fields, task_queue, error_queue, format): | |
pass | ||
|
||
|
||
def jsongz_writer(filename, fields, task_queue, error_queue, format, compression_level): | ||
try: | ||
with open(filename, "wb") as out: | ||
# wbits 31 = MAX_WBITS + gzip header and trailer | ||
compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31) | ||
def compress_and_write(str): | ||
out.write(compressor.compress(str.encode("utf-8"))) | ||
|
||
first = True | ||
compress_and_write("[") | ||
item = task_queue.get() | ||
while not isinstance(item, StopIteration): | ||
row = item[0] | ||
if fields is not None: | ||
for item in list(row.keys()): | ||
if item not in fields: | ||
del row[item] | ||
if first: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implies that the objects in the I'm no compression expert, but since it'll be binary and unreadable from a high level perspective, why not skip the new line? Object would be written as follows Or maybe I'm wrong and the \n are used for compression. Let me know, I'm curious now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code here is replicating what the json export does. One can still unpack the jsongz file get the json file from inside (it's a standard gzip file), in which case the formatting might help. I considered the gains from removing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should say that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Did you mean big documents? Hehe Regarding the custom json parser, I have no clue why there's a custom one. Probably, when the library has been written, there were parsers that did not fit/match the requirements. Nowadays there are tons of high performance parsers. In order to not break anything, I would keep the custom one for now. |
||
compress_and_write("\n") | ||
first = False | ||
else: | ||
compress_and_write(",\n") | ||
|
||
compress_and_write(json.dumps(row)) | ||
item = task_queue.get() | ||
|
||
compress_and_write("\n]\n") | ||
out.write(compressor.flush()) | ||
except BaseException: | ||
ex_type, ex_class, tb = sys.exc_info() | ||
error_queue.put((ex_type, ex_class, traceback.extract_tb(tb))) | ||
|
||
# Read until the exit task so the readers do not hang on pushing onto the queue | ||
while not isinstance(task_queue.get(), StopIteration): | ||
pass | ||
|
||
|
||
def csv_writer(filename, fields, delimiter, task_queue, error_queue): | ||
try: | ||
with open(filename, "w") as out: | ||
|
@@ -331,6 +389,19 @@ def export_table( | |
options.format, | ||
), | ||
) | ||
elif options.format == "jsongz": | ||
filename = directory + "/%s/%s.jsongz" % (db, table) | ||
writer = multiprocessing.Process( | ||
target=jsongz_writer, | ||
args=( | ||
filename, | ||
options.fields, | ||
task_queue, | ||
error_queue, | ||
options.format, | ||
options.compression_level, | ||
), | ||
) | ||
elif options.format == "csv": | ||
filename = directory + "/%s/%s.csv" % (db, table) | ||
writer = multiprocessing.Process( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if someone passes -1 as option?
In my opinion it should be changed into
elif options.compression_level < -1 or options.compression_level > 9:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My reasoning was: passing
-1
is the same as not specifying the compression level, it's not really setting thecompression_level
.Happy to make the change as suggested though, it might make it easier to switch between setting and not setting the compression level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand your reasoning, but then if someone passes
-1
,if options.compression_level is None
is evaluatedFalse
,if options.compression_level < 0 or options.compression_level > 9
is evaluated toTrue
and raises an exception. Which is incorrect, since-1
is an acceptable value.If you have another suggestion on how to handle such situation, feel free to add it. Mine was just a potential suggestion on how to handle it.
All in all, It's no big deal to support
-1
, but could prevent some errors/exceptions, since I assume that the default value is an acceptable value.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated as suggested.