@@ -2,16 +2,22 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
22
33use alloy:: primitives:: U256 ;
44use cb_common:: {
5- config:: { HTTP_TIMEOUT_SECONDS_DEFAULT , MUXER_HTTP_MAX_LENGTH , RuntimeMuxConfig } ,
6- interop:: ssv:: utils:: { request_ssv_pubkeys_from_public_api, request_ssv_pubkeys_from_ssv_node} ,
5+ config:: {
6+ HTTP_TIMEOUT_SECONDS_DEFAULT , MUXER_HTTP_MAX_LENGTH , MuxConfig , MuxKeysLoader , PbsMuxes ,
7+ RuntimeMuxConfig ,
8+ } ,
9+ interop:: ssv:: {
10+ types:: { SSVNodeValidator , SSVPublicValidator } ,
11+ utils:: { request_ssv_pubkeys_from_public_api, request_ssv_pubkeys_from_ssv_node} ,
12+ } ,
713 signer:: random_secret,
814 types:: Chain ,
915 utils:: { ResponseReadError , set_ignore_content_length} ,
1016} ;
1117use cb_pbs:: { DefaultBuilderApi , PbsService , PbsState } ;
1218use cb_tests:: {
1319 mock_relay:: { MockRelayState , start_mock_relay_service} ,
14- mock_ssv_node:: create_mock_ssv_node_server,
20+ mock_ssv_node:: { SsvNodeMockState , create_mock_ssv_node_server} ,
1521 mock_ssv_public:: { PublicSsvMockState , TEST_HTTP_TIMEOUT , create_mock_public_ssv_server} ,
1622 mock_validator:: MockValidator ,
1723 utils:: {
@@ -31,7 +37,7 @@ use url::Url;
3137async fn test_ssv_public_network_fetch ( ) -> Result < ( ) > {
3238 // Start the mock server
3339 let port = 30100 ;
34- let _server_handle = create_mock_public_ssv_server ( port, None ) . await ?;
40+ let server_handle = create_mock_public_ssv_server ( port, None ) . await ?;
3541 let url =
3642 Url :: parse ( & format ! ( "http://localhost:{port}/api/v4/test_chain/validators/in_operator/1" ) )
3743 . unwrap ( ) ;
@@ -44,7 +50,7 @@ async fn test_ssv_public_network_fetch() -> Result<()> {
4450 assert_eq ! ( response. validators. len( ) , 3 ) ;
4551 let expected_pubkeys = [
4652 bls_pubkey_from_hex_unchecked (
47- "aa370f6250d421d00437b9900407a7ad93b041aeb7259d99b55ab8b163277746680e93e841f87350737bceee46aa104d " ,
53+ "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a " ,
4854 ) ,
4955 bls_pubkey_from_hex_unchecked (
5056 "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c" ,
@@ -58,7 +64,7 @@ async fn test_ssv_public_network_fetch() -> Result<()> {
5864 }
5965
6066 // Clean up the server handle
61- _server_handle . abort ( ) ;
67+ server_handle . abort ( ) ;
6268
6369 Ok ( ( ) )
6470}
@@ -265,3 +271,196 @@ async fn test_mux() -> Result<()> {
265271
266272 Ok ( ( ) )
267273}
274+
275+ /// Tests the SSV mux with dynamic registry fetching from an SSV node
276+ #[ tokio:: test]
277+ async fn test_ssv_multi_with_node ( ) -> Result < ( ) > {
278+ // Generate keys
279+ let signer = random_secret ( ) ;
280+ let pubkey = signer. public_key ( ) ;
281+ let signer2 = random_secret ( ) ;
282+ let pubkey2 = signer2. public_key ( ) ;
283+
284+ let chain = Chain :: Hoodi ;
285+ let pbs_port = 3711 ;
286+
287+ // Start the mock SSV node
288+ let ssv_node_port = pbs_port + 1 ;
289+ let ssv_node_url = Url :: parse ( & format ! ( "http://localhost:{ssv_node_port}/v1/" ) ) ?;
290+ let mock_ssv_node_state = SsvNodeMockState {
291+ validators : Arc :: new ( RwLock :: new ( vec ! [
292+ SSVNodeValidator { public_key: pubkey. clone( ) } ,
293+ SSVNodeValidator { public_key: pubkey2. clone( ) } ,
294+ ] ) ) ,
295+ force_timeout : Arc :: new ( RwLock :: new ( false ) ) ,
296+ } ;
297+ let ssv_node_handle =
298+ create_mock_ssv_node_server ( ssv_node_port, Some ( mock_ssv_node_state. clone ( ) ) ) . await ?;
299+
300+ // Start the mock SSV public API
301+ let ssv_public_port = ssv_node_port + 1 ;
302+ let ssv_public_url = Url :: parse ( & format ! ( "http://localhost:{ssv_public_port}/api/v4/" ) ) ?;
303+ let mock_ssv_public_state = PublicSsvMockState {
304+ validators : Arc :: new ( RwLock :: new ( vec ! [ SSVPublicValidator { pubkey: pubkey. clone( ) } ] ) ) ,
305+ force_timeout : Arc :: new ( RwLock :: new ( false ) ) ,
306+ } ;
307+ let ssv_public_handle =
308+ create_mock_public_ssv_server ( ssv_public_port, Some ( mock_ssv_public_state. clone ( ) ) ) . await ?;
309+
310+ // Start a mock relay to be used by the mux
311+ let relay_port = ssv_public_port + 1 ;
312+ let relay = generate_mock_relay ( relay_port, pubkey. clone ( ) ) ?;
313+ let relay_id = relay. id . clone ( ) . to_string ( ) ;
314+ let relay_state = Arc :: new ( MockRelayState :: new ( chain, signer) ) ;
315+ let relay_task = tokio:: spawn ( start_mock_relay_service ( relay_state. clone ( ) , relay_port) ) ;
316+
317+ // Create the registry mux
318+ let loader = MuxKeysLoader :: Registry {
319+ enable_refreshing : true ,
320+ node_operator_id : 1 ,
321+ lido_module_id : None ,
322+ registry : cb_common:: config:: NORegistry :: SSV ,
323+ } ;
324+ let muxes = PbsMuxes {
325+ muxes : vec ! [ MuxConfig {
326+ id: relay_id. clone( ) ,
327+ loader: Some ( loader) ,
328+ late_in_slot_time_ms: Some ( u64 :: MAX ) ,
329+ relays: vec![ ( * relay. config) . clone( ) ] ,
330+ timeout_get_header_ms: Some ( u64 :: MAX - 1 ) ,
331+ validator_pubkeys: vec![ ] ,
332+ } ] ,
333+ } ;
334+
335+ // Set up the PBS config
336+ let mut pbs_config = get_pbs_static_config ( pbs_port) ;
337+ pbs_config. ssv_node_api_url = ssv_node_url. clone ( ) ;
338+ pbs_config. ssv_public_api_url = ssv_public_url. clone ( ) ;
339+ pbs_config. mux_registry_refresh_interval_seconds = 1 ; // Refresh the mux every second
340+ let ( mux_lookup, registry_muxes) = muxes. validate_and_fill ( chain, & pbs_config) . await ?;
341+ let relays = vec ! [ relay. clone( ) ] ; // Default relay only
342+ let mut config = to_pbs_config ( chain, pbs_config, relays) ;
343+ config. all_relays . push ( relay. clone ( ) ) ; // Add the mux relay to just this field
344+ config. mux_lookup = Some ( mux_lookup) ;
345+ config. registry_muxes = Some ( registry_muxes) ;
346+
347+ // Run PBS service
348+ let state = PbsState :: new ( config) ;
349+ let pbs_server = tokio:: spawn ( PbsService :: run :: < ( ) , DefaultBuilderApi > ( state) ) ;
350+ info ! ( "Started PBS server with pubkey {pubkey}" ) ;
351+
352+ // Wait for the server to start
353+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
354+
355+ // Try to run a get_header on the new pubkey, which should use the default
356+ // relay only since it hasn't been seen in the mux yet
357+ let mock_validator = MockValidator :: new ( pbs_port) ?;
358+ info ! ( "Sending get header" ) ;
359+ let res = mock_validator. do_get_header ( Some ( pubkey2. clone ( ) ) ) . await ?;
360+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
361+ assert_eq ! ( relay_state. received_get_header( ) , 1 ) ; // pubkey2 was loaded from the SSV node
362+
363+ // Shut down the server handles
364+ pbs_server. abort ( ) ;
365+ ssv_node_handle. abort ( ) ;
366+ ssv_public_handle. abort ( ) ;
367+ relay_task. abort ( ) ;
368+
369+ Ok ( ( ) )
370+ }
371+
372+ /// Tests the SSV mux with dynamic registry fetching from the public SSV API
373+ /// when the local node is down
374+ #[ tokio:: test]
375+ async fn test_ssv_multi_with_public ( ) -> Result < ( ) > {
376+ // Generate keys
377+ let signer = random_secret ( ) ;
378+ let pubkey = signer. public_key ( ) ;
379+ let signer2 = random_secret ( ) ;
380+ let pubkey2 = signer2. public_key ( ) ;
381+
382+ let chain = Chain :: Hoodi ;
383+ let pbs_port = 3720 ;
384+
385+ // Start the mock SSV node
386+ let ssv_node_port = pbs_port + 1 ;
387+ let ssv_node_url = Url :: parse ( & format ! ( "http://localhost:{ssv_node_port}/v1/" ) ) ?;
388+
389+ // Don't start the SSV node server to simulate it being down
390+ // let ssv_node_handle = create_mock_ssv_node_server(ssv_node_port,
391+ // Some(mock_ssv_node_state.clone())).await?;
392+
393+ // Start the mock SSV public API
394+ let ssv_public_port = ssv_node_port + 1 ;
395+ let ssv_public_url = Url :: parse ( & format ! ( "http://localhost:{ssv_public_port}/api/v4/" ) ) ?;
396+ let mock_ssv_public_state = PublicSsvMockState {
397+ validators : Arc :: new ( RwLock :: new ( vec ! [
398+ SSVPublicValidator { pubkey: pubkey. clone( ) } ,
399+ SSVPublicValidator { pubkey: pubkey2. clone( ) } ,
400+ ] ) ) ,
401+ force_timeout : Arc :: new ( RwLock :: new ( false ) ) ,
402+ } ;
403+ let ssv_public_handle =
404+ create_mock_public_ssv_server ( ssv_public_port, Some ( mock_ssv_public_state. clone ( ) ) ) . await ?;
405+
406+ // Start a mock relay to be used by the mux
407+ let relay_port = ssv_public_port + 1 ;
408+ let relay = generate_mock_relay ( relay_port, pubkey. clone ( ) ) ?;
409+ let relay_id = relay. id . clone ( ) . to_string ( ) ;
410+ let relay_state = Arc :: new ( MockRelayState :: new ( chain, signer) ) ;
411+ let relay_task = tokio:: spawn ( start_mock_relay_service ( relay_state. clone ( ) , relay_port) ) ;
412+
413+ // Create the registry mux
414+ let loader = MuxKeysLoader :: Registry {
415+ enable_refreshing : true ,
416+ node_operator_id : 1 ,
417+ lido_module_id : None ,
418+ registry : cb_common:: config:: NORegistry :: SSV ,
419+ } ;
420+ let muxes = PbsMuxes {
421+ muxes : vec ! [ MuxConfig {
422+ id: relay_id. clone( ) ,
423+ loader: Some ( loader) ,
424+ late_in_slot_time_ms: Some ( u64 :: MAX ) ,
425+ relays: vec![ ( * relay. config) . clone( ) ] ,
426+ timeout_get_header_ms: Some ( u64 :: MAX - 1 ) ,
427+ validator_pubkeys: vec![ ] ,
428+ } ] ,
429+ } ;
430+
431+ // Set up the PBS config
432+ let mut pbs_config = get_pbs_static_config ( pbs_port) ;
433+ pbs_config. ssv_node_api_url = ssv_node_url. clone ( ) ;
434+ pbs_config. ssv_public_api_url = ssv_public_url. clone ( ) ;
435+ pbs_config. mux_registry_refresh_interval_seconds = 1 ; // Refresh the mux every second
436+ let ( mux_lookup, registry_muxes) = muxes. validate_and_fill ( chain, & pbs_config) . await ?;
437+ let relays = vec ! [ relay. clone( ) ] ; // Default relay only
438+ let mut config = to_pbs_config ( chain, pbs_config, relays) ;
439+ config. all_relays . push ( relay. clone ( ) ) ; // Add the mux relay to just this field
440+ config. mux_lookup = Some ( mux_lookup) ;
441+ config. registry_muxes = Some ( registry_muxes) ;
442+
443+ // Run PBS service
444+ let state = PbsState :: new ( config) ;
445+ let pbs_server = tokio:: spawn ( PbsService :: run :: < ( ) , DefaultBuilderApi > ( state) ) ;
446+ info ! ( "Started PBS server with pubkey {pubkey}" ) ;
447+
448+ // Wait for the server to start
449+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
450+
451+ // Try to run a get_header on the new pubkey, which should use the default
452+ // relay only since it hasn't been seen in the mux yet
453+ let mock_validator = MockValidator :: new ( pbs_port) ?;
454+ info ! ( "Sending get header" ) ;
455+ let res = mock_validator. do_get_header ( Some ( pubkey2. clone ( ) ) ) . await ?;
456+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
457+ assert_eq ! ( relay_state. received_get_header( ) , 1 ) ; // pubkey2 was loaded from the SSV public API
458+
459+ // Shut down the server handles
460+ pbs_server. abort ( ) ;
461+ //ssv_node_handle.abort();
462+ ssv_public_handle. abort ( ) ;
463+ relay_task. abort ( ) ;
464+
465+ Ok ( ( ) )
466+ }
0 commit comments