Skip to content

Commit 2ac2da0

Browse files
committed
PYTHON-2672 SDAM, CMAP, and server selection changes for load balancers (#621)
Disable SRV Polling, SDAM compatibility check, logicalSessionTimeoutMinutes check. server session pool pruning, server selection, and server monitoring. A ServerType of LoadBalancer MUST be considered a data-bearing server. "drivers MUST emit the following series of SDAM events" section. Send loadBalanced:True with handshakes, validate serviceId. Add topologyVersion fallback when serviceId is missing. Don't mark load balancers unknown. (cherry picked from commit 5bf15c8)
1 parent 74b4061 commit 2ac2da0

10 files changed

+113
-50
lines changed

pymongo/client_options.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def _parse_pool_options(options):
130130
options.get('compressors', []),
131131
options.get('zlibcompressionlevel', -1))
132132
ssl_context, ssl_match_hostname = _parse_ssl_options(options)
133+
load_balanced = options.get('loadbalanced')
133134
return PoolOptions(max_pool_size,
134135
min_pool_size,
135136
max_idle_time_seconds,
@@ -140,7 +141,8 @@ def _parse_pool_options(options):
140141
appname,
141142
driver,
142143
compression_settings,
143-
server_api=server_api)
144+
server_api=server_api,
145+
load_balanced=load_balanced)
144146

145147

146148
class ClientOptions(object):

pymongo/ismaster.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ def _get_server_type(doc):
2727
if not doc.get('ok'):
2828
return SERVER_TYPE.Unknown
2929

30-
if doc.get('isreplicaset'):
30+
if doc.get('serviceId'):
31+
return SERVER_TYPE.LoadBalancer
32+
elif doc.get('isreplicaset'):
3133
return SERVER_TYPE.RSGhost
3234
elif doc.get('setName'):
3335
if doc.get('hidden'):
@@ -59,7 +61,8 @@ def __init__(self, doc, awaitable=False):
5961
self._is_writable = self._server_type in (
6062
SERVER_TYPE.RSPrimary,
6163
SERVER_TYPE.Standalone,
62-
SERVER_TYPE.Mongos)
64+
SERVER_TYPE.Mongos,
65+
SERVER_TYPE.LoadBalancer)
6366

6467
self._is_readable = (
6568
self.server_type == SERVER_TYPE.RSSecondary
@@ -186,3 +189,7 @@ def topology_version(self):
186189
@property
187190
def awaitable(self):
188191
return self._awaitable
192+
193+
@property
194+
def service_id(self):
195+
return self._doc.get('serviceId')

pymongo/mongo_client.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1025,7 +1025,8 @@ def address(self):
10251025
'Cannot use "address" property when load balancing among'
10261026
' mongoses, use "nodes" instead.')
10271027
if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
1028-
TOPOLOGY_TYPE.Single):
1028+
TOPOLOGY_TYPE.Single,
1029+
TOPOLOGY_TYPE.LoadBalanced):
10291030
return None
10301031
return self._server_property('address')
10311032

pymongo/pool.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ class PoolOptions(object):
296296
'__wait_queue_timeout', '__wait_queue_multiple',
297297
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
298298
'__event_listeners', '__appname', '__driver', '__metadata',
299-
'__compression_settings', '__server_api')
299+
'__compression_settings', '__server_api', '__load_balanced')
300300

301301
def __init__(self, max_pool_size=MAX_POOL_SIZE,
302302
min_pool_size=MIN_POOL_SIZE,
@@ -305,7 +305,8 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
305305
wait_queue_multiple=None, ssl_context=None,
306306
ssl_match_hostname=True, socket_keepalive=True,
307307
event_listeners=None, appname=None, driver=None,
308-
compression_settings=None, server_api=None):
308+
compression_settings=None, server_api=None,
309+
load_balanced=None):
309310
self.__max_pool_size = max_pool_size
310311
self.__min_pool_size = min_pool_size
311312
self.__max_idle_time_seconds = max_idle_time_seconds
@@ -321,6 +322,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
321322
self.__driver = driver
322323
self.__compression_settings = compression_settings
323324
self.__server_api = server_api
325+
self.__load_balanced = load_balanced
324326
self.__metadata = copy.deepcopy(_METADATA)
325327
if appname:
326328
self.__metadata['application'] = {'name': appname}
@@ -470,6 +472,12 @@ def server_api(self):
470472
"""
471473
return self.__server_api
472474

475+
@property
476+
def load_balanced(self):
477+
"""True if this Pool is configured in load balanced mode.
478+
"""
479+
return self.__load_balanced
480+
473481

474482
def _negotiate_creds(all_credentials):
475483
"""Return one credential that needs mechanism negotiation, if any.
@@ -549,6 +557,8 @@ def __init__(self, sock, pool, address, id):
549557
self.cancel_context = _CancellationContext()
550558
self.opts = pool.opts
551559
self.more_to_come = False
560+
# For load balancer support.
561+
self.service_id = None
552562

