Skip to content

Commit 0cbd7a2

Browse files
committed
PYTHON-1860 Use OP_MSG for find/aggregate_raw_batches when supported (#622)
(cherry picked from commit 209d500)
1 parent 2ac2da0 commit 0cbd7a2

File tree

5 files changed

+50
-14
lines changed

5 files changed

+50
-14
lines changed

bson/__init__.py

+10
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,16 @@ def _decode_selective(rawdoc, fields, codec_options):
10621062
return doc
10631063

10641064

1065+
def _convert_raw_document_lists_to_streams(document):
    """Collapse raw cursor batches in a command reply into byte streams.

    When ``document`` has a truthy ``'cursor'`` entry, every non-empty
    ``'firstBatch'`` / ``'nextBatch'`` list inside it is replaced, in place,
    by a one-element list holding the concatenation of each item's ``raw``
    bytes. Missing or empty batches, and replies without a cursor, are left
    untouched.

    :Parameters:
      - `document`: the reply mapping to rewrite; it is mutated in place.

    Returns ``None``.
    """
    cursor = document.get('cursor')
    if not cursor:
        return
    for key in ('firstBatch', 'nextBatch'):
        batch = cursor.get(key)
        if batch:
            # Each item exposes its encoded bytes as ``.raw``; joining them
            # end to end yields a single contiguous stream.
            cursor[key] = [b"".join(doc.raw for doc in batch)]
1073+
1074+
10651075
def _decode_all_selective(data, codec_options, fields):
10661076
"""Decode BSON data to a single document while using user-provided
10671077
custom decoding logic.

pymongo/command_cursor.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from collections import deque
1818

19+
from bson import _convert_raw_document_lists_to_streams
1920
from bson.py3compat import integer_types
2021
from pymongo.errors import (ConnectionFailure,
2122
InvalidOperation,
@@ -302,7 +303,13 @@ def __init__(self, collection, cursor_info, address, retrieved=0,
302303

303304
def _unpack_response(self, response, cursor_id, codec_options,
                     user_fields=None, legacy_response=False):
    """Return the server reply with its batch documents left undecoded.

    Delegates to ``response.raw_response``, forwarding ``cursor_id`` and
    ``user_fields``. ``codec_options`` is accepted but not used by this
    method.

    :Parameters:
      - `response`: reply object providing ``raw_response``.
      - `cursor_id`: forwarded to ``raw_response``.
      - `codec_options`: unused here.
      - `user_fields`: forwarded to ``raw_response`` (may be ``None``).
      - `legacy_response`: when false, the first element of the reply is
        post-processed so its cursor batches become document streams.
    """
    raw_response = response.raw_response(
        cursor_id, user_fields=user_fields)
    if not legacy_response:
        # OP_MSG returns firstBatch/nextBatch documents as a BSON array.
        # Re-assemble the array of documents into a document stream.
        # NOTE(review): assumes raw_response is non-empty here -- confirm.
        _convert_raw_document_lists_to_streams(raw_response[0])
    return raw_response
306313

307314
def __getitem__(self, index):
308315
raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor")

pymongo/cursor.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from collections import deque
2121

22-
from bson import RE_TYPE
22+
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
2323
from bson.code import Code
2424
from bson.py3compat import (iteritems,
2525
integer_types,
@@ -1137,7 +1137,6 @@ def _refresh(self):
11371137
limit = min(limit, self.__batch_size)
11381138
else:
11391139
limit = self.__batch_size
1140-
11411140
# Exhaust cursors don't send getMore messages.
11421141
g = self._getmore_class(self.__dbname,
11431142
self.__collname,
@@ -1303,7 +1302,13 @@ def __init__(self, *args, **kwargs):
13031302

13041303
def _unpack_response(self, response, cursor_id, codec_options,
                     user_fields=None, legacy_response=False):
    """Return the server reply with its batch documents left undecoded.

    Delegates to ``response.raw_response``, forwarding ``cursor_id`` and
    ``user_fields``. ``codec_options`` is accepted but not used by this
    method.

    :Parameters:
      - `response`: reply object providing ``raw_response``.
      - `cursor_id`: forwarded to ``raw_response``.
      - `codec_options`: unused here.
      - `user_fields`: forwarded to ``raw_response`` (may be ``None``).
      - `legacy_response`: when false, the first element of the reply is
        post-processed so its cursor batches become document streams.
    """
    raw_response = response.raw_response(
        cursor_id, user_fields=user_fields)
    if not legacy_response:
        # OP_MSG returns firstBatch/nextBatch documents as a BSON array.
        # Re-assemble the array of documents into a document stream.
        # NOTE(review): assumes raw_response is non-empty here -- confirm.
        _convert_raw_document_lists_to_streams(raw_response[0])
    return raw_response
13071312

13081313
def explain(self):
13091314
"""Returns an explain plan record for this cursor.

pymongo/message.py

+21-10
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
from bson import (CodecOptions,
2929
decode,
3030
encode,
31+
_decode_selective,
3132
_dict_to_bson,
3233
_make_c_string)
3334
from bson.codec_options import DEFAULT_CODEC_OPTIONS
34-
from bson.raw_bson import _inflate_bson, DEFAULT_RAW_BSON_OPTIONS
35+
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
36+
RawBSONDocument)
3537
from bson.py3compat import b, StringIO
3638
from bson.son import SON
3739

