Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add simple web-based dashboard for cluster states #62

Merged
merged 9 commits into from
Feb 24, 2025
16 changes: 15 additions & 1 deletion trino-lb-core/src/trino_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::SystemTime;
use std::{fmt::Display, time::SystemTime};

use serde::{Deserialize, Serialize};
use strum::IntoStaticStr;
Expand All @@ -23,6 +23,20 @@ pub enum ClusterState {
Deactivated,
}

impl Display for ClusterState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClusterState::Unknown => f.write_str("Unknown"),
ClusterState::Stopped => f.write_str("Stopped"),
ClusterState::Starting => f.write_str("Starting"),
ClusterState::Ready => f.write_str("Ready"),
ClusterState::Draining { .. } => f.write_str("Draining"),
ClusterState::Terminating => f.write_str("Terminating"),
ClusterState::Deactivated => f.write_str("Deactivated"),
}
}
}

impl ClusterState {
pub fn start(&self) -> Self {
match self {
Expand Down
1 change: 1 addition & 0 deletions trino-lb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ clap.workspace = true
enum_dispatch.workspace = true
futures.workspace = true
http.workspace = true
indoc.workspace = true
k8s-openapi.workspace = true
kube.workspace = true
main_error.workspace = true
Expand Down
85 changes: 62 additions & 23 deletions trino-lb/src/cluster_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use trino_lb_core::{
config::Config, sanitization::Sanitize, trino_api::TrinoQueryApiResponse,
trino_query::TrinoQuery,
trino_cluster::ClusterState, trino_query::TrinoQuery,
};
use trino_lb_persistence::{Persistence, PersistenceImplementation};
use url::Url;
Expand Down Expand Up @@ -72,13 +72,19 @@ pub struct ClusterGroupManager {
http_client: Client,
}

#[derive(Clone, Debug)]
/// Description of a single Trino cluster that queries can be handed to.
///
/// Derives `Hash` and `Eq` so that references to it can be used as keys of a `HashMap`.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct TrinoCluster {
/// Name of the cluster.
pub name: String,
/// Limit for the cluster's query counter: a cluster whose counter has reached this value is not
/// picked for new queries.
pub max_running_queries: u64,
/// URL of the cluster.
// NOTE(review): presumably the base URL queries are sent to; confirm against the callers.
pub endpoint: Url,
}

/// Snapshot of the statistics collected for a single Trino cluster.
#[derive(Clone, Debug)]
pub struct ClusterStats {
/// State of the cluster as read from the persistence.
pub state: ClusterState,
/// Query counter of the cluster; it is compared against `max_running_queries` when picking a
/// cluster for a new query.
pub query_counter: u64,
}

pub enum SendToTrinoResponse {
HandedOver {
trino_query_api_response: TrinoQueryApiResponse,
Expand Down Expand Up @@ -251,13 +257,32 @@ impl ClusterGroupManager {
Ok(())
}

/// Tries to find the best cluster from the specified `cluster_group`. If all clusters of the requested group have reached their
/// configured query limit, this function returns [`None`].
/// Tries to find the best cluster from the specified `cluster_group`.
///
/// A cluster is only considered when its state reports that it is ready to accept queries and
/// its query counter is still below its configured `max_running_queries`. Among the remaining
/// candidates the one with the lowest query counter is returned.
///
/// Returns [`None`] if no cluster of the requested group qualifies, e.g. because all of them
/// have reached their configured query limit or none of them is ready.
///
/// # Errors
///
/// Propagates any error raised while collecting the cluster stats for the group.
#[instrument(skip(self))]
pub async fn try_find_best_cluster_for_group(
    &self,
    cluster_group: &str,
) -> Result<Option<&TrinoCluster>, Error> {
    let cluster_stats = self
        .get_cluster_stats_for_cluster_group(cluster_group)
        .await?;

    let cluster_with_min_queries = cluster_stats
        .into_iter()
        // The stats cover every cluster of the group regardless of its state, so clusters that
        // cannot accept queries must be dropped here before picking a target.
        .filter(|(_, stats)| stats.state.ready_to_accept_queries())
        // Skip clusters that have already reached their query limit.
        .filter(|(cluster, stats)| stats.query_counter < cluster.max_running_queries)
        // Pick the least loaded of the remaining clusters.
        .min_by_key(|(_, stats)| stats.query_counter)
        .map(|(cluster, _)| cluster);

    Ok(cluster_with_min_queries)
}

/// Collect statistics (such as state and query counter) for all Trino clusters in a given clusterGroup
#[instrument(skip(self))]
pub async fn get_cluster_stats_for_cluster_group(
&self,
cluster_group: &str,
) -> Result<HashMap<&TrinoCluster, ClusterStats>, Error> {
let clusters = self
.groups
.get(cluster_group)
Expand All @@ -273,13 +298,6 @@ impl ClusterGroupManager {
.await
.context(ReadCurrentClusterStateForClusterGroupFromPersistenceSnafu { cluster_group })?;

let clusters = clusters
.iter()
.zip(cluster_states)
.filter(|(_, state)| state.ready_to_accept_queries())
.map(|(c, _)| c)
.collect::<Vec<_>>();

let cluster_query_counters = try_join_all(
clusters
.iter()
Expand All @@ -288,21 +306,42 @@ impl ClusterGroupManager {
.await
.context(GetQueryCounterForGroupSnafu { cluster_group })?;

let debug_output = clusters
let cluster_stats = clusters
.iter()
.map(|c| &c.name)
.zip(cluster_query_counters.iter())
.collect::<Vec<_>>();
debug!(query_counters = ?debug_output, "Clusters had the following query counters");

let cluster_with_min_queries = clusters
.into_iter()
.zip(cluster_states)
.zip(cluster_query_counters)
.filter(|(cluster, counter)| *counter < cluster.max_running_queries)
.min_by_key(|(_, counter)| *counter)
.map(|(c, _)| c);
.map(|((trino_cluster, state), query_counter)| {
(
trino_cluster,
ClusterStats {
state,
query_counter,
},
)
})
.collect();

debug!(?cluster_stats, "Clusters had the following stats");

Ok(cluster_stats)
}

Ok(cluster_with_min_queries)
/// Collects the stats of every cluster, regardless of which cluster group it belongs to.
///
/// The stats are looked up once per cluster group and merged into a single map. If the lookup
/// fails for any group, that error is returned instead.
pub async fn get_all_cluster_stats(
    &self,
) -> Result<HashMap<&TrinoCluster, ClusterStats>, Error> {
    // One lookup future per configured cluster group.
    let group_lookups = self
        .groups
        .keys()
        .map(|group_name| self.get_cluster_stats_for_cluster_group(group_name));

    let stats_per_group = try_join_all(group_lookups).await?;

    // Merge the per-group maps into one; entries of later groups win on duplicate keys.
    Ok(stats_per_group.into_iter().flatten().collect())
}
}

Expand Down
2 changes: 2 additions & 0 deletions trino-lb/src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub async fn start_http_server(
});

let app = Router::new()
.route("/", get(|| async { Redirect::permanent("/ui/index.html") }))
.route("/v1/statement", post(v1::statement::post_statement))
.route(
"/v1/statement/queued_in_trino_lb/{query_id}/{sequence_number}",
Expand All @@ -119,6 +120,7 @@ pub async fn start_http_server(
"/v1/statement/executing/{query_id}/{slug}/{token}",
delete(v1::statement::delete_trino_executing_statement),
)
.route("/ui/index.html", get(ui::index::get_ui_index))
.route("/ui/query.html", get(ui::query::get_ui_query))
.with_state(app_state);

Expand Down
83 changes: 83 additions & 0 deletions trino-lb/src/http_server/ui/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::{collections::BTreeMap, sync::Arc};

use axum::{
extract::State,
response::{Html, IntoResponse, Response},
};
use http::StatusCode;
use indoc::{formatdoc, indoc};
use opentelemetry::KeyValue;
use snafu::{ResultExt, Snafu};
use tracing::{instrument, warn};

use crate::{cluster_group_manager, http_server::AppState};

/// Errors that can occur while serving the UI index page.
#[derive(Snafu, Debug)]
pub enum Error {
/// Collecting the stats of all clusters from the cluster group manager failed.
#[snafu(display("Failed to get all cluster states"))]
GetAllClusterStates {
source: cluster_group_manager::Error,
},
}

impl IntoResponse for Error {
fn into_response(self) -> Response {
warn!(error = ?self, "Error while processing ui query request");
let status_code = match self {
Error::GetAllClusterStates { .. } => StatusCode::INTERNAL_SERVER_ERROR,
};
(status_code, format!("{self}")).into_response()
}
}

/// Renders the UI index page: an HTML table listing every cluster with its state and query
/// counter, sorted by cluster name.
///
/// Each call also increments the `http_counter` metric with `resource = "get_ui_index"`.
///
/// # Errors
///
/// Returns [`Error::GetAllClusterStates`] if the cluster stats cannot be collected.
#[instrument(name = "GET /ui/index.html", skip(state))]
pub async fn get_ui_index(State(state): State<Arc<AppState>>) -> Result<Html<String>, Error> {
state
.metrics
.http_counter
.add(1, &[KeyValue::new("resource", "get_ui_index")]);

let cluster_stats = state
.cluster_group_manager
.get_all_cluster_stats()
.await
.context(GetAllClusterStatesSnafu)?;

// Sort the clusters alphabetically: a BTreeMap keyed by the cluster name iterates in key order.
let cluster_stats: BTreeMap<_, _> = cluster_stats
.into_iter()
.map(|(cluster, stats)| (&cluster.name, stats))
.collect();

// Table header; one row per cluster is appended below.
let mut html: String = indoc! {"
<h1>Cluster stats</h1>
<br>
<table>
<tr>
<th>Cluster</th>
<th>State</th>
<th>Query counter</th>
</tr>
"}
.to_owned();

// NOTE(review): the cluster name is interpolated into the HTML without escaping; confirm that
// cluster names can never contain HTML special characters.
for (cluster, stats) in cluster_stats {
html.push_str(&formatdoc! {"
<tr>
<td>{cluster}</td>
<td>{state}</td>
<td>{query_counter}</td>
</tr>
",
state = stats.state,
query_counter = stats.query_counter,
});
}

html.push_str(indoc! {"
</table>
"});

Ok(Html(html))
}
1 change: 1 addition & 0 deletions trino-lb/src/http_server/ui/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod index;
pub mod query;
Loading