Skip to content

Commit f22bb84

Browse files
Add Sentinel +switch-master linstener option to monitor failovers.
1 parent dfcf08c commit f22bb84

File tree

1 file changed

+134
-11
lines changed

1 file changed

+134
-11
lines changed

redis/sentinel.py

Lines changed: 134 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import random
2+
import time
23
import weakref
4+
from collections import defaultdict
5+
from threading import Event, Lock, Thread
36
from typing import Optional
47

58
from redis.client import Redis
@@ -104,21 +107,18 @@ def __init__(
104107
self.check_connection = check_connection
105108
self.service_name = service_name
106109
self.sentinel_manager = sentinel_manager
110+
111+
self._lock = Lock()
107112
self.reset()
108113

109114
def reset(self):
110-
self.master_address = None
111115
self.slave_rr_counter = None
116+
with self._lock:
117+
self.master_address = None
112118

113119
def get_master_address(self):
114120
master_address = self.sentinel_manager.discover_master(self.service_name)
115-
if self.is_master and self.master_address != master_address:
116-
self.master_address = master_address
117-
# disconnect any idle connections so that they reconnect
118-
# to the new master the next time that they are used.
119-
connection_pool = self.connection_pool_ref()
120-
if connection_pool is not None:
121-
connection_pool.disconnect(inuse_connections=False)
121+
self.update_master_address(master_address)
122122
return master_address
123123

124124
def rotate_slaves(self):
@@ -137,6 +137,23 @@ def rotate_slaves(self):
137137
pass
138138
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
139139

140+
def update_master_address(self, master_address):
141+
if not self.is_master:
142+
return
143+
144+
changed = False
145+
with self._lock:
146+
if self.master_address != master_address:
147+
self.master_address = master_address
148+
changed = True
149+
150+
if changed:
151+
# disconnect any idle connections so that they reconnect
152+
# to the new master the next time that they are used.
153+
connection_pool = self.connection_pool_ref()
154+
if connection_pool is not None:
155+
connection_pool.disconnect(inuse_connections=False)
156+
140157

141158
class SentinelConnectionPool(ConnectionPool):
142159
"""
@@ -224,6 +241,11 @@ class Sentinel(SentinelCommands):
224241
not specified, any socket_timeout and socket_keepalive options specified
225242
in ``connection_kwargs`` will be used.
226243
244+
``m̀onitor_failover`` indicates whether the Sentinel client should monitor
245+
for master failover events. If set to True, the client will subscribe to
246+
Sentinel's "+switch-master" notifications and update any registered
247+
connection pools automatically when a failover occurs.
248+
227249
``connection_kwargs`` are keyword arguments that will be used when
228250
establishing a connection to a Redis server.
229251
"""
@@ -234,6 +256,7 @@ def __init__(
234256
min_other_sentinels=0,
235257
sentinel_kwargs=None,
236258
force_master_ip=None,
259+
monitor_failover: bool = False,
237260
**connection_kwargs,
238261
):
239262
# if sentinel_kwargs isn't defined, use the socket_* options from
@@ -252,6 +275,11 @@ def __init__(
252275
self.connection_kwargs = connection_kwargs
253276
self._force_master_ip = force_master_ip
254277

278+
self._monitor_failover = monitor_failover
279+
self._listener_lock = Lock()
280+
self._switch_master_listener = None
281+
self._proxies_by_service = defaultdict(weakref.WeakSet)
282+
255283
def execute_command(self, *args, **kwargs):
256284
"""
257285
Execute Sentinel command in sentinel nodes.
@@ -355,6 +383,29 @@ def discover_slaves(self, service_name):
355383
return slaves
356384
return []
357385

386+
def close(self):
387+
with self._listener_lock:
388+
if self._switch_master_listener is not None:
389+
self._switch_master_listener.stop()
390+
self._switch_master_listener.join(timeout=2)
391+
self._switch_master_listener = None
392+
393+
def _register_proxy(self, service_name: str, proxy: SentinelConnectionPoolProxy):
394+
self._proxies_by_service[service_name].add(proxy)
395+
if not self._monitor_failover:
396+
return
397+
with self._listener_lock:
398+
if self._switch_master_listener is None:
399+
self._switch_master_listener = _SwitchMasterListener(self)
400+
self._switch_master_listener.start()
401+
402+
def _on_switch_master(self, service_name: str, new_master_address: tuple[str, int]):
403+
proxies = self._proxies_by_service.get(service_name)
404+
if not proxies:
405+
return
406+
for proxy in list(proxies):
407+
proxy.update_master_address(new_master_address)
408+
358409
def master_for(
359410
self,
360411
service_name,
@@ -389,9 +440,11 @@ def master_for(
389440
kwargs["is_master"] = True
390441
connection_kwargs = dict(self.connection_kwargs)
391442
connection_kwargs.update(kwargs)
392-
return redis_class.from_pool(
393-
connection_pool_class(service_name, self, **connection_kwargs)
394-
)
443+
444+
pool = connection_pool_class(service_name, self, **connection_kwargs)
445+
self._register_proxy(service_name, pool.proxy)
446+
447+
return redis_class.from_pool(pool)
395448

396449
def slave_for(
397450
self,
@@ -423,3 +476,73 @@ def slave_for(
423476
return redis_class.from_pool(
424477
connection_pool_class(service_name, self, **connection_kwargs)
425478
)
479+
480+
481+
class _SwitchMasterListener(Thread):
482+
def __init__(self, sentinel: Sentinel):
483+
super().__init__(daemon=True)
484+
self._sentinel = sentinel
485+
self._stop = Event()
486+
487+
def stop(self):
488+
self._stop.set()
489+
490+
def run(self):
491+
while not self._stop.is_set():
492+
pubsub = None
493+
494+
try:
495+
sentinel = self._get_working_sentinel()
496+
if sentinel is None:
497+
time.sleep(1)
498+
continue
499+
500+
pubsub = sentinel.pubsub(ignore_subscribe_messages=True)
501+
pubsub.subscribe("+switch-master")
502+
503+
while not self._stop.is_set():
504+
msg = pubsub.get_message(timeout=1)
505+
if msg is None:
506+
continue
507+
if msg.get("type") != "message":
508+
continue
509+
510+
if (data := msg.get("data")) is None:
511+
continue
512+
if isinstance(data, bytes):
513+
data = data.decode("utf-8", errors="replace")
514+
515+
parts = str(data).split()
516+
if len(parts) < 5:
517+
continue
518+
519+
service_name = parts[0]
520+
new_master_ip = parts[3]
521+
try:
522+
new_master_port = int(parts[4])
523+
except ValueError:
524+
continue
525+
526+
self._sentinel._on_switch_master(
527+
service_name, (new_master_ip, new_master_port)
528+
)
529+
530+
except (ConnectionError, TimeoutError):
531+
time.sleep(0.5)
532+
finally:
533+
try:
534+
if pubsub is not None:
535+
pubsub.close()
536+
except Exception:
537+
pass
538+
539+
def _get_working_sentinel(self):
540+
sentinels = list(self._sentinel.sentinels)
541+
for sentinel in sentinels:
542+
try:
543+
sentinel.ping()
544+
return sentinel
545+
except (ConnectionError, TimeoutError):
546+
continue
547+
548+
return None

0 commit comments

Comments
 (0)