From 5ec52e05ebba1987eac8c86936b3f3e9ac5bd01e Mon Sep 17 00:00:00 2001 From: Cristian Ianto Date: Mon, 25 Jan 2021 01:33:15 +0200 Subject: [PATCH 1/5] feat: jsongz export/import --- rethinkdb/_export.py | 77 ++++++++++++++++++++++++++++++++++++++++++-- rethinkdb/_import.py | 54 ++++++++++++++++++++++++++++--- 2 files changed, 123 insertions(+), 8 deletions(-) mode change 100755 => 100644 rethinkdb/_export.py mode change 100755 => 100644 rethinkdb/_import.py diff --git a/rethinkdb/_export.py b/rethinkdb/_export.py old mode 100755 new mode 100644 index 01bae2f4..c27d9062 --- a/rethinkdb/_export.py +++ b/rethinkdb/_export.py @@ -33,6 +33,7 @@ import tempfile import time import traceback +import zlib from multiprocessing.queues import SimpleQueue import six @@ -48,7 +49,7 @@ usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR] [-e (DB | DB.TABLE)]... - [--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] + [--format (csv | json | ndjson | jsongz)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] [--clients NUM]""" help_description = ( "`rethinkdb export` exports data from a RethinkDB cluster into a directory" @@ -118,11 +119,11 @@ def parse_options(argv, prog=None): parser.add_option( "--format", dest="format", - metavar="json|csv|ndjson", + metavar="json|csv|ndjson|jsongz", default="json", help="format to write (defaults to json. ndjson is newline delimited json.)", type="choice", - choices=["json", "csv", "ndjson"], + choices=["json", "csv", "ndjson", "jsongz"], ) parser.add_option( "--clients", @@ -150,6 +151,17 @@ def parse_options(argv, prog=None): ) parser.add_option_group(csvGroup) + jsongzGroup = optparse.OptionGroup(parser, "jsongz options") + jsongzGroup.add_option( + "--compression-level", + dest="compression_level", + metavar="NUM", + default=None, + help="compression level, an integer from 0 to 9 (defaults to -1 default zlib compression)", + type="int", + ) + parser.add_option_group(jsongzGroup) + options, args = parser.parse_args(argv) # -- Check validity of arguments @@ -185,6 +197,15 @@ def parse_options(argv, prog=None): if options.delimiter: parser.error("--delimiter option is only valid for CSV file formats") + if options.format == "jsongz": + if options.compression_level is None: + options.compression_level = -1 + elif options.compression_level < 0 or options.compression_level > 9: + parser.error("--compression-level must be an integer from 0 and 9") + else: + if options.compression_level: + parser.error("--compression-level option is only valid for jsongz file formats") + # - return options @@ -226,6 +247,43 @@ def json_writer(filename, fields, task_queue, error_queue, format): pass +def json_gz_writer(filename, fields, task_queue, error_queue, format, compression_level): + try: + with open(filename, "wb") as out: + # wbits 31 = MAX_WBITS + gzip header and trailer + compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31) + def compress_write(str): + out.write(compressor.compress(str.encode("utf-8"))) + + first = True + compress_write("[") + item = task_queue.get() + while not isinstance(item, StopIteration): + row = item[0] + if fields is not None: + for item in list(row.keys()): + if item not in fields: + del row[item] + if first: + compress_write("\n") + first = False + else: + compress_write(",\n") + + compress_write(json.dumps(row)) + item = task_queue.get() + + compress_write("\n]\n") + out.write(compressor.flush()) + except BaseException: + ex_type, ex_class, tb = sys.exc_info() + error_queue.put((ex_type, ex_class, traceback.extract_tb(tb))) + + # Read until the exit task so the readers do not hang on pushing onto the queue + while not isinstance(task_queue.get(), StopIteration): + pass + + def csv_writer(filename, fields, delimiter, task_queue, error_queue): try: with open(filename, "w") as out: @@ -331,6 +389,19 @@ def export_table( options.format, ), ) + elif options.format == "jsongz": + filename = directory + "/%s/%s.jsongz" % (db, table) + writer = multiprocessing.Process( + target=json_gz_writer, + args=( + filename, + options.fields, + task_queue, + error_queue, + options.format, + options.compression_level, + ), + ) elif options.format == "csv": filename = directory + "/%s/%s.csv" % (db, table) writer = multiprocessing.Process( diff --git a/rethinkdb/_import.py b/rethinkdb/_import.py old mode 100755 new mode 100644 index 0ce90bfc..d84cf483 --- a/rethinkdb/_import.py +++ b/rethinkdb/_import.py @@ -30,9 +30,11 @@ import optparse import os import signal +import struct import sys import time import traceback +import zlib from multiprocessing.queues import Queue, SimpleQueue import six @@ -56,6 +58,9 @@ JSON_MAX_BUFFER_SIZE = 128 * 1024 * 1024 MAX_NESTING_DEPTH = 100 +# jsongz parameters +JSON_GZ_READ_CHUNK_SIZE = 16 * 1024 + Error = collections.namedtuple("Error", ["message", "traceback", "file"]) @@ -133,7 +138,10 @@ def __init__( self._source = source else: try: - self._source = codecs.open(source, mode="r", encoding="utf-8") + if self.format == "jsongz": + self._source = open(source, mode="rb") + else: + self._source = codecs.open(source, mode="r", encoding="utf-8") except IOError as exc: default_logger.exception(exc) raise ValueError( @@ -145,9 +153,16 @@ def __init__( and self._source.name and os.path.isfile(self._source.name) ): - self._bytes_size.value = os.path.getsize(source) + self._bytes_size.value = os.path.getsize(self._source.name) if self._bytes_size.value == 0: - raise ValueError("Source is zero-length: %s" % source) + raise ValueError("Source is zero-length: %s" % self._source.name) + + # get uncompressed file length from gzip trailer (last 4 bytes) + if self.format == "jsongz": + # TODO: check valid gzip + self._source.seek(-4, 2) + self._bytes_size.value = struct.unpack("I", self._source.read(4))[0] + self._source.seek(0) # table info self.db = db @@ -500,6 +515,9 @@ class JsonSourceFile(SourceFile): _buffer_pos = None _buffer_end = None + def read_chunk(self, max_length): + return self._source.read(max_length) + def fill_buffer(self): if self._buffer_str is None: self._buffer_str = "" @@ -520,7 +538,7 @@ def fill_buffer(self): if read_target < 1: raise AssertionError("Can not set the read target and full the buffer") - new_chunk = self._source.read(read_target) + new_chunk = self.read_chunk(read_target) if len(new_chunk) == 0: raise StopIteration() # file ended @@ -634,6 +652,28 @@ def teardown(self): ) +class JsonGzSourceFile(JsonSourceFile): + format = "jsongz" + + def __init__(self, *args, **kwargs): + + # initialize zlib decompressor + # wbits 31 = window size MAX_WBITS & expects gzip header and trailer + self._decompressor = zlib.decompressobj(31) + + super(JsonGzSourceFile, self).__init__(*args, **kwargs) + + def read_chunk(self, max_length): + chunk = b'' + while len(chunk) < max_length: + compressed_buf = self._decompressor.unconsumed_tail + self._source.read(JSON_GZ_READ_CHUNK_SIZE) + if len(compressed_buf) == 0: + break + decompressed_buf = self._decompressor.decompress(compressed_buf, max_length - len(chunk)) + chunk += decompressed_buf + return chunk.decode("utf-8") + + class CsvSourceFile(SourceFile): format = "csv" @@ -1552,6 +1592,8 @@ def parse_info_file(path): table_type_options = None if ext == ".json": table_type = JsonSourceFile + elif ext == ".jsongz": + table_type = JsonGzSourceFile elif ext == ".csv": table_type = CsvSourceFile table_type_options = { @@ -1622,7 +1664,7 @@ def parse_info_file(path): table, ext = os.path.splitext(filename) table = os.path.basename(table) - if ext not in [".json", ".csv", ".info"]: + if ext not in [".json", ".jsongz", ".csv", ".info"]: files_ignored.append(os.path.join(root, filename)) elif ext == ".info": pass # Info files are included based on the data files @@ -1657,6 +1699,8 @@ def parse_info_file(path): table_type = None if ext == ".json": table_type = JsonSourceFile + elif ext == ".jsongz": + table_type = JsonGzSourceFile elif ext == ".csv": table_type = CsvSourceFile else: From f6e3e40b5371ab95b7b04f10032d1a28148182dd Mon Sep 17 00:00:00 2001 From: Cristian Ianto Date: Fri, 19 Feb 2021 20:38:12 +0200 Subject: [PATCH 2/5] chore: pylintify --- rethinkdb/_export.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rethinkdb/_export.py b/rethinkdb/_export.py index c27d9062..fbae511d 100644 --- a/rethinkdb/_export.py +++ b/rethinkdb/_export.py @@ -247,16 +247,16 @@ def json_writer(filename, fields, task_queue, error_queue, format): pass -def json_gz_writer(filename, fields, task_queue, error_queue, format, compression_level): +def jsongz_writer(filename, fields, task_queue, error_queue, format, compression_level): try: with open(filename, "wb") as out: # wbits 31 = MAX_WBITS + gzip header and trailer compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31) - def compress_write(str): + def compress_and_write(str): out.write(compressor.compress(str.encode("utf-8"))) first = True - compress_write("[") + compress_and_write("[") item = task_queue.get() while not isinstance(item, StopIteration): row = item[0] @@ -265,15 +265,15 @@ def compress_write(str): if item not in fields: del row[item] if first: - compress_write("\n") + compress_and_write("\n") first = False else: - compress_write(",\n") + compress_and_write(",\n") - compress_write(json.dumps(row)) + compress_and_write(json.dumps(row)) item = task_queue.get() - compress_write("\n]\n") + compress_and_write("\n]\n") out.write(compressor.flush()) except BaseException: ex_type, ex_class, tb = sys.exc_info() @@ -392,7 +392,7 @@ def export_table( elif options.format == "jsongz": filename = directory + "/%s/%s.jsongz" % (db, table) writer = multiprocessing.Process( - target=json_gz_writer, + target=jsongz_writer, args=( filename, options.fields, From a6d8978494581ff8ca096d289b9930648e6620f1 Mon Sep 17 00:00:00 2001 From: Cristian Ianto Date: Mon, 22 Mar 2021 20:27:12 +0200 Subject: [PATCH 3/5] fix: allow single file jsongz import --- rethinkdb/_import.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/rethinkdb/_import.py b/rethinkdb/_import.py index d84cf483..b63bcad9 100644 --- a/rethinkdb/_import.py +++ b/rethinkdb/_import.py @@ -895,11 +895,11 @@ def parse_options(argv, prog=None): file_import_group.add_option( "--format", dest="format", - metavar="json|csv", + metavar="json|jsongz|csv", default=None, help="format of the file (default: json, accepts newline delimited json)", type="choice", - choices=["json", "csv"], + choices=["json", "jsongz", "csv"], ) file_import_group.add_option( "--pkey", @@ -1076,7 +1076,7 @@ def parse_options(argv, prog=None): if options.custom_header: options.custom_header = options.custom_header.split(",") - elif options.format == "json": + elif (options.format == "json" or options.format == "jsongz") : # disallow invalid options if options.delimiter is not None: parser.error("--delimiter option is not valid for json files") @@ -1085,9 +1085,6 @@ def parse_options(argv, prog=None): if options.custom_header is not None: parser.error("--custom-header option is not valid for json files") - # default options - options.format = "json" - if options.max_document_size > 0: global JSON_MAX_BUFFER_SIZE JSON_MAX_BUFFER_SIZE = options.max_document_size From a88f11c365a6fc88b357da9f2f75d2ebf9905dc2 Mon Sep 17 00:00:00 2001 From: Cristian Ianto Date: Mon, 22 Mar 2021 20:36:36 +0200 Subject: [PATCH 4/5] fix: apply --max-document-size for directory imports too --- rethinkdb/_import.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rethinkdb/_import.py b/rethinkdb/_import.py index b63bcad9..8f82ccdd 100644 --- a/rethinkdb/_import.py +++ b/rethinkdb/_import.py @@ -1085,10 +1085,6 @@ def parse_options(argv, prog=None): if options.custom_header is not None: parser.error("--custom-header option is not valid for json files") - if options.max_document_size > 0: - global JSON_MAX_BUFFER_SIZE - JSON_MAX_BUFFER_SIZE = options.max_document_size - options.file = os.path.abspath(options.file) else: @@ -1099,6 +1095,11 @@ def parse_options(argv, prog=None): # -- + # max_document_size - json + if options.max_document_size > 0: + global JSON_MAX_BUFFER_SIZE + JSON_MAX_BUFFER_SIZE = options.max_document_size + # max_nesting_depth if options.max_nesting_depth > 0: global MAX_NESTING_DEPTH From 1e61100d1714f77bc11896faf767e33dcaec1dd0 Mon Sep 17 00:00:00 2001 From: Cristian Ianto Date: Fri, 26 Mar 2021 08:49:30 +0200 Subject: [PATCH 5/5] fix: allow compression_level -1 --- rethinkdb/_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rethinkdb/_export.py b/rethinkdb/_export.py index fbae511d..62147d8d 100644 --- a/rethinkdb/_export.py +++ b/rethinkdb/_export.py @@ -200,7 +200,7 @@ def parse_options(argv, prog=None): if options.format == "jsongz": if options.compression_level is None: options.compression_level = -1 - elif options.compression_level < 0 or options.compression_level > 9: + elif options.compression_level < -1 or options.compression_level > 9: parser.error("--compression-level must be an integer from 0 and 9") else: if options.compression_level: