Skip to content

Commit

Permalink
fix(indexer): implement code review comments from #1303 (#1310)
Browse files Browse the repository at this point in the history
Description
---
Fix indexer config that requires JSONRPC and GraphQL to both be
configured for web to work.
Correct GraphQL url
Clear terminated instances from list
Display settings in swarm UI

Motivation and Context
---

#1303 (review)

How Has This Been Tested?
---
Manually

What process can a PR reviewer use to test or verify this change?
---
Run swarm

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- The node details view now includes a clickable GraphQL endpoint link,
enhancing accessibility.
- Instance management has been improved with capabilities for 
removal and automatic cleanup of terminated instances.
  
- **Bug Fixes**
- Error messages have been refined to provide clearer guidance for
configuration issues, reducing unexpected runtime interruptions.
- Enhanced error handling for invalid URL formats in GraphQL address
retrieval.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
sdbondi authored Feb 26, 2025
1 parent 86f378e commit 619c78e
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 30 deletions.
2 changes: 1 addition & 1 deletion applications/tari_dan_wallet_daemon/src/jrpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn handler(
"get" => call_handler(context, value, token, transaction::handle_get).await,
"get_result" => call_handler(context, value, token, transaction::handle_get_result).await,
"wait_result" => call_handler(context, value, token, transaction::handle_wait_result).await,
"get_all" => call_handler(context, value, token, transaction::handle_get_all).await,
"list" | "get_all" => call_handler(context, value, token, transaction::handle_get_all).await,
_ => Ok(value.method_not_found(&value.method)),
},
Some(("accounts", method)) => match method {
Expand Down
7 changes: 5 additions & 2 deletions applications/tari_indexer/src/http_ui/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ const LOG_TARGET: &str = "tari::indexer::web_ui::server";
pub async fn run_http_ui_server(
address: SocketAddr,
json_rpc_address: Url,
graphql_address: Url,
graphql_address: Option<Url>,
) -> Result<(), anyhow::Error> {
let json_rpc_address = Arc::new(json_rpc_address);

let router = Router::new()
.route("/json_rpc_address", get(|| async move { json_rpc_address.to_string() }))
.route("/graphql_address", get(|| async move { graphql_address.to_string() }))
.route(
"/graphql_address",
get(|| async move { graphql_address.map(|a| a.to_string()).unwrap_or_default() }),
)
.fallback(handler);

info!(target: LOG_TARGET, "🕸️ HTTP UI started at {}", address);
Expand Down
40 changes: 23 additions & 17 deletions applications/tari_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow

// Run the JSON-RPC API
let jrpc_address = config.indexer.json_rpc_address;
if let (Some(jrpc_address), Some(graphql_address)) = (jrpc_address, graphql_address) {
if let Some(jrpc_address) = jrpc_address {
info!(target: LOG_TARGET, "🌐 Started JSON-RPC server on {}", jrpc_address);
let handlers = JsonRpcHandlers::new(
&services,
Expand All @@ -166,28 +166,34 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
// Run the http ui
if let Some(address) = config.indexer.web_ui_address {
// json rpc
let mut public_jrpc_address = config
let public_jrpc_url = config
.indexer
.web_ui_public_json_rpc_url
.unwrap_or_else(|| jrpc_address.to_string());
if !public_jrpc_address.starts_with("http://") && !public_jrpc_address.starts_with("https://") {
public_jrpc_address = format!("http://{}", public_jrpc_address);
}
let public_jrpc_address = url::Url::parse(&public_jrpc_address)
.map_err(|err| ExitError::new(ExitCode::ConfigError, format!("Invalid public JSON-rpc url: {err}")))?;
.unwrap_or_else(|| format!("http://{jrpc_address}/json_rpc"));
let public_jrpc_address = url::Url::parse(&public_jrpc_url).map_err(|err| {
ExitError::new(
ExitCode::ConfigError,
format!("Invalid public JSON-rpc url '{public_jrpc_url}': {err}"),
)
})?;

// graphql
let mut public_graphql_address = config
let public_graphql_url = config
.indexer
.web_ui_public_graphql_url
.unwrap_or_else(|| graphql_address.to_string());
if !public_graphql_address.starts_with("http://") && !public_graphql_address.starts_with("https://") {
public_graphql_address = format!("http://{}", public_graphql_address);
}
let public_graphql_address = url::Url::parse(&public_graphql_address)
.map_err(|err| ExitError::new(ExitCode::ConfigError, format!("Invalid public GraphQL url: {err}")))?;

task::spawn(run_http_ui_server(address, public_jrpc_address, public_graphql_address));
.filter(|_| graphql_address.is_some())
.or_else(|| graphql_address.map(|a| format!("http://{a}")))
.map(|addr| {
url::Url::parse(&addr).map_err(|err| {
ExitError::new(
ExitCode::ConfigError,
format!("Invalid public GraphQL url '{addr}': {err}"),
)
})
})
.transpose()?;

task::spawn(run_http_ui_server(address, public_jrpc_address, public_graphql_url));
}
}

Expand Down
2 changes: 1 addition & 1 deletion applications/tari_indexer_web_ui/src/utils/graphql.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export async function getGraphQLAddress(): Promise<URL> {
try {
return new URL(url);
} catch (e) {
throw new Error(`Invalid URL: ${url} : {e}`);
throw new Error(`Invalid URL: ${url} : ${e}`);
}
}
} catch (e) {
Expand Down
21 changes: 15 additions & 6 deletions applications/tari_swarm_daemon/src/process_manager/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ pub struct InstanceInfo {
impl InstanceInfo {
pub fn get_public_web_url(&self) -> Url {
match self.settings.get("public_web_url") {
Some(url) => url.parse().expect("Invalid web URL"),
Some(url) => url
.parse()
.expect("Invalid web URL. Please check `public_web_url` in your config."),
None => {
let public_ip = self
.settings
Expand All @@ -100,7 +102,9 @@ impl InstanceInfo {

pub fn get_public_json_rpc_url(&self) -> Url {
match self.settings.get("public_json_rpc_url") {
Some(url) => url.parse().expect("Invalid JSON RPC URL"),
Some(url) => url
.parse()
.expect("Invalid JSON RPC URL. Please check `public_json_rpc_url` in your config."),
None => {
let public_ip = self
.settings
Expand All @@ -117,17 +121,22 @@ impl InstanceInfo {

pub fn get_public_graphql_url(&self) -> Url {
match self.settings.get("public_graphql_url") {
Some(url) => url.parse().expect("Invalid JSON RPC URL"),
Some(url) => url
.parse()
.expect("Invalid GraphQL URL. Please check `public_graphql_url` in your config."),
None => {
let public_ip = self
.settings
.get("public_ip")
.map(|s| s.as_str())
.unwrap_or("127.0.0.1");
let web_port = self.ports.get("jrpc").expect("jrpc port not found");
format!("http://{public_ip}:{web_port}/json_rpc")
let port = self
.ports
.get("graphql")
.expect("Graphql port must be allocated before calling get_graphql_url");
format!("http://{public_ip}:{port}")
.parse()
.expect("Invalid web URL")
.expect("Invalid GraphQL URL")
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,45 @@ impl InstanceManager {
Ok(())
}

pub fn remove_instance(&mut self, id: InstanceId) -> anyhow::Result<()> {
let instance = self
.instances()
.find(|i| i.id() == id)
.ok_or_else(|| anyhow!("Instance not found"))?;

match instance.instance_type() {
InstanceType::MinoTariNode => {
self.minotari_nodes.remove(&id);
},
InstanceType::MinoTariConsoleWallet => {
self.minotari_wallets.remove(&id);
},
InstanceType::MinoTariMiner => {
self.minotari_miners.remove(&id);
},
InstanceType::TariValidatorNode => {
self.validator_nodes.remove(&id);
},
InstanceType::TariIndexer => {
self.indexers.remove(&id);
},
InstanceType::TariSignalingServer => {
self.signaling_servers.remove(&id);
},
InstanceType::TariWalletDaemon => {
self.wallet_daemons.remove(&id);
},
InstanceType::TariWalletDaemonCreateKey => {
self.wallet_daemons.remove(&id);
},
}

// Remove allocated any ports for instance
self.port_allocator.unregister(id);

Ok(())
}

pub fn instances_mut(&mut self) -> impl Iterator<Item = &mut Instance> {
self.minotari_nodes
.values_mut()
Expand Down
48 changes: 45 additions & 3 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
fs::File,
net::SocketAddr,
path::PathBuf,
time::Duration,
time::{Duration, Instant},
};

use anyhow::{anyhow, Context};
Expand All @@ -17,7 +17,7 @@ use tari_dan_engine::wasm::WasmModule;
use tari_engine_types::{calculate_template_binary_hash, TemplateAddress};
use tari_shutdown::ShutdownSignal;
use tari_validator_node_client::types::GetTemplatesRequest;
use tokio::{sync::mpsc, time::sleep};
use tokio::{sync::mpsc, time, time::sleep};
use url::Url;

use crate::{
Expand Down Expand Up @@ -210,6 +210,37 @@ impl ProcessManager {
Ok(())
}

fn clear_terminated_instances(&mut self) -> anyhow::Result<()> {
let mut instances_to_remove = vec![];
for instance in self.instance_manager.instances_mut() {
if let Some(status) = instance.check_running()? {
if status.success() {
info!(
"Instance exited cleanly: {} {}",
instance.name(),
instance.instance_type()
);
} else {
log::error!(
"Instance exited with status {}: {} {}",
status.code().unwrap_or(-1),
instance.name(),
instance.instance_type()
);
}
// We only want to clear the miners, the rest we keep around to display that they are terminated
if instance.instance_type().is_miner() {
instances_to_remove.push(instance.id());
}
}
}

for instance_id in instances_to_remove {
self.instance_manager.remove_instance(instance_id)?;
}
Ok(())
}

pub async fn start(mut self) -> anyhow::Result<()> {
let mut shutdown_signal = self.shutdown_signal.clone();

Expand All @@ -223,13 +254,24 @@ impl ProcessManager {
}
}

let mut check_interval = time::interval_at(
time::Instant::from_std(Instant::now() + Duration::from_secs(10)),
Duration::from_secs(10),
);

loop {
tokio::select! {
Some(req) = self.rx_request.recv() => {
if let Err(err) = self.handle_request(req).await {
log::error!("Error handling request: {:?}", err);
}
}
},

_ = check_interval.tick() => {
if let Err(err) = self.clear_terminated_instances() {
log::error!("Error checking instances: {:?}", err);
}
},

_ = self.shutdown_signal.wait() => {
info!("Shutting down process manager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl PortAllocator {
self.current_port = ports.current_port;
self.instances.insert(instance_id, ports);
}

pub fn unregister(&mut self, instance_id: InstanceId) {
self.instances.remove(&instance_id);
}
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/rpc/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub struct InstanceInfo {
pub id: InstanceId,
pub name: String,
pub ports: HashMap<&'static str, u16>,
pub settings: HashMap<String, String>,
pub base_path: PathBuf,
pub instance_type: InstanceType,
pub is_running: bool,
Expand All @@ -155,6 +156,7 @@ impl From<crate::process_manager::InstanceInfo> for InstanceInfo {
id: value.id,
name: value.name,
ports: value.ports.into_ports(),
settings: value.settings,
base_path: value.base_path,
instance_type: value.instance_type,
is_running: value.is_running,
Expand Down
7 changes: 7 additions & 0 deletions applications/tari_swarm_daemon/webui/src/routes/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ function ShowInfo(params: any) {
<a href={`${node.jrpc}`} target="_blank">{node.jrpc}</a>
</div>
);
const graphQLInfo = node?.graphql && (
<div>
<b>GraphQL</b>
<a href={`${node.graphql}`} target="_blank">{node.graphql}</a>
</div>
);
const grpcInfo = node?.grpc && (
<div>
<b>GRPC</b>
Expand Down Expand Up @@ -331,6 +337,7 @@ function ShowInfo(params: any) {
{nameInfo}
{httpInfo}
{jrpcInfo}
{graphQLInfo}
{grpcInfo}
{showLogs && logInfo}
{executable === Executable.ValidatorNode && node?.jrpc &&
Expand Down

0 comments on commit 619c78e

Please sign in to comment.