Skip to content

Commit ed4204f

Browse files
committed
PYTHON-1954 Stop holding the topology lock while creating new connections
1 parent 228f716 commit ed4204f

9 files changed

+78
-164
lines changed

pymongo/pool.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -1033,14 +1033,19 @@ def reset(self):
10331033
def close(self):
10341034
self._reset(close=True)
10351035

1036-
def remove_stale_sockets(self):
1037-
"""Removes stale sockets then adds new ones if pool is too small."""
1036+
def remove_stale_sockets(self, reference_pool_id):
1037+
"""Removes stale sockets then adds new ones if pool is too small and
1038+
has not been reset. The `reference_pool_id` argument specifies the
1039+
`pool_id` at the point in time this operation was requested on the
1040+
pool.
1041+
"""
10381042
if self.opts.max_idle_time_seconds is not None:
10391043
with self.lock:
10401044
while (self.sockets and
10411045
self.sockets[-1].idle_time_seconds() > self.opts.max_idle_time_seconds):
10421046
sock_info = self.sockets.pop()
10431047
sock_info.close_socket(ConnectionClosedReason.IDLE)
1048+
10441049
while True:
10451050
with self.lock:
10461051
if (len(self.sockets) + self.active_sockets >=
@@ -1054,6 +1059,11 @@ def remove_stale_sockets(self):
10541059
try:
10551060
sock_info = self.connect()
10561061
with self.lock:
1062+
# Close connection and return if the pool was reset during
1063+
# socket creation or while acquiring the pool lock.
1064+
if self.pool_id != reference_pool_id:
1065+
sock_info.close_socket()
1066+
break
10571067
self.sockets.appendleft(sock_info)
10581068
finally:
10591069
self._socket_semaphore.release()

pymongo/topology.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,13 @@ def mark_server_unknown_and_request_check(self, address):
429429

430430
def update_pool(self):
431431
# Remove any stale sockets and add new sockets if pool is too small.
432+
servers = []
432433
with self._lock:
433434
for server in self._servers.values():
434-
server._pool.remove_stale_sockets()
435+
servers.append((server, server._pool.pool_id))
436+
437+
for server, pool_id in servers:
438+
server._pool.remove_stale_sockets(pool_id)
435439

436440
def close(self):
437441
"""Clear pools and terminate monitors. Topology reopens on demand."""

test/test_command_monitoring_spec.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
from pymongo.errors import OperationFailure
2727
from pymongo.write_concern import WriteConcern
2828
from test import unittest, client_context
29-
from test.utils import single_client, wait_until, EventListener
30-
from test.utils_selection_tests import parse_read_preference
29+
from test.utils import single_client, wait_until, EventListener, parse_read_preference
3130

3231
# Location of JSON test specifications.
3332
_TEST_PATH = os.path.join(

test/test_discovery_and_monitoring.py

+2-31
Original file line numberDiff line numberDiff line change
@@ -30,43 +30,14 @@
3030
from pymongo.settings import TopologySettings
3131
from pymongo.uri_parser import parse_uri
3232
from test import unittest
33+
from test.utils import MockPool
3334

3435

3536
# Location of JSON test specifications.
3637
_TEST_PATH = os.path.join(
3738
os.path.dirname(os.path.realpath(__file__)), 'discovery_and_monitoring')
3839

3940

40-
class MockSocketInfo(object):
41-
def close(self):
42-
pass
43-
44-
def __enter__(self):
45-
return self
46-
47-
def __exit__(self, exc_type, exc_val, exc_tb):
48-
pass
49-
50-
51-
class MockPool(object):
52-
def __init__(self, *args, **kwargs):
53-
self.pool_id = 0
54-
self._lock = threading.Lock()
55-
56-
def _reset(self):
57-
with self._lock:
58-
self.pool_id += 1
59-
60-
def reset(self):
61-
self._reset()
62-
63-
def close(self):
64-
self._reset()
65-
66-
def update_is_writable(self, is_writable):
67-
pass
68-
69-
7041
class MockMonitor(object):
7142
def __init__(self, server_description, topology, pool, topology_settings):
7243
self._server_description = server_description
@@ -81,7 +52,7 @@ def request_check(self):
8152
def close(self):
8253
pass
8354

84-
def remove_stale_sockets(self):
55+
def remove_stale_sockets(self, reference_pool_id):
8556
pass
8657

8758

test/test_heartbeat_monitoring.py

+2-42
Original file line numberDiff line numberDiff line change
@@ -22,49 +22,9 @@
2222
from pymongo.errors import ConnectionFailure
2323
from pymongo.ismaster import IsMaster
2424
from pymongo.monitor import Monitor
25-
from pymongo.pool import PoolOptions
2625
from test import unittest, client_knobs
27-
from test.utils import HeartbeatEventListener, single_client, wait_until
28-
29-
30-
class MockSocketInfo(object):
31-
def close(self):
32-
pass
33-
34-
def __enter__(self):
35-
return self
36-
37-
def __exit__(self, exc_type, exc_val, exc_tb):
38-
pass
39-
40-
41-
class MockPool(object):
42-
def __init__(self, *args, **kwargs):
43-
self.pool_id = 0
44-
self._lock = threading.Lock()
45-
self.opts = PoolOptions()
46-
47-
def get_socket(self, all_credentials):
48-
return MockSocketInfo()
49-
50-
def return_socket(self, *args, **kwargs):
51-
pass
52-
53-
def _reset(self):
54-
with self._lock:
55-
self.pool_id += 1
56-
57-
def reset(self):
58-
self._reset()
59-
60-
def close(self):
61-
self._reset()
62-
63-
def update_is_writable(self, is_writable):
64-
pass
65-
66-
def remove_stale_sockets(self):
67-
pass
26+
from test.utils import (HeartbeatEventListener, MockPool, single_client,
27+
wait_until)
6828

6929

7030
class TestHeartbeatMonitoring(unittest.TestCase):

test/test_topology.py

+1-41
Original file line numberDiff line numberDiff line change
@@ -37,47 +37,7 @@
3737
writable_server_selector)
3838
from pymongo.settings import TopologySettings
3939
from test import client_knobs, unittest
40-
from test.utils import wait_until
41-
42-
43-
class MockSocketInfo(object):
44-
def close(self):
45-
pass
46-
47-
def __enter__(self):
48-
return self
49-
50-
def __exit__(self, exc_type, exc_val, exc_tb):
51-
pass
52-
53-
54-
class MockPool(object):
55-
def __init__(self, *args, **kwargs):
56-
self.pool_id = 0
57-
self._lock = threading.Lock()
58-
self.opts = PoolOptions()
59-
60-
def get_socket(self, all_credentials):
61-
return MockSocketInfo()
62-
63-
def return_socket(self, *args, **kwargs):
64-
pass
65-
66-
def _reset(self):
67-
with self._lock:
68-
self.pool_id += 1
69-
70-
def reset(self):
71-
self._reset()
72-
73-
def close(self):
74-
self._reset()
75-
76-
def update_is_writable(self, is_writable):
77-
pass
78-
79-
def remove_stale_sockets(self):
80-
pass
40+
from test.utils import MockPool, wait_until
8141

8242

8343
class MockMonitor(object):

test/utils.py

+53-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@
3232
from bson.objectid import ObjectId
3333

3434
from pymongo import (MongoClient,
35-
monitoring)
35+
monitoring, read_preferences)
3636
from pymongo.errors import ConfigurationError, OperationFailure
3737
from pymongo.monitoring import _SENSITIVE_COMMANDS, ConnectionPoolListener
38+
from pymongo.pool import PoolOptions
3839
from pymongo.read_concern import ReadConcern
3940
from pymongo.read_preferences import ReadPreference
4041
from pymongo.server_selectors import (any_server_selector,
@@ -44,8 +45,6 @@
4445
from test import (client_context,
4546
db_user,
4647
db_pwd)
47-
from test.utils_selection_tests import parse_read_preference
48-
4948

5049
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=1000)
5150

