@@ -730,25 +730,27 @@ def ensure_string(key):
730
730
731
731
732
732
class CacheProxyConnection (ConnectionInterface ):
733
+ CACHE_DUMMY_STATUS = "caching-in-progress"
734
+ KEYS_MAPPING_CACHE_SIZE = 10000
735
+
733
736
def __init__ (
734
737
self ,
735
738
conn : ConnectionInterface ,
736
739
cache : CacheInterface ,
737
- conf : CacheConfiguration ,
738
- cache_lock : threading .Lock ,
740
+ conf : CacheConfiguration
739
741
):
740
742
self .pid = os .getpid ()
741
743
self ._conn = conn
742
744
self .retry = self ._conn .retry
743
745
self .host = self ._conn .host
744
746
self .port = self ._conn .port
745
747
self ._cache = cache
746
- self ._cache_lock = cache_lock
748
+ self ._cache_lock = threading . Lock ()
747
749
self ._conf = conf
748
750
self ._current_command_hash = None
749
751
self ._current_command_keys = None
750
752
self ._current_options = None
751
- self ._keys_mapping = LRUCache (maxsize = 10000 )
753
+ self ._keys_mapping = LRUCache (maxsize = self . KEYS_MAPPING_CACHE_SIZE )
752
754
self .register_connect_callback (self ._enable_tracking_callback )
753
755
754
756
def repr_pieces (self ):
@@ -772,6 +774,7 @@ def on_connect(self):
772
774
def disconnect (self , * args ):
773
775
with self ._cache_lock :
774
776
self ._cache .clear ()
777
+ self ._keys_mapping .clear ()
775
778
self ._conn .disconnect (* args )
776
779
777
780
def check_health (self ):
@@ -811,7 +814,7 @@ def send_command(self, *args, **kwargs):
811
814
812
815
# Set temporary entry as a status to prevent
813
816
# race condition from another connection.
814
- self ._cache .set (self ._current_command_hash , "caching-in-progress" )
817
+ self ._cache .set (self ._current_command_hash , self . CACHE_DUMMY_STATUS )
815
818
816
819
# Send command over socket only if it's allowed
817
820
# read-only command that not yet cached.
@@ -827,7 +830,7 @@ def read_response(
827
830
# Check if command response exists in a cache and it's not in progress.
828
831
if (
829
832
self ._cache .exists (self ._current_command_hash )
830
- and self ._cache .get (self ._current_command_hash ) != "caching-in-progress"
833
+ and self ._cache .get (self ._current_command_hash ) != self . CACHE_DUMMY_STATUS
831
834
):
832
835
return copy .deepcopy (self ._cache .get (self ._current_command_hash ))
833
836
@@ -1264,15 +1267,13 @@ def __init__(
1264
1267
self .cache = None
1265
1268
self ._cache_conf = None
1266
1269
self ._cache_factory = cache_factory
1267
- self .cache_lock = None
1268
1270
self .scheduler = None
1269
1271
1270
1272
if connection_kwargs .get ("use_cache" ):
1271
1273
if connection_kwargs .get ("protocol" ) not in [3 , "3" ]:
1272
1274
raise RedisError ("Client caching is only supported with RESP version 3" )
1273
1275
1274
1276
self ._cache_conf = CacheConfiguration (** self .connection_kwargs )
1275
- self ._cache_lock = threading .Lock ()
1276
1277
1277
1278
cache = self .connection_kwargs .get ("cache" )
1278
1279
@@ -1443,8 +1444,7 @@ def make_connection(self) -> "ConnectionInterface":
1443
1444
return CacheProxyConnection (
1444
1445
self .connection_class (** self .connection_kwargs ),
1445
1446
self .cache ,
1446
- self ._cache_conf ,
1447
- self ._cache_lock ,
1447
+ self ._cache_conf
1448
1448
)
1449
1449
1450
1450
return self .connection_class (** self .connection_kwargs )
0 commit comments