@@ -1102,3 +1102,129 @@ def test_receive_migrated_after_moving(self, pool_class):
1102
1102
finally :
1103
1103
if hasattr (test_redis_client .connection_pool , "disconnect" ):
1104
1104
test_redis_client .connection_pool .disconnect ()
1105
+
1106
+ @pytest .mark .parametrize ("pool_class" , [ConnectionPool , BlockingConnectionPool ])
1107
+ def test_overlapping_moving_events (self , pool_class ):
1108
+ """
1109
+ Test handling of overlapping/duplicate MOVING events (e.g., two MOVING events before the first expires).
1110
+ Ensures that the second MOVING event updates the pool and connections as expected, and that expiry/cleanup works.
1111
+ """
1112
+ test_redis_client = self ._get_client (
1113
+ pool_class , max_connections = 5 , setup_pool_handler = True
1114
+ )
1115
+ try :
1116
+ # Create and release some connections
1117
+ for _ in range (3 ):
1118
+ conn = test_redis_client .connection_pool .get_connection ()
1119
+ test_redis_client .connection_pool .release (conn )
1120
+
1121
+ # Take 2 connections to be in use
1122
+ in_use_connections = []
1123
+ for _ in range (2 ):
1124
+ conn = test_redis_client .connection_pool .get_connection ()
1125
+ in_use_connections .append (conn )
1126
+
1127
+ # Trigger first MOVING event
1128
+ key_moving1 = "key_receive_moving_0"
1129
+ value_moving1 = "value3_0"
1130
+ result1 = test_redis_client .set (key_moving1 , value_moving1 )
1131
+ assert result1 is True
1132
+ self ._validate_conn_kwargs (
1133
+ test_redis_client .connection_pool ,
1134
+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1135
+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1136
+ MockSocket .AFTER_MOVING_ADDRESS .split (":" )[0 ],
1137
+ self .config .relax_timeout ,
1138
+ )
1139
+ # Validate all connections reflect the first MOVING event
1140
+ self ._validate_in_use_connections_state (in_use_connections )
1141
+ self ._validate_free_connections_state (
1142
+ test_redis_client .connection_pool ,
1143
+ MockSocket .AFTER_MOVING_ADDRESS .split (":" )[0 ],
1144
+ self .config .relax_timeout ,
1145
+ should_be_connected_count = 1 ,
1146
+ connected_to_tmp_addres = True ,
1147
+ )
1148
+
1149
+ # Before the first MOVING expires, trigger a second MOVING event (simulate new address)
1150
+ # Patch MockSocket to use a new address for the second event
1151
+ new_address = "5.6.7.8:6380"
1152
+ orig_after_moving = MockSocket .AFTER_MOVING_ADDRESS
1153
+ MockSocket .AFTER_MOVING_ADDRESS = new_address
1154
+ try :
1155
+ key_moving2 = "key_receive_moving_1"
1156
+ value_moving2 = "value3_1"
1157
+ result2 = test_redis_client .set (key_moving2 , value_moving2 )
1158
+ assert result2 is True
1159
+ self ._validate_conn_kwargs (
1160
+ test_redis_client .connection_pool ,
1161
+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1162
+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1163
+ new_address .split (":" )[0 ],
1164
+ self .config .relax_timeout ,
1165
+ )
1166
+ # Validate all connections reflect the second MOVING event
1167
+ self ._validate_in_use_connections_state (in_use_connections )
1168
+ self ._validate_free_connections_state (
1169
+ test_redis_client .connection_pool ,
1170
+ new_address .split (":" )[0 ],
1171
+ self .config .relax_timeout ,
1172
+ should_be_connected_count = 1 ,
1173
+ connected_to_tmp_addres = True ,
1174
+ )
1175
+ finally :
1176
+ MockSocket .AFTER_MOVING_ADDRESS = orig_after_moving
1177
+
1178
+ # Wait for both MOVING timeouts to expire
1179
+ sleep (MockSocket .MOVING_TIMEOUT + 0.5 )
1180
+ self ._validate_conn_kwargs (
1181
+ test_redis_client .connection_pool ,
1182
+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1183
+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1184
+ None ,
1185
+ - 1 ,
1186
+ )
1187
+ finally :
1188
+ if hasattr (test_redis_client .connection_pool , "disconnect" ):
1189
+ test_redis_client .connection_pool .disconnect ()
1190
+
1191
+ @pytest .mark .parametrize ("pool_class" , [ConnectionPool , BlockingConnectionPool ])
1192
+ def test_thread_safety_concurrent_event_handling (self , pool_class ):
1193
+ """
1194
+ Test thread-safety under concurrent maintenance event handling.
1195
+ Simulates multiple threads triggering MOVING events and performing operations concurrently.
1196
+ """
1197
+ import threading
1198
+
1199
+ test_redis_client = self ._get_client (
1200
+ pool_class , max_connections = 5 , setup_pool_handler = True
1201
+ )
1202
+ results = []
1203
+ errors = []
1204
+
1205
+ def worker (idx ):
1206
+ try :
1207
+ key = f"key_receive_moving_{ idx } "
1208
+ value = f"value3_{ idx } "
1209
+ result = test_redis_client .set (key , value )
1210
+ results .append (result )
1211
+ except Exception as e :
1212
+ errors .append (e )
1213
+
1214
+ threads = [threading .Thread (target = worker , args = (i ,)) for i in range (5 )]
1215
+ for t in threads :
1216
+ t .start ()
1217
+ for t in threads :
1218
+ t .join ()
1219
+ assert all (results ), f"Not all threads succeeded: { results } "
1220
+ assert not errors , f"Errors occurred in threads: { errors } "
1221
+ # After all threads, MOVING event should have been handled safely
1222
+ self ._validate_conn_kwargs (
1223
+ test_redis_client .connection_pool ,
1224
+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1225
+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1226
+ MockSocket .AFTER_MOVING_ADDRESS .split (":" )[0 ],
1227
+ self .config .relax_timeout ,
1228
+ )
1229
+ if hasattr (test_redis_client .connection_pool , "disconnect" ):
1230
+ test_redis_client .connection_pool .disconnect ()
0 commit comments