@@ -185,6 +184,46 @@ def failed(self, event):
185184
self.results.append(event)
186185

187186

187+
class MockSocketInfo(object):
188+
def close(self):
189+
pass
190+
191+
def __enter__(self):
192+
return self
193+
194+
def __exit__(self, exc_type, exc_val, exc_tb):
195+
pass
196+
197+
198+
class MockPool(object):
199+
def __init__(self, *args, **kwargs):
200+
self.pool_id = 0
201+
self._lock = threading.Lock()
202+
self.opts = PoolOptions()
203+
204+
def get_socket(self, all_credentials):
205+
return MockSocketInfo()
206+
207+
def return_socket(self, *args, **kwargs):
208+
pass
209+
210+
def _reset(self):
211+
with self._lock:
212+
self.pool_id += 1
213+
214+
def reset(self):
215+
self._reset()
216+
217+
def close(self):
218+
self._reset()
219+
220+
def update_is_writable(self, is_writable):
221+
pass
222+
223+
def remove_stale_sockets(self, reference_pool_id):
224+
pass
225+
226+
188227
class ScenarioDict(dict):
189228
"""Dict that returns {} for any unknown key, recursively."""
190229
def __init__(self, data):
@@ -811,3 +850,14 @@ def run(self):
811850
except BaseException as exc:
812851
self.exc = exc
813852
raise
853+
854+
855+
def parse_read_preference(pref):
856+
# Make first letter lowercase to match read_pref's modes.
857+
mode_string = pref.get('mode', 'primary')
858+
mode_string = mode_string[:1].lower() + mode_string[1:]
859+
mode = read_preferences.read_pref_mode_from_name(mode_string)
860+
max_staleness = pref.get('maxStalenessSeconds', -1)
861+
tag_sets = pref.get('tag_sets')
862+
return read_preferences.make_read_preference(
863+
mode, tag_sets=tag_sets, max_staleness=max_staleness)

