Skip to content

Commit bf11a07

Browse files
committed
feat: add reload_plugin and unload_plugin admin RPCs
1 parent c67efdb commit bf11a07

File tree

3 files changed

+194
-35
lines changed

3 files changed

+194
-35
lines changed

crates/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub async fn start_local_surfnet(
5454
#[derive(Debug)]
5555
pub enum PluginManagerCommand {
5656
LoadConfig(Uuid, PluginConfig, Sender<String>),
57+
UnloadPlugin(Uuid, Sender<Result<(), String>>),
58+
ReloadPlugin(Uuid, PluginConfig, Sender<Result<String, String>>),
5759
}
5860

5961
#[cfg(test)]

crates/core/src/rpc/admin.rs

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -770,15 +770,71 @@ impl AdminRpc for SurfpoolAdminRpc {
770770

771771
fn reload_plugin(
772772
&self,
773-
_meta: Self::Metadata,
774-
_name: String,
775-
_config_file: String,
773+
meta: Self::Metadata,
774+
name: String,
775+
config_file: String,
776776
) -> BoxFuture<Result<()>> {
777-
not_implemented_err_async("reload_plugin")
777+
// Parse the UUID from the name parameter
778+
let uuid = match Uuid::parse_str(&name) {
779+
Ok(uuid) => uuid,
780+
Err(e) => return Box::pin(async move {
781+
Err(jsonrpc_core::Error::invalid_params(format!("Invalid UUID: {}", e)))
782+
}),
783+
};
784+
785+
// Parse the new configuration
786+
let config = match serde_json::from_str::<PluginConfig>(&config_file)
787+
.map_err(|e| format!("failed to deserialize plugin config: {e}"))
788+
{
789+
Ok(config) => config,
790+
Err(e) => return Box::pin(async move { Err(jsonrpc_core::Error::invalid_params(&e)) }),
791+
};
792+
793+
let Some(ctx) = meta else {
794+
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
795+
};
796+
797+
let (tx, rx) = crossbeam_channel::bounded(1);
798+
let _ = ctx
799+
.plugin_manager_commands_tx
800+
.send(PluginManagerCommand::ReloadPlugin(uuid, config, tx));
801+
802+
let Ok(result) = rx.recv_timeout(Duration::from_secs(10)) else {
803+
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
804+
};
805+
806+
Box::pin(async move {
807+
result
808+
.map(|_| ()) // Convert Ok(String) to Ok(())
809+
.map_err(|e| jsonrpc_core::Error::invalid_params(&e))
810+
})
778811
}
779812

780-
fn unload_plugin(&self, _meta: Self::Metadata, _name: String) -> BoxFuture<Result<()>> {
781-
not_implemented_err_async("unload_plugin")
813+
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
814+
// Parse the UUID from the name parameter
815+
let uuid = match Uuid::parse_str(&name) {
816+
Ok(uuid) => uuid,
817+
Err(e) => return Box::pin(async move {
818+
Err(jsonrpc_core::Error::invalid_params(format!("Invalid UUID: {}", e)))
819+
}),
820+
};
821+
822+
let Some(ctx) = meta else {
823+
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
824+
};
825+
826+
let (tx, rx) = crossbeam_channel::bounded(1);
827+
let _ = ctx
828+
.plugin_manager_commands_tx
829+
.send(PluginManagerCommand::UnloadPlugin(uuid, tx));
830+
831+
let Ok(result) = rx.recv_timeout(Duration::from_secs(10)) else {
832+
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
833+
};
834+
835+
Box::pin(async move {
836+
result.map_err(|e| jsonrpc_core::Error::invalid_params(&e))
837+
})
782838
}
783839

784840
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
@@ -798,7 +854,9 @@ impl AdminRpc for SurfpoolAdminRpc {
798854
let Ok(endpoint_url) = rx.recv_timeout(Duration::from_secs(10)) else {
799855
return Box::pin(async move { Err(jsonrpc_core::Error::internal_error()) });
800856
};
801-
Box::pin(async move { Ok(endpoint_url) })
857+
// Return a JSON string containing both UUID and endpoint URL
858+
let response = format!(r#"{{"uuid": "{}", "endpoint": "{}"}}"#, uuid, endpoint_url);
859+
Box::pin(async move { Ok(response) })
802860
}
803861

804862
fn list_plugins(&self, _meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {

crates/core/src/runloops/mod.rs

Lines changed: 127 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,10 @@ fn start_geyser_runloop(
314314

315315
let mut surfpool_plugin_manager: Vec<Box<dyn GeyserPlugin>> = vec![];
316316

317+
// Map between each plugin's UUID to its position (index) in the surfpool_plugin_manager Vec.
318+
// Allows for easier reload/unload
319+
let mut plugin_uuid_map: HashMap<crate::Uuid, usize> = HashMap::new();
320+
317321
#[cfg(feature = "geyser_plugin")]
318322
for plugin_config_path in plugin_config_paths.into_iter() {
319323
let plugin_manifest_location = FileLocation::from_path(plugin_config_path);
@@ -367,6 +371,92 @@ fn start_geyser_runloop(
367371

368372
let ipc_router = RouterProxy::new();
369373

374+
// Helper function to load a subgraph plugin
375+
#[cfg(feature = "subgraph")]
376+
let load_subgraph_plugin = |uuid: uuid::Uuid,
377+
config: txtx_addon_network_svm_types::subgraph::PluginConfig,
378+
surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
379+
plugin_uuid_map: &mut HashMap<uuid::Uuid, usize>,
380+
indexing_enabled: &mut bool|
381+
-> Result<String, String> {
382+
let _ = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(
383+
uuid,
384+
config.data.clone(),
385+
crossbeam_channel::bounded(0).0, // Temporary sender, will be replaced
386+
));
387+
let mut plugin = SurfpoolSubgraphPlugin::default();
388+
389+
let (server, ipc_token) =
390+
IpcOneShotServer::<IpcReceiver<DataIndexingCommand>>::new()
391+
.expect("Failed to create IPC one-shot server.");
392+
let subgraph_plugin_config = SubgraphPluginConfig {
393+
uuid,
394+
ipc_token,
395+
subgraph_request: config.data.clone(),
396+
};
397+
398+
let config_file = serde_json::to_string(&subgraph_plugin_config)
399+
.map_err(|e| format!("Failed to serialize subgraph plugin config: {:?}", e))?;
400+
401+
plugin
402+
.on_load(&config_file, false)
403+
.map_err(|e| format!("Failed to load Geyser plugin: {:?}", e))?;
404+
405+
if let Ok((_, rx)) = server.accept() {
406+
let subgraph_rx = ipc_router
407+
.route_ipc_receiver_to_new_crossbeam_receiver::<DataIndexingCommand>(rx);
408+
let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveCollection(subgraph_rx));
409+
};
410+
411+
*indexing_enabled = true;
412+
413+
let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
414+
let plugin_index = surfpool_plugin_manager.len();
415+
surfpool_plugin_manager.push(plugin);
416+
plugin_uuid_map.insert(uuid, plugin_index);
417+
418+
let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
419+
Ok(format!("http://localhost:8899/subgraph/{}", uuid)) // Return endpoint URL
420+
};
421+
422+
// Helper function to unload a plugin by UUID
423+
#[cfg(feature = "subgraph")]
424+
let unload_plugin_by_uuid = |uuid: uuid::Uuid,
425+
surfpool_plugin_manager: &mut Vec<Box<dyn GeyserPlugin>>,
426+
plugin_uuid_map: &mut HashMap<uuid::Uuid, usize>,
427+
indexing_enabled: &mut bool|
428+
-> Result<(), String> {
429+
let plugin_index = *plugin_uuid_map
430+
.get(&uuid)
431+
.ok_or_else(|| format!("Plugin {} not found", uuid))?;
432+
433+
if plugin_index >= surfpool_plugin_manager.len() {
434+
return Err(format!("Plugin index {} out of bounds", plugin_index));
435+
}
436+
437+
// Call on_unload before removing
438+
surfpool_plugin_manager[plugin_index].on_unload();
439+
440+
// Remove the plugin from the list
441+
surfpool_plugin_manager.remove(plugin_index);
442+
plugin_uuid_map.remove(&uuid);
443+
444+
// Update all UUIDs that had indices after the removed one
445+
for (_, idx) in plugin_uuid_map.iter_mut() {
446+
if *idx > plugin_index {
447+
*idx -= 1;
448+
}
449+
}
450+
451+
// Check if we should disable indexing
452+
if surfpool_plugin_manager.is_empty() {
453+
*indexing_enabled = false;
454+
}
455+
456+
let _ = simnet_events_tx.send(SimnetEvent::info(format!("Plugin {} unloaded", uuid)));
457+
Ok(())
458+
};
459+
370460
let err = loop {
371461
use agave_geyser_plugin_interface::geyser_plugin_interface::{ReplicaAccountInfoV3, ReplicaAccountInfoVersions};
372462

@@ -383,37 +473,46 @@ fn start_geyser_runloop(
383473
}
384474
#[cfg(feature = "subgraph")]
385475
PluginManagerCommand::LoadConfig(uuid, config, notifier) => {
386-
let _ = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(uuid, config.data.clone(), notifier));
387-
let mut plugin = SurfpoolSubgraphPlugin::default();
388-
389-
let (server, ipc_token) = IpcOneShotServer::<IpcReceiver<DataIndexingCommand>>::new().expect("Failed to create IPC one-shot server.");
390-
let subgraph_plugin_config = SubgraphPluginConfig {
391-
uuid,
392-
ipc_token,
393-
subgraph_request: config.data.clone()
394-
};
395-
396-
let config_file = match serde_json::to_string(&subgraph_plugin_config) {
397-
Ok(c) => c,
476+
match load_subgraph_plugin(uuid, config, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled) {
477+
Ok(endpoint_url) => {
478+
let _ = notifier.send(endpoint_url);
479+
}
398480
Err(e) => {
399-
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to serialize subgraph plugin config: {:?}", e)));
400-
continue;
481+
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load plugin: {}", e)));
401482
}
402-
};
403-
404-
if let Err(e) = plugin.on_load(&config_file, false) {
405-
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to load Geyser plugin: {:?}", e)));
406-
};
407-
if let Ok((_, rx)) = server.accept() {
408-
let subgraph_rx = ipc_router.route_ipc_receiver_to_new_crossbeam_receiver::<DataIndexingCommand>(rx);
409-
let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveCollection(subgraph_rx));
410-
};
411-
412-
indexing_enabled = true;
483+
}
484+
}
485+
#[cfg(not(feature = "subgraph"))]
486+
PluginManagerCommand::UnloadPlugin(_, _) => {
487+
continue;
488+
}
489+
#[cfg(feature = "subgraph")]
490+
PluginManagerCommand::UnloadPlugin(uuid, notifier) => {
491+
let result = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled);
492+
let _ = notifier.send(result);
493+
}
494+
#[cfg(not(feature = "subgraph"))]
495+
PluginManagerCommand::ReloadPlugin(_, _, _) => {
496+
continue;
497+
}
498+
#[cfg(feature = "subgraph")]
499+
PluginManagerCommand::ReloadPlugin(uuid, config, notifier) => {
500+
// First, unload the old plugin
501+
if let Err(e) = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled) {
502+
let _ = notifier.send(Err(e));
503+
continue;
504+
}
413505

414-
let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
415-
surfpool_plugin_manager.push(plugin);
416-
let _ = simnet_events_tx.send(SimnetEvent::PluginLoaded("surfpool-subgraph".into()));
506+
// Then, load the new plugin with the same UUID
507+
match load_subgraph_plugin(uuid, config, &mut surfpool_plugin_manager, &mut plugin_uuid_map, &mut indexing_enabled) {
508+
Ok(endpoint_url) => {
509+
let _ = notifier.send(Ok(endpoint_url));
510+
let _ = simnet_events_tx.send(SimnetEvent::info(format!("Plugin {} reloaded", uuid)));
511+
}
512+
Err(e) => {
513+
let _ = notifier.send(Err(e));
514+
}
515+
}
417516
}
418517
}
419518
},

0 commit comments

Comments
 (0)