diff --git a/rethinkdb/_export.py b/rethinkdb/_export.py
old mode 100755
new mode 100644
index 01bae2f4..62147d8d
--- a/rethinkdb/_export.py
+++ b/rethinkdb/_export.py
@@ -33,6 +33,7 @@
 import tempfile
 import time
 import traceback
+import zlib
 from multiprocessing.queues import SimpleQueue
 
 import six
@@ -48,7 +49,7 @@
 usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME]
       [--tls-cert filename] [-d DIR] [-e (DB | DB.TABLE)]...
-      [--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
+      [--format (csv | json | ndjson | jsongz)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
       [--clients NUM]"""
 
 help_description = (
     "`rethinkdb export` exports data from a RethinkDB cluster into a directory"
@@ -118,11 +119,11 @@ def parse_options(argv, prog=None):
     parser.add_option(
         "--format",
         dest="format",
-        metavar="json|csv|ndjson",
+        metavar="json|csv|ndjson|jsongz",
         default="json",
         help="format to write (defaults to json. ndjson is newline delimited json.)",
         type="choice",
-        choices=["json", "csv", "ndjson"],
+        choices=["json", "csv", "ndjson", "jsongz"],
     )
     parser.add_option(
         "--clients",
@@ -150,6 +151,17 @@
     )
     parser.add_option_group(csvGroup)
 
+    jsongzGroup = optparse.OptionGroup(parser, "jsongz options")
+    jsongzGroup.add_option(
+        "--compression-level",
+        dest="compression_level",
+        metavar="NUM",
+        default=None,
+        help="compression level, an integer from 0 to 9 (defaults to -1, zlib's default level)",
+        type="int",
+    )
+    parser.add_option_group(jsongzGroup)
+
     options, args = parser.parse_args(argv)
 
     # -- Check validity of arguments
@@ -185,6 +197,15 @@
         if options.delimiter:
             parser.error("--delimiter option is only valid for CSV file formats")
 
+    if options.format == "jsongz":
+        if options.compression_level is None:
+            options.compression_level = -1
+        elif options.compression_level < -1 or options.compression_level > 9:
+            parser.error("--compression-level must be an integer between -1 and 9")
+    else:
+        if options.compression_level:
+            parser.error("--compression-level option is only valid for jsongz file formats")
+
     # -
 
     return options
@@ -226,6 +247,43 @@ def json_writer(filename, fields, task_queue, error_queue, format):
             pass
 
 
+def jsongz_writer(filename, fields, task_queue, error_queue, format, compression_level):
+    try:
+        with open(filename, "wb") as out:
+            # wbits 31 = zlib.MAX_WBITS (15) + 16: emit a gzip header and trailer
+            compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31)
+
+            def compress_and_write(text):
+                out.write(compressor.compress(text.encode("utf-8")))
+
+            first = True
+            compress_and_write("[")
+            item = task_queue.get()
+            while not isinstance(item, StopIteration):
+                row = item[0]
+                if fields is not None:
+                    for key in list(row.keys()):
+                        if key not in fields:
+                            del row[key]
+                if first:
+                    compress_and_write("\n")
+                    first = False
+                else:
+                    compress_and_write(",\n")
+                compress_and_write(json.dumps(row))
+                item = task_queue.get()
+
+            compress_and_write("\n]\n")
+            out.write(compressor.flush())
+    except BaseException:
+        ex_type, ex_class, tb = sys.exc_info()
+        error_queue.put((ex_type, ex_class, traceback.extract_tb(tb)))
+
+    # Read until the exit task so the readers do not hang on pushing onto the queue
+    while not isinstance(task_queue.get(), StopIteration):
+        pass
+
+
 def csv_writer(filename, fields, delimiter, task_queue, error_queue):
     try:
         with open(filename, "w") as out:
@@ -331,6 +389,19 @@ def export_table(
                 options.format,
             ),
         )
+    elif options.format == "jsongz":
+        filename = directory + "/%s/%s.jsongz" % (db, table)
+        writer = multiprocessing.Process(
+            target=jsongz_writer,
+            args=(
+                filename,
+                options.fields,
+                task_queue,
+                error_queue,
+                options.format,
+                options.compression_level,
+            ),
+        )
     elif options.format == "csv":
         filename = directory + "/%s/%s.csv" % (db, table)
         writer = multiprocessing.Process(
diff --git a/rethinkdb/_import.py b/rethinkdb/_import.py
old mode 100755
new mode 100644
index 0ce90bfc..8f82ccdd
--- a/rethinkdb/_import.py
+++ b/rethinkdb/_import.py
@@ -30,9 +30,11 @@
 import optparse
 import os
 import signal
+import struct
 import sys
 import time
 import traceback
+import zlib
 from multiprocessing.queues import Queue, SimpleQueue
 
 import six
@@ -56,6 +58,9 @@
 JSON_MAX_BUFFER_SIZE = 128 * 1024 * 1024
 MAX_NESTING_DEPTH = 100
 
+# jsongz parameters
+JSON_GZ_READ_CHUNK_SIZE = 16 * 1024
+
 Error = collections.namedtuple("Error", ["message", "traceback", "file"])
 
 
@@ -133,7 +138,10 @@ def __init__(
             self._source = source
         else:
             try:
-                self._source = codecs.open(source, mode="r", encoding="utf-8")
+                if self.format == "jsongz":
+                    self._source = open(source, mode="rb")
+                else:
+                    self._source = codecs.open(source, mode="r", encoding="utf-8")
             except IOError as exc:
                 default_logger.exception(exc)
                 raise ValueError(
@@ -145,9 +153,16 @@
             and self._source.name
             and os.path.isfile(self._source.name)
         ):
-            self._bytes_size.value = os.path.getsize(source)
+            self._bytes_size.value = os.path.getsize(self._source.name)
             if self._bytes_size.value == 0:
-                raise ValueError("Source is zero-length: %s" % source)
+                raise ValueError("Source is zero-length: %s" % self._source.name)
+
+            # get the uncompressed length from the gzip trailer's ISIZE field (last 4 bytes)
+            if self.format == "jsongz":
+                # TODO: check valid gzip
+                self._source.seek(-4, 2)
+                self._bytes_size.value = struct.unpack("<I", self._source.read(4))[0]
+                self._source.seek(0)
 
         # table info
         self.db = db
@@ -500,6 +515,9 @@ class JsonSourceFile(SourceFile):
     _buffer_pos = None
     _buffer_end = None
 
+    def read_chunk(self, max_length):
+        return self._source.read(max_length)
+
     def fill_buffer(self):
         if self._buffer_str is None:
             self._buffer_str = ""
@@ -520,7 +538,7 @@
             if read_target < 1:
                 raise AssertionError("Can not set the read target and full the buffer")
 
-            new_chunk = self._source.read(read_target)
+            new_chunk = self.read_chunk(read_target)
             if len(new_chunk) == 0:
                 raise StopIteration()  # file ended
 
@@ -634,6 +652,31 @@ def teardown(self):
         )
 
 
+class JsonGzSourceFile(JsonSourceFile):
+    format = "jsongz"
+
+    def __init__(self, *args, **kwargs):
+
+        # initialize zlib decompressor
+        # wbits 31 = zlib.MAX_WBITS (15) + 16: expect a gzip header and trailer
+        self._decompressor = zlib.decompressobj(31)
+
+        # incremental decoder, in case a multi-byte UTF-8 sequence is split across chunks
+        self._utf8_decoder = codecs.getincrementaldecoder("utf-8")()
+
+        super(JsonGzSourceFile, self).__init__(*args, **kwargs)
+
+    def read_chunk(self, max_length):
+        chunk = b""
+        while len(chunk) < max_length:
+            compressed_buf = self._decompressor.unconsumed_tail + self._source.read(JSON_GZ_READ_CHUNK_SIZE)
+            if len(compressed_buf) == 0:
+                break
+            decompressed_buf = self._decompressor.decompress(compressed_buf, max_length - len(chunk))
+            chunk += decompressed_buf
+        return self._utf8_decoder.decode(chunk)
+
+
 class CsvSourceFile(SourceFile):
     format = "csv"
 
@@ -855,11 +898,11 @@ def parse_options(argv, prog=None):
     file_import_group.add_option(
         "--format",
         dest="format",
-        metavar="json|csv",
+        metavar="json|jsongz|csv",
         default=None,
         help="format of the file (default: json, accepts newline delimited json)",
         type="choice",
-        choices=["json", "csv"],
+        choices=["json", "jsongz", "csv"],
     )
     file_import_group.add_option(
         "--pkey",
@@ -1036,7 +1079,7 @@ def parse_options(argv, prog=None):
         if options.custom_header:
             options.custom_header = options.custom_header.split(",")
 
-    elif options.format == "json":
+    elif options.format in ("json", "jsongz"):
         # disallow invalid options
         if options.delimiter is not None:
             parser.error("--delimiter option is not valid for json files")
@@ -1045,13 +1088,6 @@
         if options.custom_header is not None:
             parser.error("--custom-header option is not valid for json files")
 
-        # default options
-        options.format = "json"
-
-        if options.max_document_size > 0:
-            global JSON_MAX_BUFFER_SIZE
-            JSON_MAX_BUFFER_SIZE = options.max_document_size
-
         options.file = os.path.abspath(options.file)
 
     else:
@@ -1062,6 +1098,11 @@
 
     # --
 
+    # max_document_size - json
+    if options.max_document_size > 0:
+        global JSON_MAX_BUFFER_SIZE
+        JSON_MAX_BUFFER_SIZE = options.max_document_size
+
     # max_nesting_depth
     if options.max_nesting_depth > 0:
         global MAX_NESTING_DEPTH
@@ -1552,6 +1593,8 @@ def parse_info_file(path):
             table_type_options = None
             if ext == ".json":
                 table_type = JsonSourceFile
+            elif ext == ".jsongz":
+                table_type = JsonGzSourceFile
             elif ext == ".csv":
                 table_type = CsvSourceFile
                 table_type_options = {
@@ -1622,7 +1665,7 @@
             table, ext = os.path.splitext(filename)
             table = os.path.basename(table)
 
-            if ext not in [".json", ".csv", ".info"]:
+            if ext not in [".json", ".jsongz", ".csv", ".info"]:
                 files_ignored.append(os.path.join(root, filename))
             elif ext == ".info":
                 pass  # Info files are included based on the data files
@@ -1657,6 +1700,8 @@
         table_type = None
         if ext == ".json":
             table_type = JsonSourceFile
+        elif ext == ".jsongz":
+            table_type = JsonGzSourceFile
         elif ext == ".csv":
             table_type = CsvSourceFile
         else:
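
Because jsongz_writer drives zlib.compressobj(compression_level, zlib.DEFLATED, 31), each .jsongz file should be a single standard gzip member wrapping the same JSON array layout that --format json produces. A quick round-trip check of that assumption with the stdlib gzip module (the export path below is hypothetical):

    import gzip
    import json

    # a .jsongz export is plain gzip on the outside and a JSON array inside,
    # so ordinary gzip tooling can read it back without this patch's code
    with gzip.open("export_dir/test/users.jsongz", "rt", encoding="utf-8") as f:
        rows = json.load(f)
    print("exported rows:", len(rows))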
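
On the import side, SourceFile seeds its progress total from the gzip trailer: RFC 1952 defines the last four bytes of a member as ISIZE, the uncompressed length modulo 2**32, stored little-endian, which is why the unpack format must be "<I" rather than native-order "I". A standalone sketch of the same read (gzip_uncompressed_size is an illustrative helper, not part of the patch):

    import struct

    def gzip_uncompressed_size(path):
        # ISIZE: little-endian uint32 in the last 4 bytes of a gzip member,
        # uncompressed length modulo 2**32 (RFC 1952); exact only for
        # single-member files under 4 GiB, hence the TODO above about
        # validating the gzip stream before trusting it
        with open(path, "rb") as f:
            f.seek(-4, 2)  # offset -4 from whence=2 (end of file)
            return struct.unpack("<I", f.read(4))[0]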