1+ import logging , time
2+ from valkey_bloom_test_case import ValkeyBloomTestCaseBase
3+ from valkeytests .conftest import resource_port_tracker
4+
5+ class TestKeyEventNotifications (ValkeyBloomTestCaseBase ):
6+ RESERVE_KEYSPACE_MESSAGE = {'type' : 'pmessage' , 'pattern' : b'__key*__:*' , 'channel' : b'__keyspace@0__:intermediate_val' , 'data' : b'bloom.reserve' }
7+ RESERVE_KEYEVENT_MESSAGE = {'type' : 'pmessage' , 'pattern' : b'__key*__:*' , 'channel' : b'__keyevent@0__:bloom.reserve' , 'data' : b'intermediate_val' }
8+ ADD_KEYSPACE_MESSAGE = {'type' : 'pmessage' , 'pattern' : b'__key*__:*' , 'channel' : b'__keyspace@0__:intermediate_val' , 'data' : b'bloom.add' }
9+ ADD_KEYEVENT_MESSAGE = {'type' : 'pmessage' , 'pattern' : b'__key*__:*' , 'channel' : b'__keyevent@0__:bloom.add' , 'data' : b'intermediate_val' }
10+
11+ def create_expected_message_list (self , reserve_expected , add_expected , key_name ):
12+ expected_messages = []
13+ self .RESERVE_KEYSPACE_MESSAGE ['channel' ] = f"__keyspace@0__:{ key_name } " .encode ('utf-8' )
14+ self .RESERVE_KEYEVENT_MESSAGE ['data' ] = f"{ key_name } " .encode ('utf-8' )
15+ self .ADD_KEYSPACE_MESSAGE ['channel' ] = f"__keyspace@0__:{ key_name } " .encode ('utf-8' )
16+ self .ADD_KEYEVENT_MESSAGE ['data' ] = f"{ key_name } " .encode ('utf-8' )
17+ if reserve_expected :
18+ expected_messages .append (self .RESERVE_KEYEVENT_MESSAGE )
19+ expected_messages .append (self .RESERVE_KEYSPACE_MESSAGE )
20+ if add_expected :
21+ expected_messages .append (self .ADD_KEYSPACE_MESSAGE )
22+ expected_messages .append (self .ADD_KEYEVENT_MESSAGE )
23+ return expected_messages
24+
25+ def check_response (self , result_messages , expected_messages ):
26+ extra_message = self .keyspace_client_subscribe .get_message ()
27+ if extra_message :
28+ assert False , f"Unexpected extra message returned: { extra_message } "
29+ for message in expected_messages :
30+ assert message in result_messages , f"{ message } was not found in messages received"
31+
32+ def get_subscribe_client_messages (self , client , cmd , expected_message_count ):
33+ client .execute_command (cmd )
34+ count = 0
35+ messages = []
36+ timeout = time .time () + 5
37+ while expected_message_count != count :
38+ message = self .keyspace_client_subscribe .get_message ()
39+ if message :
40+ # Only for the first time we get messages we should skip the first message gotten
41+ if count > 0 or "BF.ADD" not in cmd :
42+ messages .append (message )
43+ count = count + 1
44+ if timeout < time .time ():
45+ assert False , f"The number of expected messages failed tor eturn in time, messages received so far { messages } "
46+ return messages
47+
48+ def test_keyspace_bloom_commands (self ):
49+ self .create_subscribe_clients ()
50+ # The first call to get messages will return message that shows we subscribed to messages so we expect one more message than we need to check for
51+ # the first time we look at messages
52+ bloom_commands = [
53+ ('BF.ADD add_test key' , True , True , 5 ),
54+ ('BF.MADD madd_test key1 key2' , True , True , 4 ),
55+ ('BF.EXISTS exists_test key' , False , False , 0 ),
56+ ('BF.INSERT insert_test ITEMS key1 key2' , True , True , 4 ),
57+ ('BF.RESERVE reserve_test 0.01 1000' , True , False , 2 )
58+ ]
59+
60+ for command , reserve_expected , add_expected , expected_message_count in bloom_commands :
61+ expected_messages = self .create_expected_message_list (reserve_expected , add_expected , command .split ()[1 ]) if reserve_expected else []
62+ result_messages = self .get_subscribe_client_messages (self .keyspace_client , command , expected_message_count )
63+ self .check_response (result_messages , expected_messages )
64+
65+ def create_subscribe_clients (self ):
66+ self .keyspace_client = self .server .get_new_client ()
67+ self .keyspace_client_subscribe = self .keyspace_client .pubsub ()
68+ self .keyspace_client_subscribe .psubscribe ('__key*__:*' )
69+ self .keyspace_client .execute_command ('CONFIG' ,'SET' ,'notify-keyspace-events' , 'KEA' )
70+
0 commit comments