Skip to content

Commit 4d4fdda

Browse files
authored
PYTHON-3363 Allow change stream to be resumed after a timeout (#1014)
Apply client timeoutMS to ChangeStream iteration.
1 parent 935f926 commit 4d4fdda

File tree

4 files changed

+56
-16
lines changed

4 files changed

+56
-16
lines changed

pymongo/change_stream.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from bson import _bson_to_dict
2121
from bson.raw_bson import RawBSONDocument
2222
from bson.timestamp import Timestamp
23-
from pymongo import common
23+
from pymongo import _csot, common
2424
from pymongo.aggregation import (
2525
_CollectionAggregationCommand,
2626
_DatabaseAggregationCommand,
@@ -128,6 +128,8 @@ def __init__(
128128
self._start_at_operation_time = start_at_operation_time
129129
self._session = session
130130
self._comment = comment
131+
self._closed = False
132+
self._timeout = self._target._timeout
131133
# Initialize cursor.
132134
self._cursor = self._create_cursor()
133135

@@ -234,6 +236,7 @@ def _resume(self):
234236

235237
def close(self) -> None:
236238
"""Close this ChangeStream."""
239+
self._closed = True
237240
self._cursor.close()
238241

239242
def __iter__(self) -> "ChangeStream[_DocumentType]":
@@ -248,6 +251,7 @@ def resume_token(self) -> Optional[Mapping[str, Any]]:
248251
"""
249252
return copy.deepcopy(self._resume_token)
250253

254+
@_csot.apply
251255
def next(self) -> _DocumentType:
252256
"""Advance the cursor.
253257
@@ -298,8 +302,9 @@ def alive(self) -> bool:
298302
299303
.. versionadded:: 3.8
300304
"""
301-
return self._cursor.alive
305+
return not self._closed
302306

307+
@_csot.apply
303308
def try_next(self) -> Optional[_DocumentType]:
304309
"""Advance the cursor without blocking indefinitely.
305310
@@ -332,6 +337,9 @@ def try_next(self) -> Optional[_DocumentType]:
332337
333338
.. versionadded:: 3.8
334339
"""
340+
if not self._closed and not self._cursor.alive:
341+
self._resume()
342+
335343
# Attempt to get the next change with at most one getMore and at most
336344
# one resume attempt.
337345
try:
@@ -350,6 +358,10 @@ def try_next(self) -> Optional[_DocumentType]:
350358
self._resume()
351359
change = self._cursor._try_next(False)
352360

361+
# Check if the cursor was invalidated.
362+
if not self._cursor.alive:
363+
self._closed = True
364+
353365
# If no changes are available.
354366
if change is None:
355367
# We have either iterated over all documents in the cursor,

test/test_change_stream.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -486,16 +486,15 @@ def _get_expected_resume_token(self, stream, listener, previous_change=None):
486486
return response["cursor"]["postBatchResumeToken"]
487487

488488
@no_type_check
489-
def _test_raises_error_on_missing_id(self, expected_exception):
489+
def _test_raises_error_on_missing_id(self, expected_exception, expected_exception2):
490490
"""ChangeStream will raise an exception if the server response is
491491
missing the resume token.
492492
"""
493493
with self.change_stream([{"$project": {"_id": 0}}]) as change_stream:
494494
self.watched_collection().insert_one({})
495495
with self.assertRaises(expected_exception):
496496
next(change_stream)
497-
# The cursor should now be closed.
498-
with self.assertRaises(StopIteration):
497+
with self.assertRaises(expected_exception2):
499498
next(change_stream)
500499

501500
@no_type_check
@@ -525,17 +524,16 @@ def test_update_resume_token_legacy(self):
525524
self._test_update_resume_token(self._get_expected_resume_token_legacy)
526525

527526
# Prose test no. 2
528-
@client_context.require_version_max(4, 3, 3) # PYTHON-2120
529527
@client_context.require_version_min(4, 1, 8)
530528
def test_raises_error_on_missing_id_418plus(self):
531-
# Server returns an error on 4.1.8+
532-
self._test_raises_error_on_missing_id(OperationFailure)
529+
# Server returns an error on 4.1.8+, subsequent next() resumes and gets the same error.
530+
self._test_raises_error_on_missing_id(OperationFailure, OperationFailure)
533531

534532
# Prose test no. 2
535533
@client_context.require_version_max(4, 1, 8)
536534
def test_raises_error_on_missing_id_418minus(self):
537-
# PyMongo raises an error
538-
self._test_raises_error_on_missing_id(InvalidOperation)
535+
# PyMongo raises an error, closes the cursor, subsequent next() raises StopIteration.
536+
self._test_raises_error_on_missing_id(InvalidOperation, StopIteration)
539537

540538
# Prose test no. 3
541539
@no_type_check

test/test_csot.py

+32-1
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
sys.path[0:0] = [""]
2121

22-
from test import IntegrationTest, unittest
22+
from test import IntegrationTest, client_context, unittest
2323
from test.unified_format import generate_test_classes
2424

2525
import pymongo
2626
from pymongo import _csot
27+
from pymongo.errors import PyMongoError
2728

2829
# Location of JSON test specifications.
2930
TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "csot")
@@ -72,6 +73,36 @@ def test_timeout_nested(self):
7273
self.assertEqual(_csot.get_deadline(), float("inf"))
7374
self.assertEqual(_csot.get_rtt(), 0.0)
7475

76+
@client_context.require_version_min(3, 6)
77+
@client_context.require_no_mmap
78+
@client_context.require_no_standalone
79+
def test_change_stream_can_resume_after_timeouts(self):
80+
coll = self.db.test
81+
with coll.watch(max_await_time_ms=150) as stream:
82+
with pymongo.timeout(0.1):
83+
with self.assertRaises(PyMongoError) as ctx:
84+
stream.try_next()
85+
self.assertTrue(ctx.exception.timeout)
86+
self.assertTrue(stream.alive)
87+
with self.assertRaises(PyMongoError) as ctx:
88+
stream.try_next()
89+
self.assertTrue(ctx.exception.timeout)
90+
self.assertTrue(stream.alive)
91+
# Resume before the insert on 3.6 because 4.0 is required to avoid skipping documents
92+
if client_context.version < (4, 0):
93+
stream.try_next()
94+
coll.insert_one({})
95+
with pymongo.timeout(10):
96+
self.assertTrue(stream.next())
97+
self.assertTrue(stream.alive)
98+
# Timeout applies to entire next() call, not only individual commands.
99+
with pymongo.timeout(0.5):
100+
with self.assertRaises(PyMongoError) as ctx:
101+
stream.next()
102+
self.assertTrue(ctx.exception.timeout)
103+
self.assertTrue(stream.alive)
104+
self.assertFalse(stream.alive)
105+
75106

76107
if __name__ == "__main__":
77108
unittest.main()

test/unified_format.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -1078,10 +1078,6 @@ def _sessionOperation_startTransaction(self, target, *args, **kwargs):
10781078
self.__raise_if_unsupported("startTransaction", target, ClientSession)
10791079
return target.start_transaction(*args, **kwargs)
10801080

1081-
def _cursor_iterateOnce(self, target, *args, **kwargs):
1082-
self.__raise_if_unsupported("iterateOnce", target, NonLazyCursor, ChangeStream)
1083-
return target.try_next()
1084-
10851081
def _changeStreamOperation_iterateUntilDocumentOrError(self, target, *args, **kwargs):
10861082
self.__raise_if_unsupported("iterateUntilDocumentOrError", target, ChangeStream)
10871083
return next(target)
@@ -1204,8 +1200,11 @@ def run_entity_operation(self, spec):
12041200
try:
12051201
method = getattr(self, method_name)
12061202
except AttributeError:
1203+
target_opname = camel_to_snake(opname)
1204+
if target_opname == "iterate_once":
1205+
target_opname = "try_next"
12071206
try:
1208-
cmd = getattr(target, camel_to_snake(opname))
1207+
cmd = getattr(target, target_opname)
12091208
except AttributeError:
12101209
self.fail("Unsupported operation %s on entity %s" % (opname, target))
12111210
else:

0 commit comments

Comments
 (0)