test/utils_selection_tests.py

+1-40
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
sys.path[0:0] = [""]
2222

2323
from bson import json_util
24-
from pymongo import read_preferences
2524
from pymongo.common import clean_node, HEARTBEAT_FREQUENCY
2625
from pymongo.errors import AutoReconnect, ConfigurationError
2726
from pymongo.ismaster import IsMaster
@@ -30,34 +29,7 @@
3029
from pymongo.server_selectors import writable_server_selector
3130
from pymongo.topology import Topology
3231
from test import unittest
33-
34-
35-
class MockSocketInfo(object):
36-
def close(self):
37-
pass
38-
39-
def __enter__(self):
40-
return self
41-
42-
def __exit__(self, exc_type, exc_val, exc_tb):
43-
pass
44-
45-
46-
class MockPool(object):
47-
def __init__(self, *args, **kwargs):
48-
pass
49-
50-
def reset(self):
51-
pass
52-
53-
def close(self):
54-
pass
55-
56-
def update_is_writable(self, is_writable):
57-
pass
58-
59-
def remove_stale_sockets(self):
60-
pass
32+
from test.utils import MockPool, parse_read_preference
6133

6234

6335
class MockMonitor(object):
@@ -288,14 +260,3 @@ class TestAllScenarios(unittest.TestCase):
288260
setattr(TestAllScenarios, new_test.__name__, new_test)
289261

290262
return TestAllScenarios
291-
292-
293-
def parse_read_preference(pref):
294-
# Make first letter lowercase to match read_pref's modes.
295-
mode_string = pref.get('mode', 'primary')
296-
mode_string = mode_string[:1].lower() + mode_string[1:]
297-
mode = read_preferences.read_pref_mode_from_name(mode_string)
298-
max_staleness = pref.get('maxStalenessSeconds', -1)
299-
tag_sets = pref.get('tag_sets')
300-
return read_preferences.make_read_preference(
301-
mode, tag_sets=tag_sets, max_staleness=max_staleness)

test/utils_spec_runner.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@
4646
camel_to_upper_camel,
4747
CompareType,
4848
OvertCommandListener,
49-
rs_client)
50-
from test.utils_selection_tests import parse_read_preference
49+
rs_client, parse_read_preference)
5150

5251

5352
class SpecRunner(IntegrationTest):

0 commit comments

Comments
 (0)