|
24 | 24 | import struct
|
25 | 25 | import sys
|
26 | 26 | import time
|
| 27 | +import threading |
27 | 28 | import warnings
|
28 | 29 |
|
29 | 30 | sys.path[0:0] = [""]
|
|
51 | 52 | from pymongo.monitoring import (ServerHeartbeatListener,
|
52 | 53 | ServerHeartbeatStartedEvent)
|
53 | 54 | from pymongo.mongo_client import MongoClient
|
| 55 | +from pymongo.monotonic import time as monotonic_time |
54 | 56 | from pymongo.driver_info import DriverInfo
|
55 | 57 | from pymongo.pool import SocketInfo, _METADATA
|
56 | 58 | from pymongo.read_preferences import ReadPreference
|
@@ -1455,6 +1457,75 @@ def compression_settings(client):
|
1455 | 1457 | # No error
|
1456 | 1458 | client.pymongo_test.test.find_one()
|
1457 | 1459 |
|
| 1460 | + def test_reset_during_update_pool(self): |
| 1461 | + client = rs_or_single_client(minPoolSize=10) |
| 1462 | + self.addCleanup(client.close) |
| 1463 | + client.admin.command('ping') |
| 1464 | + pool = get_pool(client) |
| 1465 | + pool_id = pool.pool_id |
| 1466 | + |
| 1467 | + # Continuously reset the pool. |
| 1468 | + class ResetPoolThread(threading.Thread): |
| 1469 | + def __init__(self, pool): |
| 1470 | + super(ResetPoolThread, self).__init__() |
| 1471 | + self.running = True |
| 1472 | + self.pool = pool |
| 1473 | + |
| 1474 | + def stop(self): |
| 1475 | + self.running = False |
| 1476 | + |
| 1477 | + def run(self): |
| 1478 | + while self.running: |
| 1479 | + self.pool.reset() |
| 1480 | + time.sleep(0.001) |
| 1481 | + |
| 1482 | + t = ResetPoolThread(pool) |
| 1483 | + t.start() |
| 1484 | + |
| 1485 | + # Ensure that update_pool completes without error even when the pool |
| 1486 | + # is reset concurrently. |
| 1487 | + try: |
| 1488 | + while True: |
| 1489 | + for _ in range(10): |
| 1490 | + client._topology.update_pool() |
| 1491 | + if pool_id != pool.pool_id: |
| 1492 | + break |
| 1493 | + finally: |
| 1494 | + t.stop() |
| 1495 | + t.join() |
| 1496 | + client.admin.command('ping') |
| 1497 | + |
| 1498 | + def test_background_connections_do_not_hold_locks(self): |
| 1499 | + min_pool_size = 10 |
| 1500 | + client = rs_or_single_client( |
| 1501 | + serverSelectionTimeoutMS=3000, minPoolSize=min_pool_size, |
| 1502 | + connect=False) |
| 1503 | + self.addCleanup(client.close) |
| 1504 | + |
| 1505 | + # Create a single connection in the pool. |
| 1506 | + client.admin.command('ping') |
| 1507 | + |
| 1508 | + # Cause new connections stall for a few seconds. |
| 1509 | + pool = get_pool(client) |
| 1510 | + original_connect = pool.connect |
| 1511 | + |
| 1512 | + def stall_connect(*args, **kwargs): |
| 1513 | + time.sleep(2) |
| 1514 | + return original_connect(*args, **kwargs) |
| 1515 | + |
| 1516 | + pool.connect = stall_connect |
| 1517 | + |
| 1518 | + # Wait for the background thread to start creating connections |
| 1519 | + wait_until(lambda: len(pool.sockets) > 1, 'start creating connections') |
| 1520 | + |
| 1521 | + # Assert that application operations do not block. |
| 1522 | + for _ in range(10): |
| 1523 | + start = monotonic_time() |
| 1524 | + client.admin.command('ping') |
| 1525 | + total = monotonic_time() - start |
| 1526 | + # Each ping command should not take more than 2 seconds |
| 1527 | + self.assertLess(total, 2) |
| 1528 | + |
1458 | 1529 |
|
1459 | 1530 | class TestExhaustCursor(IntegrationTest):
|
1460 | 1531 | """Test that clients properly handle errors from exhaust cursors."""
|
|
0 commit comments