553563
def hello_cmd(self):
554564
if self.opts.server_api:
@@ -569,6 +579,8 @@ def _ismaster(self, cluster_time, topology_version,
569579
cmd['client'] = self.opts.metadata
570580
if self.compression_settings:
571581
cmd['compression'] = self.compression_settings.compressors
582+
if self.opts.load_balanced:
583+
cmd['loadBalanced'] = True
572584
elif topology_version is not None:
573585
cmd['topologyVersion'] = topology_version
574586
cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000)
@@ -592,6 +604,10 @@ def _ismaster(self, cluster_time, topology_version,
592604

593605
doc = self.command('admin', cmd, publish_events=False,
594606
exhaust_allowed=awaitable)
607+
# PYTHON-2712 will remove this topologyVersion fallback logic.
608+
if self.opts.load_balanced:
609+
process_id = doc.get('topologyVersion', {}).get('processId')
610+
doc.setdefault('serviceId', process_id)
595611
ismaster = IsMaster(doc, awaitable=awaitable)
596612
self.is_writable = ismaster.is_writable
597613
self.max_wire_version = ismaster.max_wire_version
@@ -613,6 +629,12 @@ def _ismaster(self, cluster_time, topology_version,
613629
auth_ctx.parse_response(ismaster)
614630
if auth_ctx.speculate_succeeded():
615631
self.auth_ctx[auth_ctx.credentials] = auth_ctx
632+
if self.opts.load_balanced:
633+
if not ismaster.service_id:
634+
raise ConfigurationError(
635+
'Driver attempted to initialize in load balancing mode'
636+
' but the server does not support this mode')
637+
self.service_id = ismaster.service_id
616638
return ismaster
617639

618640
def _next_reply(self):

pymongo/server.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ def open(self):
4646
4747
Multiple calls have no effect.
4848
"""
49-
self._monitor.open()
49+
if not self._pool.opts.load_balanced:
50+
self._monitor.open()
5051

5152
def reset(self):
5253
"""Clear the connection pool."""

pymongo/server_description.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ def retryable_writes_supported(self):
205205
"""Checks if this server supports retryable writes."""
206206
return (
207207
self._ls_timeout_minutes is not None and
208-
self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary))
208+
self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary,
209+
SERVER_TYPE.LoadBalancer))
209210

210211
@property
211212
def retryable_reads_supported(self):

pymongo/server_type.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@
2020
SERVER_TYPE = namedtuple('ServerType',
2121
['Unknown', 'Mongos', 'RSPrimary', 'RSSecondary',
2222
'RSArbiter', 'RSOther', 'RSGhost',
23-
'Standalone'])(*range(8))
23+
'Standalone', 'LoadBalancer'])(*range(9))

pymongo/settings.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ def load_balanced(self):
132132
return self._load_balanced
133133

134134
def get_topology_type(self):
135-
if self.direct:
135+
if self.load_balanced:
136+
return TOPOLOGY_TYPE.LoadBalanced
137+
elif self.direct:
136138
return TOPOLOGY_TYPE.Single
137139
elif self.replica_set_name is not None:
138140
return TOPOLOGY_TYPE.ReplicaSetNoPrimary

pymongo/topology.py

+39-18
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
OperationFailure,
4242
ServerSelectionTimeoutError,
4343
WriteError)
44+
from pymongo.ismaster import IsMaster
4445
from pymongo.monitor import SrvMonitor
4546
from pymongo.monotonic import time as _time
4647
from pymongo.server import Server
@@ -140,7 +141,8 @@ def target():
140141
executor.open()
141142

142143
self._srv_monitor = None
143-
if self._settings.fqdn is not None:
144+
if (self._settings.fqdn is not None and
145+
not self._settings.load_balanced):
144146
self._srv_monitor = SrvMonitor(self, self._settings)
145147

146148
def open(self):
@@ -486,29 +488,38 @@ def pop_all_sessions(self):
486488
with self._lock:
487489
return self._session_pool.pop_all()
488490

489-
def get_server_session(self):
490-
"""Start or resume a server session, or raise ConfigurationError."""
491-
with self._lock:
492-
session_timeout = self._description.logical_session_timeout_minutes
493-
if session_timeout is None:
494-
# Maybe we need an initial scan? Can raise ServerSelectionError.
495-
if self._description.topology_type == TOPOLOGY_TYPE.Single:
496-
if not self._description.has_known_servers:
497-
self._select_servers_loop(
498-
any_server_selector,
499-
self._settings.server_selection_timeout,
500-
None)
501-
elif not self._description.readable_servers:
491+
def _check_session_support(self):
492+
"""Internal check for session support on non-load balanced clusters."""
493+
session_timeout = self._description.logical_session_timeout_minutes
494+
if session_timeout is None:
495+
# Maybe we need an initial scan? Can raise ServerSelectionError.
496+
if self._description.topology_type == TOPOLOGY_TYPE.Single:
497+
if not self._description.has_known_servers:
502498
self._select_servers_loop(
503-
readable_server_selector,
499+
any_server_selector,
504500
self._settings.server_selection_timeout,
505501
None)
502+
elif not self._description.readable_servers:
503+
self._select_servers_loop(
504+
readable_server_selector,
505+
self._settings.server_selection_timeout,
506+
None)
506507

507508
session_timeout = self._description.logical_session_timeout_minutes
508509
if session_timeout is None:
509510
raise ConfigurationError(
510511
"Sessions are not supported by this MongoDB deployment")
512+
return session_timeout
511513

514+
def get_server_session(self):
515+
"""Start or resume a server session, or raise ConfigurationError."""
516+
with self._lock:
517+
# Sessions are always supported in load balanced mode.
518+
if not self._settings.load_balanced:
519+
session_timeout = self._check_session_support()
520+
else:
521+
# Sessions never time out in load balanced mode.
522+
session_timeout = float('inf')
512523
return self._session_pool.get_server_session(session_timeout)
513524

514525
def return_server_session(self, server_session, lock):
@@ -548,6 +559,12 @@ def _ensure_opened(self):
548559
SRV_POLLING_TOPOLOGIES):
549560
self._srv_monitor.open()
550561

562+
if self._settings.load_balanced:
563+
# Emit initial SDAM events for load balancer mode.
564+
self._process_change(ServerDescription(
565+
self._seed_addresses[0],
566+
IsMaster({'ok': 1, 'serviceId': self._topology_id})))
567+
551568
# Ensure that the monitors are open.
552569
for server in itervalues(self._servers):
553570
server.open()
@@ -601,15 +618,17 @@ def _handle_error(self, address, err_ctx):
601618
err_code = error.details.get('code', -1)
602619
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
603620
# Mark server Unknown, clear the pool, and request check.
604-
self._process_change(ServerDescription(address, error=error))
621+
if not self._settings.load_balanced:
622+
self._process_change(ServerDescription(address, error=error))
605623
if is_shutting_down or (err_ctx.max_wire_version <= 7):
606624
# Clear the pool.
607625
server.reset()
608626
server.request_check()
609627
elif issubclass(exc_type, ConnectionFailure):
610628
# "Client MUST replace the server's description with type Unknown
611629
# ... MUST NOT request an immediate check of the server."
612-
self._process_change(ServerDescription(address, error=error))
630+
if not self._settings.load_balanced:
631+
self._process_change(ServerDescription(address, error=error))
613632
# Clear the pool.
614633
server.reset()
615634
# "When a client marks a server Unknown from `Network error when
@@ -620,7 +639,9 @@ def _handle_error(self, address, err_ctx):
620639
# Do not request an immediate check since the server is likely
621640
# shutting down.
622641
if error.code in helpers._NOT_MASTER_CODES:
623-
self._process_change(ServerDescription(address, error=error))
642+
if not self._settings.load_balanced:
643+
self._process_change(
644+
ServerDescription(address, error=error))
624645
# Clear the pool.
625646
server.reset()
626647

pymongo/topology_description.py

+28-22
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626

2727
# Enumeration for various kinds of MongoDB cluster topologies.
28-
TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
29-
'ReplicaSetWithPrimary', 'Sharded',
30-
'Unknown'])(*range(5))
28+
TOPOLOGY_TYPE = namedtuple('TopologyType', [
29+
'Single', 'ReplicaSetNoPrimary', 'ReplicaSetWithPrimary', 'Sharded',
30+
'Unknown', 'LoadBalanced'])(*range(6))
3131

3232
# Topologies compatible with SRV record polling.
3333
SRV_POLLING_TOPOLOGIES = (TOPOLOGY_TYPE.Unknown, TOPOLOGY_TYPE.Sharded)
@@ -63,7 +63,28 @@ def __init__(self,
6363

6464
# Is PyMongo compatible with all servers' wire protocols?
6565
self._incompatible_err = None
66+
if self._topology_type != TOPOLOGY_TYPE.LoadBalanced:
67+
self._init_incompatible_err()
6668

69+
# Server Discovery And Monitoring Spec: Whenever a client updates the
70+
# TopologyDescription from an ismaster response, it MUST set
71+
# TopologyDescription.logicalSessionTimeoutMinutes to the smallest
72+
# logicalSessionTimeoutMinutes value among ServerDescriptions of all
73+
# data-bearing server types. If any have a null
74+
# logicalSessionTimeoutMinutes, then
75+
# TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
76+
readable_servers = self.readable_servers
77+
if not readable_servers:
78+
self._ls_timeout_minutes = None
79+
elif any(s.logical_session_timeout_minutes is None
80+
for s in readable_servers):
81+
self._ls_timeout_minutes = None
82+
else:
83+
self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
84+
for s in readable_servers)
85+
86+
def _init_incompatible_err(self):
87+
"""Internal compatibility check for non-load balanced topologies."""
6788
for s in self._server_descriptions.values():
6889
if not s.is_server_type_known:
6990
continue
@@ -98,23 +119,6 @@ def __init__(self,
98119

99120
break
100121

101-
# Server Discovery And Monitoring Spec: Whenever a client updates the
102-
# TopologyDescription from an ismaster response, it MUST set
103-
# TopologyDescription.logicalSessionTimeoutMinutes to the smallest
104-
# logicalSessionTimeoutMinutes value among ServerDescriptions of all
105-
# data-bearing server types. If any have a null
106-
# logicalSessionTimeoutMinutes, then
107-
# TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
108-
readable_servers = self.readable_servers
109-
if not readable_servers:
110-
self._ls_timeout_minutes = None
111-
elif any(s.logical_session_timeout_minutes is None
112-
for s in readable_servers):
113-
self._ls_timeout_minutes = None
114-
else:
115-
self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
116-
for s in readable_servers)
117-
118122
def check_compatible(self):
119123
"""Raise ConfigurationError if any server is incompatible.
120124
@@ -243,8 +247,9 @@ def apply_local_threshold(selection):
243247
selector.min_wire_version,
244248
common_wv))
245249

246-
if self.topology_type == TOPOLOGY_TYPE.Single:
247-
# Ignore selectors for standalone.
250+
if self.topology_type in (TOPOLOGY_TYPE.Single,
251+
TOPOLOGY_TYPE.LoadBalanced):
252+
# Ignore selectors for standalone and load balancer mode.
248253
return self.known_servers
249254
elif address:
250255
# Ignore selectors when explicit address is requested.
@@ -306,6 +311,7 @@ def __repr__(self):
306311
SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
307312
SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
308313
SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
314+
# Note: SERVER_TYPE.LoadBalancer and Unknown are intentionally left out.
309315
}
310316

311317

0 commit comments

Comments
 (0)