From 216407ea9b8201e5839cfebe585147c361ea6494 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 21 Apr 2025 16:45:47 -0400 Subject: [PATCH 01/10] PYTHON-5212 - Do not hold Topology lock while resetting pool --- pymongo/asynchronous/pool.py | 28 ++++++++++++++++++++++------ pymongo/asynchronous/topology.py | 11 +++++------ pymongo/synchronous/pool.py | 28 ++++++++++++++++++++++------ pymongo/synchronous/topology.py | 11 +++++------ 4 files changed, 54 insertions(+), 24 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a67cc5f3c8..aba4df3fda 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -14,6 +14,7 @@ from __future__ import annotations +import asyncio import collections import contextlib import logging @@ -860,8 +861,13 @@ async def _reset( # PoolClosedEvent but that reset() SHOULD close sockets *after* # publishing the PoolClearedEvent. if close: - for conn in sockets: - await conn.close_conn(ConnectionClosedReason.POOL_CLOSED) + if not _IS_SYNC: + await asyncio.gather( + *[conn._close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] + ) + else: + for conn in sockets: + await conn.close_conn(ConnectionClosedReason.POOL_CLOSED) if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) @@ -891,8 +897,13 @@ async def _reset( serverPort=self.address[1], serviceId=service_id, ) - for conn in sockets: - await conn.close_conn(ConnectionClosedReason.STALE) + if not _IS_SYNC: + await asyncio.gather( + *[conn._close_conn(ConnectionClosedReason.STALE) for conn in sockets] + ) + else: + for conn in sockets: + await conn.close_conn(ConnectionClosedReason.STALE) async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the @@ -938,8 +949,13 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): close_conns.append(self.conns.pop()) - for conn in close_conns: - await conn.close_conn(ConnectionClosedReason.IDLE) + if not _IS_SYNC: + await asyncio.gather( + *[conn._close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] + ) + else: + for conn in close_conns: + await conn.close_conn(ConnectionClosedReason.IDLE) while True: async with self.size_cond: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 32776bf7b9..6e114736a3 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -529,12 +529,6 @@ async def _process_change( if not _IS_SYNC: self._monitor_tasks.append(self._srv_monitor) - # Clear the pool from a failed heartbeat. - if reset_pool: - server = self._servers.get(server_description.address) - if server: - await server.pool.reset(interrupt_connections=interrupt_connections) - # Wake anything waiting in select_servers(). self._condition.notify_all() @@ -557,6 +551,11 @@ async def on_change( # that didn't include this server. if self._opened and self._description.has_server(server_description.address): await self._process_change(server_description, reset_pool, interrupt_connections) + # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close. + if self._opened and self._description.has_server(server_description.address) and reset_pool: + server = self._servers.get(server_description.address) + if server: + await server.pool.reset(interrupt_connections=interrupt_connections) async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new seedlist on an opened topology. diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 224834af31..331794d528 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -14,6 +14,7 @@ from __future__ import annotations +import asyncio import collections import contextlib import logging @@ -858,8 +859,13 @@ def _reset( # PoolClosedEvent but that reset() SHOULD close sockets *after* # publishing the PoolClearedEvent. if close: - for conn in sockets: - conn.close_conn(ConnectionClosedReason.POOL_CLOSED) + if not _IS_SYNC: + asyncio.gather( + *[conn._close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] + ) + else: + for conn in sockets: + conn.close_conn(ConnectionClosedReason.POOL_CLOSED) if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) @@ -889,8 +895,13 @@ def _reset( serverPort=self.address[1], serviceId=service_id, ) - for conn in sockets: - conn.close_conn(ConnectionClosedReason.STALE) + if not _IS_SYNC: + asyncio.gather( + *[conn._close_conn(ConnectionClosedReason.STALE) for conn in sockets] + ) + else: + for conn in sockets: + conn.close_conn(ConnectionClosedReason.STALE) def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the @@ -934,8 +945,13 @@ def remove_stale_sockets(self, reference_generation: int) -> None: and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): close_conns.append(self.conns.pop()) - for conn in close_conns: - conn.close_conn(ConnectionClosedReason.IDLE) + if not _IS_SYNC: + asyncio.gather( + *[conn._close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] + ) + else: + for conn in close_conns: + conn.close_conn(ConnectionClosedReason.IDLE) while True: with self.size_cond: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index df23bff28c..39c204b19a 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -529,12 +529,6 @@ def _process_change( if not _IS_SYNC: self._monitor_tasks.append(self._srv_monitor) - # Clear the pool from a failed heartbeat. - if reset_pool: - server = self._servers.get(server_description.address) - if server: - server.pool.reset(interrupt_connections=interrupt_connections) - # Wake anything waiting in select_servers(). self._condition.notify_all() @@ -557,6 +551,11 @@ def on_change( # that didn't include this server. if self._opened and self._description.has_server(server_description.address): self._process_change(server_description, reset_pool, interrupt_connections) + # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close. + if self._opened and self._description.has_server(server_description.address) and reset_pool: + server = self._servers.get(server_description.address) + if server: + server.pool.reset(interrupt_connections=interrupt_connections) def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new seedlist on an opened topology. From 4f0c8e44a56c45e5fd6b5c6ff15f496d23f27db5 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 21 Apr 2025 16:48:32 -0400 Subject: [PATCH 02/10] Use correct close_conn --- pymongo/asynchronous/pool.py | 6 +++--- pymongo/synchronous/pool.py | 8 +++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index aba4df3fda..94bd46f68b 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -863,7 +863,7 @@ async def _reset( if close: if not _IS_SYNC: await asyncio.gather( - *[conn._close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] ) else: for conn in sockets: @@ -899,7 +899,7 @@ async def _reset( ) if not _IS_SYNC: await asyncio.gather( - *[conn._close_conn(ConnectionClosedReason.STALE) for conn in sockets] + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets] ) else: for conn in sockets: @@ -951,7 +951,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: close_conns.append(self.conns.pop()) if not _IS_SYNC: await asyncio.gather( - *[conn._close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] + *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] ) else: for conn in close_conns: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 331794d528..52872e17c6 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -861,7 +861,7 @@ def _reset( if close: if not _IS_SYNC: asyncio.gather( - *[conn._close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] ) else: for conn in sockets: @@ -896,9 +896,7 @@ def _reset( serviceId=service_id, ) if not _IS_SYNC: - asyncio.gather( - *[conn._close_conn(ConnectionClosedReason.STALE) for conn in sockets] - ) + asyncio.gather(*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets]) else: for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) @@ -947,7 +945,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: close_conns.append(self.conns.pop()) if not _IS_SYNC: asyncio.gather( - *[conn._close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] + *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] ) else: for conn in close_conns: From b7232db071c42e9015fde823a06ac12c3b7f77db Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 22 Apr 2025 10:09:50 -0400 Subject: [PATCH 03/10] Address review --- pymongo/asynchronous/pool.py | 9 ++++++--- pymongo/asynchronous/topology.py | 2 +- pymongo/synchronous/pool.py | 11 ++++++++--- pymongo/synchronous/topology.py | 2 +- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 94bd46f68b..8b18ab927b 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -863,7 +863,8 @@ async def _reset( if close: if not _IS_SYNC: await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], + return_exceptions=True, ) else: for conn in sockets: @@ -899,7 +900,8 @@ async def _reset( ) if not _IS_SYNC: await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets] + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], + return_exceptions=True, ) else: for conn in sockets: @@ -951,7 +953,8 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: close_conns.append(self.conns.pop()) if not _IS_SYNC: await asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] + *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns], + return_exceptions=True, ) else: for conn in close_conns: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 6e114736a3..438dd1e352 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -552,7 +552,7 @@ async def on_change( if self._opened and self._description.has_server(server_description.address): await self._process_change(server_description, reset_pool, interrupt_connections) # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close. - if self._opened and self._description.has_server(server_description.address) and reset_pool: + if reset_pool: server = self._servers.get(server_description.address) if server: await server.pool.reset(interrupt_connections=interrupt_connections) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 52872e17c6..b3eec64f27 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -861,7 +861,8 @@ def _reset( if close: if not _IS_SYNC: asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets] + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], + return_exceptions=True, ) else: for conn in sockets: @@ -896,7 +897,10 @@ def _reset( serviceId=service_id, ) if not _IS_SYNC: - asyncio.gather(*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets]) + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], + return_exceptions=True, + ) else: for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) @@ -945,7 +949,8 @@ def remove_stale_sockets(self, reference_generation: int) -> None: close_conns.append(self.conns.pop()) if not _IS_SYNC: asyncio.gather( - *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns] + *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns], + return_exceptions=True, ) else: for conn in close_conns: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 39c204b19a..1e99adf726 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -552,7 +552,7 @@ def on_change( if self._opened and self._description.has_server(server_description.address): self._process_change(server_description, reset_pool, interrupt_connections) # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close. - if self._opened and self._description.has_server(server_description.address) and reset_pool: + if reset_pool: server = self._servers.get(server_description.address) if server: server.pool.reset(interrupt_connections=interrupt_connections) From f5166be2df67e4ea46e1ef39e0564df073cb7feb Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 09:38:43 -0400 Subject: [PATCH 04/10] Add test --- test/__init__.py | 8 +++ test/asynchronous/__init__.py | 8 +++ .../test_discovery_and_monitoring.py | 69 +++++++++++++++++++ test/test_discovery_and_monitoring.py | 67 ++++++++++++++++++ tools/synchro.py | 11 ++- 5 files changed, 162 insertions(+), 1 deletion(-) diff --git a/test/__init__.py b/test/__init__.py index 7ae3432062..39b4045e66 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -826,6 +826,14 @@ def require_sync(self, func): lambda: _IS_SYNC, "This test only works with the synchronous API", func=func ) + def require_async(self, func): + """Run a test only if using the asynchronous API.""" # unasync: off + return self._require( + lambda: not _IS_SYNC, + "This test only works with the asynchronous API", # unasync: off + func=func, + ) + def mongos_seeds(self): return ",".join("{}:{}".format(*address) for address in self.mongoses) diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index c57bf2a880..882cb6110f 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -828,6 +828,14 @@ def require_sync(self, func): lambda: _IS_SYNC, "This test only works with the synchronous API", func=func ) + def require_async(self, func): + """Run a test only if using the asynchronous API.""" # unasync: off + return self._require( + lambda: not _IS_SYNC, + "This test only works with the asynchronous API", # unasync: off + func=func, + ) + def mongos_seeds(self): return ",".join("{}:{}".format(*address) for address in self.mongoses) diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index fa62b25dd1..da547a0541 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -20,10 +20,15 @@ import socketserver import sys import threading +import time from asyncio import StreamReader, StreamWriter from pathlib import Path from test.asynchronous.helpers import ConcurrentRunner +from pymongo.asynchronous.pool import AsyncConnection +from pymongo.operations import _Op +from pymongo.server_selectors import readable_server_selector + sys.path[0:0] = [""] from test.asynchronous import ( @@ -46,6 +51,7 @@ async_barrier_wait, async_create_barrier, async_wait_until, + delay, server_name_to_type, ) from unittest.mock import patch @@ -370,6 +376,69 @@ async def test_pool_unpause(self): await listener.async_wait_for_event(monitoring.ServerHeartbeatSucceededEvent, 1) await listener.async_wait_for_event(monitoring.PoolReadyEvent, 1) + @async_client_context.require_failCommand_appName + @async_client_context.require_test_commands + @async_client_context.require_async + async def test_connection_close_does_not_block_other_operations(self): + listener = CMAPHeartbeatListener() + client = await self.async_single_client( + appName="SDAMConnectionCloseTest", + event_listeners=[listener], + heartbeatFrequencyMS=500, + minPoolSize=10, + ) + server = await (await client._get_topology()).select_server( + readable_server_selector, _Op.TEST + ) + await async_wait_until( + lambda: len(server._pool.conns) == 10, + "pool initialized with 10 connections", + ) + + await client.db.test.insert_one({"x": 1}) + close_delay = 0.05 + latencies = [] + + async def run_task(): + while True: + start_time = time.monotonic() + await client.db.test.find_one({}) + elapsed = time.monotonic() - start_time + latencies.append(elapsed) + if elapsed >= close_delay: + break + await asyncio.sleep(0.001) + + task = ConcurrentRunner(target=run_task) + await task.start() + original_close = AsyncConnection.close_conn + try: + # Artificially delay the close operation to simulate a slow close + async def mock_close(self, reason): + await asyncio.sleep(close_delay) + await original_close(self, reason) + + AsyncConnection.close_conn = mock_close + + fail_hello = { + "mode": {"times": 4}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "errorCode": 91, + "appName": "SDAMConnectionCloseTest", + }, + } + async with self.fail_point(fail_hello): + # Wait for server heartbeat to fail + await listener.async_wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) + # Wait until all idle connections are closed to simulate real-world conditions + await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10) + # No operation latency should not significantly exceed close_delay + self.assertLessEqual(max(latencies), close_delay * 1.5) + finally: + AsyncConnection.close_conn = original_close + await task.join() + class TestServerMonitoringMode(AsyncIntegrationTest): @async_client_context.require_no_serverless diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 07720473ca..ff164b3a83 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -20,10 +20,15 @@ import socketserver import sys import threading +import time from asyncio import StreamReader, StreamWriter from pathlib import Path from test.helpers import ConcurrentRunner +from pymongo.operations import _Op +from pymongo.server_selectors import readable_server_selector +from pymongo.synchronous.pool import Connection + sys.path[0:0] = [""] from test import ( @@ -45,6 +50,7 @@ assertion_context, barrier_wait, create_barrier, + delay, server_name_to_type, wait_until, ) @@ -370,6 +376,67 @@ def test_pool_unpause(self): listener.wait_for_event(monitoring.ServerHeartbeatSucceededEvent, 1) listener.wait_for_event(monitoring.PoolReadyEvent, 1) + @client_context.require_failCommand_appName + @client_context.require_test_commands + @client_context.require_async + def test_connection_close_does_not_block_other_operations(self): + listener = CMAPHeartbeatListener() + client = self.single_client( + appName="SDAMConnectionCloseTest", + event_listeners=[listener], + heartbeatFrequencyMS=500, + minPoolSize=10, + ) + server = (client._get_topology()).select_server(readable_server_selector, _Op.TEST) + wait_until( + lambda: len(server._pool.conns) == 10, + "pool initialized with 10 connections", + ) + + client.db.test.insert_one({"x": 1}) + close_delay = 0.05 + latencies = [] + + def run_task(): + while True: + start_time = time.monotonic() + client.db.test.find_one({}) + elapsed = time.monotonic() - start_time + latencies.append(elapsed) + if elapsed >= close_delay: + break + time.sleep(0.001) + + task = ConcurrentRunner(target=run_task) + task.start() + original_close = Connection.close_conn + try: + # Artificially delay the close operation to simulate a slow close + def mock_close(self, reason): + time.sleep(close_delay) + original_close(self, reason) + + Connection.close_conn = mock_close + + fail_hello = { + "mode": {"times": 4}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "errorCode": 91, + "appName": "SDAMConnectionCloseTest", + }, + } + with self.fail_point(fail_hello): + # Wait for server heartbeat to fail + listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) + # Wait until all idle connections are closed to simulate real-world conditions + listener.wait_for_event(monitoring.ConnectionClosedEvent, 10) + # No operation latency should not significantly exceed close_delay + self.assertLessEqual(max(latencies), close_delay * 1.5) + finally: + Connection.close_conn = original_close + task.join() + class TestServerMonitoringMode(IntegrationTest): @client_context.require_no_serverless diff --git a/tools/synchro.py b/tools/synchro.py index f6176e2038..bfe8f71125 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -288,7 +288,8 @@ def process_files( if file in docstring_translate_files: lines = translate_docstrings(lines) if file in sync_test_files: - translate_imports(lines) + lines = translate_imports(lines) + lines = process_ignores(lines) f.seek(0) f.writelines(lines) f.truncate() @@ -390,6 +391,14 @@ def translate_docstrings(lines: list[str]) -> list[str]: return [line for line in lines if line != "DOCSTRING_REMOVED"] +def process_ignores(lines: list[str]) -> list[str]: + for i in range(len(lines)): + for k, v in replacements.items(): + if "unasync: off" in lines[i] and v in lines[i]: + lines[i] = lines[i].replace(v, k) + return lines + + def unasync_directory(files: list[str], src: str, dest: str, replacements: dict[str, str]) -> None: unasync_files( files, From 1b34973d7afc0bab74c81ede079fd5a15b64388c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 10:43:41 -0400 Subject: [PATCH 05/10] Removed unused import --- test/asynchronous/test_discovery_and_monitoring.py | 1 - test/test_discovery_and_monitoring.py | 1 - 2 files changed, 2 deletions(-) diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index da547a0541..d57e02aaee 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -51,7 +51,6 @@ async_barrier_wait, async_create_barrier, async_wait_until, - delay, server_name_to_type, ) from unittest.mock import patch diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index ff164b3a83..b957eaf350 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -50,7 +50,6 @@ assertion_context, barrier_wait, create_barrier, - delay, server_name_to_type, wait_until, ) From c612c58e52b1cfb192500fdba5792176ff54a0f2 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 12:11:07 -0400 Subject: [PATCH 06/10] Increase test threshold to account for variance --- test/asynchronous/test_discovery_and_monitoring.py | 2 +- test/test_discovery_and_monitoring.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index d57e02aaee..9be412071b 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -433,7 +433,7 @@ async def mock_close(self, reason): # Wait until all idle connections are closed to simulate real-world conditions await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10) # No operation latency should not significantly exceed close_delay - self.assertLessEqual(max(latencies), close_delay * 1.5) + self.assertLessEqual(max(latencies), close_delay * 2.0) finally: AsyncConnection.close_conn = original_close await task.join() diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index b957eaf350..7c2a425033 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -431,7 +431,7 @@ def mock_close(self, reason): # Wait until all idle connections are closed to simulate real-world conditions listener.wait_for_event(monitoring.ConnectionClosedEvent, 10) # No operation latency should not significantly exceed close_delay - self.assertLessEqual(max(latencies), close_delay * 1.5) + self.assertLessEqual(max(latencies), close_delay * 2.0) finally: Connection.close_conn = original_close task.join() From c5bf1ab25a6ae2f0fc94b770a25024004a5a26cb Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 12:23:08 -0400 Subject: [PATCH 07/10] Disable test_continuous_network_errors with debug logs --- test/asynchronous/test_client.py | 1 + test/test_client.py | 1 + 2 files changed, 2 insertions(+) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index b9deb985bd..fc4628f7f9 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -1907,6 +1907,7 @@ async def test_direct_connection(self): AsyncMongoClient(["host1", "host2"], directConnection=True) @unittest.skipIf("PyPy" in sys.version, "PYTHON-2927 fails often on PyPy") + @skipIf(os.environ.get("DEBUG_LOG"), "Enabling debug logs breaks this test") async def test_continuous_network_errors(self): def server_description_count(): i = 0 diff --git a/test/test_client.py b/test/test_client.py index c2df8ab2b6..03c158b99a 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1864,6 +1864,7 @@ def test_direct_connection(self): MongoClient(["host1", "host2"], directConnection=True) @unittest.skipIf("PyPy" in sys.version, "PYTHON-2927 fails often on PyPy") + @skipIf(os.environ.get("DEBUG_LOG"), "Enabling debug logs breaks this test") def test_continuous_network_errors(self): def server_description_count(): i = 0 From 10406de48ed4f59f9e7fc43a9015eefaaa5da758 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 14:11:37 -0400 Subject: [PATCH 08/10] Address review --- test/asynchronous/test_discovery_and_monitoring.py | 12 +++++++----- test/test_discovery_and_monitoring.py | 12 +++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index 9be412071b..8c530b4ab6 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -27,7 +27,7 @@ from pymongo.asynchronous.pool import AsyncConnection from pymongo.operations import _Op -from pymongo.server_selectors import readable_server_selector +from pymongo.server_selectors import writable_server_selector sys.path[0:0] = [""] @@ -387,7 +387,7 @@ async def test_connection_close_does_not_block_other_operations(self): minPoolSize=10, ) server = await (await client._get_topology()).select_server( - readable_server_selector, _Op.TEST + writable_server_selector, _Op.TEST ) await async_wait_until( lambda: len(server._pool.conns) == 10, @@ -395,8 +395,9 @@ async def test_connection_close_does_not_block_other_operations(self): ) await client.db.test.insert_one({"x": 1}) - close_delay = 0.05 + close_delay = 0.1 latencies = [] + should_exit = [] async def run_task(): while True: @@ -404,7 +405,7 @@ async def run_task(): await client.db.test.find_one({}) elapsed = time.monotonic() - start_time latencies.append(elapsed) - if elapsed >= close_delay: + if should_exit: break await asyncio.sleep(0.001) @@ -432,8 +433,9 @@ async def mock_close(self, reason): await listener.async_wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) # Wait until all idle connections are closed to simulate real-world conditions await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10) + should_exit.append(True) # No operation latency should not significantly exceed close_delay - self.assertLessEqual(max(latencies), close_delay * 2.0) + self.assertLessEqual(max(latencies), close_delay * 5.0) finally: AsyncConnection.close_conn = original_close await task.join() diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 7c2a425033..03453706aa 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -26,7 +26,7 @@ from test.helpers import ConcurrentRunner from pymongo.operations import _Op -from pymongo.server_selectors import readable_server_selector +from pymongo.server_selectors import writable_server_selector from pymongo.synchronous.pool import Connection sys.path[0:0] = [""] @@ -386,15 +386,16 @@ def test_connection_close_does_not_block_other_operations(self): heartbeatFrequencyMS=500, minPoolSize=10, ) - server = (client._get_topology()).select_server(readable_server_selector, _Op.TEST) + server = (client._get_topology()).select_server(writable_server_selector, _Op.TEST) wait_until( lambda: len(server._pool.conns) == 10, "pool initialized with 10 connections", ) client.db.test.insert_one({"x": 1}) - close_delay = 0.05 + close_delay = 0.1 latencies = [] + should_exit = [] def run_task(): while True: @@ -402,7 +403,7 @@ def run_task(): client.db.test.find_one({}) elapsed = time.monotonic() - start_time latencies.append(elapsed) - if elapsed >= close_delay: + if should_exit: break time.sleep(0.001) @@ -430,8 +431,9 @@ def mock_close(self, reason): listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) # Wait until all idle connections are closed to simulate real-world conditions listener.wait_for_event(monitoring.ConnectionClosedEvent, 10) + should_exit.append(True) # No operation latency should not significantly exceed close_delay - self.assertLessEqual(max(latencies), close_delay * 2.0) + self.assertLessEqual(max(latencies), close_delay * 5.0) finally: Connection.close_conn = original_close task.join() From 37be1665102cea1a612fb0915650afb6632e8d7e Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 14:45:34 -0400 Subject: [PATCH 09/10] Address review --- test/asynchronous/test_client.py | 1 - test/asynchronous/test_discovery_and_monitoring.py | 5 ++++- test/test_client.py | 1 - test/test_discovery_and_monitoring.py | 5 ++++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index fc4628f7f9..b9deb985bd 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -1907,7 +1907,6 @@ async def test_direct_connection(self): AsyncMongoClient(["host1", "host2"], directConnection=True) @unittest.skipIf("PyPy" in sys.version, "PYTHON-2927 fails often on PyPy") - @skipIf(os.environ.get("DEBUG_LOG"), "Enabling debug logs breaks this test") async def test_continuous_network_errors(self): def server_description_count(): i = 0 diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index 8c530b4ab6..cf26faf248 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -433,12 +433,15 @@ async def mock_close(self, reason): await listener.async_wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) # Wait until all idle connections are closed to simulate real-world conditions await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10) + # Wait for one more find to complete after the pool has been reset, then shutdown the task + n = len(latencies) + await async_wait_until(lambda: len(latencies) >= n + 1, "run one more find") should_exit.append(True) + await task.join() # No operation latency should not significantly exceed close_delay self.assertLessEqual(max(latencies), close_delay * 5.0) finally: AsyncConnection.close_conn = original_close - await task.join() class TestServerMonitoringMode(AsyncIntegrationTest): diff --git a/test/test_client.py b/test/test_client.py index 03c158b99a..c2df8ab2b6 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1864,7 +1864,6 @@ def test_direct_connection(self): MongoClient(["host1", "host2"], directConnection=True) @unittest.skipIf("PyPy" in sys.version, "PYTHON-2927 fails often on PyPy") - @skipIf(os.environ.get("DEBUG_LOG"), "Enabling debug logs breaks this test") def test_continuous_network_errors(self): def server_description_count(): i = 0 diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 03453706aa..9d6c945707 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -431,12 +431,15 @@ def mock_close(self, reason): listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) # Wait until all idle connections are closed to simulate real-world conditions listener.wait_for_event(monitoring.ConnectionClosedEvent, 10) + # Wait for one more find to complete after the pool has been reset, then shutdown the task + n = len(latencies) + wait_until(lambda: len(latencies) >= n + 1, "run one more find") should_exit.append(True) + task.join() # No operation latency should not significantly exceed close_delay self.assertLessEqual(max(latencies), close_delay * 5.0) finally: Connection.close_conn = original_close - task.join() class TestServerMonitoringMode(IntegrationTest): From d94219cc953e3c9e294583b4f9ead819995509aa Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 15:17:27 -0400 Subject: [PATCH 10/10] Update changelog --- doc/changelog.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/changelog.rst b/doc/changelog.rst index 4fff06c9cb..46e7364f53 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -16,6 +16,8 @@ Version 4.12.1 is a bug fix release. errors such as: "NotImplementedError: Database objects do not implement truth value testing or bool()". - Removed Eventlet testing against Python versions newer than 3.9 since Eventlet is actively being sunset by its maintainers and has compatibility issues with PyMongo's dnspython dependency. +- Fixed a bug where MongoDB cluster topology changes could cause asynchronous operations to take much longer to complete + due to holding the Topology lock while closing stale connections. Issues Resolved ...............