@@ -442,28 +444,30 @@ def get_message(self, dummy0, sock_info, use_cmd=False):
442444
return get_more(ns, self.ntoreturn, self.cursor_id, ctx)
443445

444446

445-
# TODO: Use OP_MSG once the server is able to respond with document streams.
446447
class _RawBatchQuery(_Query):
    """Query message used by raw-batch cursors.

    Sent as a command over OP_MSG when the socket supports it and the cursor
    is not an exhaust cursor.
    """

    def use_command(self, socket_info, exhaust):
        """Return True if this query should be sent as a command.

        The parent implementation is always invoked first (for its
        compatibility checks); its return value is not used.
        """
        # Compatibility checks.
        super(_RawBatchQuery, self).use_command(socket_info, exhaust)
        # Use OP_MSG when available.
        if socket_info.op_msg_enabled and not exhaust:
            return True
        return False

    def get_message(self, set_slave_ok, sock_info, use_cmd=False):
        """Build the wire message, honouring the caller's ``use_cmd``."""
        return super(_RawBatchQuery, self).get_message(
            set_slave_ok, sock_info, use_cmd)
458460

459461
class _RawBatchGetMore(_GetMore):
460462
def use_command(self, socket_info, exhaust):
463+
# Use OP_MSG when available.
464+
if socket_info.op_msg_enabled and not exhaust:
465+
return True
461466
return False
462467

463468
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
464-
# Always pass False for use_cmd.
465469
return super(_RawBatchGetMore, self).get_message(
466-
set_slave_ok, sock_info, False)
470+
set_slave_ok, sock_info, use_cmd)
467471

468472

469473
class _CursorAddress(tuple):
@@ -1492,7 +1496,7 @@ def __init__(self, flags, cursor_id, number_returned, documents):
14921496
self.number_returned = number_returned
14931497
self.documents = documents
14941498

1495-
def raw_response(self, cursor_id=None):
1499+
def raw_response(self, cursor_id=None, user_fields=None):
14961500
"""Check the response header from the database, without decoding BSON.
14971501
14981502
Check the response for errors and unpack.
@@ -1602,8 +1606,15 @@ def __init__(self, flags, payload_document):
16021606
self.flags = flags
16031607
self.payload_document = payload_document
16041608

1605-
def raw_response(self, cursor_id=None):
1606-
raise NotImplementedError
1609+
def raw_response(self, cursor_id=None, user_fields=None):
    """Return the payload as a one-element list, selectively decoded.

    cursor_id is ignored
    user_fields is used to determine which fields must not be decoded

    ``user_fields`` defaults to ``None``, which is treated as an empty
    mapping. A ``None`` default avoids sharing one mutable ``{}`` across
    calls and matches callers that forward their own ``user_fields=None``.
    """
    if user_fields is None:
        # Fresh mapping per call instead of a shared mutable default.
        user_fields = {}
    inflated_response = _decode_selective(
        RawBSONDocument(self.payload_document), user_fields,
        DEFAULT_RAW_BSON_OPTIONS)
    return [inflated_response]
16071618

16081619
def unpack_response(self, cursor_id=None,
16091620
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,

test/test_cursor.py

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
OperationFailure)
4242
from pymongo.read_concern import ReadConcern
4343
from pymongo.read_preferences import ReadPreference
44+
from pymongo.write_concern import WriteConcern
4445
from test import (client_context,
4546
unittest,
4647
IntegrationTest)
@@ -1526,6 +1527,8 @@ def test_collation_error(self):
15261527

15271528
@client_context.require_version_min(3, 2)
15281529
def test_read_concern(self):
1530+
self.db.get_collection(
1531+
"test", write_concern=WriteConcern(w="majority")).insert_one({})
15291532
c = self.db.get_collection("test", read_concern=ReadConcern("majority"))
15301533
next(c.find_raw_batches())
15311534

0 commit comments

Comments
 (0)