6
6
7
7
class DummyConnection (ConnectionInterface ):
8
8
"""A dummy connection class for testing that doesn't actually connect to Redis"""
9
+
9
10
def __init__ (self , * args , ** kwargs ):
10
11
self .connected = False
11
12
@@ -15,23 +16,43 @@ def connect(self):
15
16
def disconnect (self ):
16
17
self .connected = False
17
18
18
- def register_connect_callback (self , callback ): pass
19
- def deregister_connect_callback (self , callback ): pass
20
- def set_parser (self , parser_class ): pass
21
- def get_protocol (self ): return 2
22
- def on_connect (self ): pass
23
- def check_health (self ): return True
24
- def send_packed_command (self , command , check_health = True ): pass
25
- def send_command (self , * args , ** kwargs ): pass
26
- def can_read (self , timeout = 0 ): return False
27
- def read_response (self , disable_decoding = False , ** kwargs ): return "PONG"
19
+ def register_connect_callback (self , callback ):
20
+ pass
21
+
22
+ def deregister_connect_callback (self , callback ):
23
+ pass
24
+
25
+ def set_parser (self , parser_class ):
26
+ pass
27
+
28
+ def get_protocol (self ):
29
+ return 2
30
+
31
+ def on_connect (self ):
32
+ pass
33
+
34
+ def check_health (self ):
35
+ return True
36
+
37
+ def send_packed_command (self , command , check_health = True ):
38
+ pass
39
+
40
+ def send_command (self , * args , ** kwargs ):
41
+ pass
42
+
43
+ def can_read (self , timeout = 0 ):
44
+ return False
45
+
46
+ def read_response (self , disable_decoding = False , ** kwargs ):
47
+ return "PONG"
28
48
29
49
30
50
@pytest .mark .onlynoncluster
31
51
def test_max_connections_error_inheritance ():
32
52
"""Test that MaxConnectionsError is a subclass of ConnectionError"""
33
53
assert issubclass (redis .MaxConnectionsError , redis .ConnectionError )
34
54
55
+
35
56
@pytest .mark .onlynoncluster
36
57
def test_connection_pool_raises_max_connections_error ():
37
58
"""Test that ConnectionPool raises MaxConnectionsError and not ConnectionError"""
@@ -43,7 +64,9 @@ def test_connection_pool_raises_max_connections_error():
43
64
pool .get_connection ()
44
65
45
66
46
- @pytest .mark .skipif (not hasattr (redis , "RedisCluster" ), reason = "RedisCluster not available" )
67
+ @pytest .mark .skipif (
68
+ not hasattr (redis , "RedisCluster" ), reason = "RedisCluster not available"
69
+ )
47
70
def test_cluster_handles_max_connections_error ():
48
71
"""
49
72
Test that RedisCluster doesn't reinitialize when MaxConnectionsError is raised
@@ -63,9 +86,11 @@ def test_cluster_handles_max_connections_error():
63
86
connection = mock .MagicMock ()
64
87
65
88
# Patch the get_connection function in the cluster module
66
- with mock .patch (' redis.cluster.get_connection' , return_value = connection ):
89
+ with mock .patch (" redis.cluster.get_connection" , return_value = connection ):
67
90
# Test MaxConnectionsError
68
- connection .send_command .side_effect = redis .MaxConnectionsError ("Too many connections" )
91
+ connection .send_command .side_effect = redis .MaxConnectionsError (
92
+ "Too many connections"
93
+ )
69
94
70
95
# Call the method and check that the exception is raised
71
96
with pytest .raises (redis .MaxConnectionsError ):
0 commit comments