Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add simple web-based dashboard for cluster states #62

Merged
merged 9 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ All notable changes to this project will be documented in this file.
### Added

- Support configuring the scaler reconcile interval ([#61]).
- Add a simple web-based dashboard that shows the current state and query counts of all clusters.
This makes it easier to debug state transitions of clusters ([#62]).

### Changed

Expand All @@ -18,6 +20,7 @@ All notable changes to this project will be documented in this file.

[#57]: https://github.com/stackabletech/trino-lb/pull/57
[#61]: https://github.com/stackabletech/trino-lb/pull/61
[#62]: https://github.com/stackabletech/trino-lb/pull/62

## [0.3.2] - 2024-08-20

Expand Down
95 changes: 95 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ edition = "2021"
repository = "https://github.com/stackabletech/trino-lb"

[workspace.dependencies]
askama = "0.12"
axum = { version = "0.8", features = ["tracing"] }
# If we use the feature "tls-rustls" it will pull in the "aws-lc-rs" crate, which as of 2024-08-16 I did not get to build in the "make run-dev" workflow :/
axum-server = { version = "0.7", features = ["tls-rustls-no-provider"] }
Expand Down
2 changes: 1 addition & 1 deletion trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Default for TrinoLbPortsConfig {
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub enum PersistenceConfig {
InMemory,
InMemory {},
Redis(RedisConfig),
Postgres(PostgresConfig),
}
Expand Down
16 changes: 15 additions & 1 deletion trino-lb-core/src/trino_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::SystemTime;
use std::{fmt::Display, time::SystemTime};

use serde::{Deserialize, Serialize};
use strum::IntoStaticStr;
Expand All @@ -23,6 +23,20 @@ pub enum ClusterState {
Deactivated,
}

impl Display for ClusterState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClusterState::Unknown => f.write_str("Unknown"),
ClusterState::Stopped => f.write_str("Stopped"),
ClusterState::Starting => f.write_str("Starting"),
ClusterState::Ready => f.write_str("Ready"),
ClusterState::Draining { .. } => f.write_str("Draining"),
ClusterState::Terminating => f.write_str("Terminating"),
ClusterState::Deactivated => f.write_str("Deactivated"),
}
}
}

impl ClusterState {
pub fn start(&self) -> Self {
match self {
Expand Down
2 changes: 2 additions & 0 deletions trino-lb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ default-run = "trino-lb"
trino-lb-core = { path = "../trino-lb-core" }
trino-lb-persistence = { path = "../trino-lb-persistence" }

askama.workspace = true
axum-server.workspace = true
axum.workspace = true
chrono.workspace = true
clap.workspace = true
enum_dispatch.workspace = true
futures.workspace = true
http.workspace = true
indoc.workspace = true
k8s-openapi.workspace = true
kube.workspace = true
main_error.workspace = true
Expand Down
89 changes: 66 additions & 23 deletions trino-lb/src/cluster_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use trino_lb_core::{
config::Config, sanitization::Sanitize, trino_api::TrinoQueryApiResponse,
trino_query::TrinoQuery,
trino_cluster::ClusterState, trino_query::TrinoQuery,
};
use trino_lb_persistence::{Persistence, PersistenceImplementation};
use url::Url;
Expand Down Expand Up @@ -72,13 +72,19 @@ pub struct ClusterGroupManager {
http_client: Client,
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct TrinoCluster {
pub name: String,
pub max_running_queries: u64,
pub endpoint: Url,
}

/// Statistics collected for a single Trino cluster, pairing its state with its query counter.
#[derive(Clone, Debug)]
pub struct ClusterStats {
/// State of the cluster; consulted via `ready_to_accept_queries()` when picking a cluster for a query.
pub state: ClusterState,
/// Query counter of the cluster; compared against `TrinoCluster::max_running_queries` when picking a cluster.
/// NOTE(review): presumably the number of queries currently running on the cluster - confirm.
pub query_counter: u64,
}

pub enum SendToTrinoResponse {
HandedOver {
trino_query_api_response: TrinoQueryApiResponse,
Expand Down Expand Up @@ -251,13 +257,36 @@ impl ClusterGroupManager {
Ok(())
}

/// Selects the cluster of `cluster_group` that should receive the next query.
///
/// A cluster is eligible when its state reports `ready_to_accept_queries()` and its query counter is still below
/// its `max_running_queries`. Among the eligible clusters the one with the lowest query counter is chosen.
/// Returns `Ok(None)` when no cluster of the group is eligible.
///
/// # Errors
///
/// Propagates any error returned by [`Self::get_cluster_stats_for_cluster_group`].
#[instrument(skip(self))]
pub async fn try_find_best_cluster_for_group(
    &self,
    cluster_group: &str,
) -> Result<Option<&TrinoCluster>, Error> {
    let stats_by_cluster = self
        .get_cluster_stats_for_cluster_group(cluster_group)
        .await?;

    let mut emptiest: Option<(&TrinoCluster, u64)> = None;
    for (cluster, stats) in stats_by_cluster {
        // Only send queries to clusters that are actually able to accept them
        if !stats.state.ready_to_accept_queries() {
            continue;
        }
        // Only send queries to clusters that are not already full
        if stats.query_counter >= cluster.max_running_queries {
            continue;
        }

        // Keep the first cluster seen with the lowest counter; a tie does not replace it.
        match emptiest {
            Some((_, lowest)) if lowest <= stats.query_counter => {}
            _ => emptiest = Some((cluster, stats.query_counter)),
        }
    }

    Ok(emptiest.map(|(cluster, _)| cluster))
}

/// Collect statistics (such as state and query counter) for all Trino clusters in a given clusterGroup
#[instrument(skip(self))]
pub async fn get_cluster_stats_for_cluster_group(
&self,
cluster_group: &str,
) -> Result<HashMap<&TrinoCluster, ClusterStats>, Error> {
let clusters = self
.groups
.get(cluster_group)
Expand All @@ -273,13 +302,6 @@ impl ClusterGroupManager {
.await
.context(ReadCurrentClusterStateForClusterGroupFromPersistenceSnafu { cluster_group })?;

let clusters = clusters
.iter()
.zip(cluster_states)
.filter(|(_, state)| state.ready_to_accept_queries())
.map(|(c, _)| c)
.collect::<Vec<_>>();

let cluster_query_counters = try_join_all(
clusters
.iter()
Expand All @@ -288,21 +310,42 @@ impl ClusterGroupManager {
.await
.context(GetQueryCounterForGroupSnafu { cluster_group })?;

let debug_output = clusters
let cluster_stats = clusters
.iter()
.map(|c| &c.name)
.zip(cluster_query_counters.iter())
.collect::<Vec<_>>();
debug!(query_counters = ?debug_output, "Clusters had the following query counters");

let cluster_with_min_queries = clusters
.into_iter()
.zip(cluster_states)
.zip(cluster_query_counters)
.filter(|(cluster, counter)| *counter < cluster.max_running_queries)
.min_by_key(|(_, counter)| *counter)
.map(|(c, _)| c);
.map(|((trino_cluster, state), query_counter)| {
(
trino_cluster,
ClusterStats {
state,
query_counter,
},
)
})
.collect();

debug!(?cluster_stats, "Clusters had the following stats");

Ok(cluster_stats)
}

Ok(cluster_with_min_queries)
/// Get the stats for all clusters, regardless of their cluster group membership.
///
/// The stats of every configured cluster group are fetched via
/// [`Self::get_cluster_stats_for_cluster_group`] and awaited together; the per-group maps are then merged
/// into a single map. Should the same cluster key show up in more than one group, the entry merged last wins.
///
/// # Errors
///
/// Propagates the error of a failing per-group lookup.
pub async fn get_all_cluster_stats(
    &self,
) -> Result<HashMap<&TrinoCluster, ClusterStats>, Error> {
    let per_group_lookups = self
        .groups
        .keys()
        .map(|group_name| self.get_cluster_stats_for_cluster_group(group_name));

    let stats_per_group = try_join_all(per_group_lookups).await?;

    // Flatten the per-group maps into one; insertion order matches the order of the group results.
    Ok(stats_per_group.into_iter().flatten().collect())
}
}

Expand Down
2 changes: 2 additions & 0 deletions trino-lb/src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub async fn start_http_server(
});

let app = Router::new()
.route("/", get(|| async { Redirect::permanent("/ui/index.html") }))
.route("/v1/statement", post(v1::statement::post_statement))
.route(
"/v1/statement/queued_in_trino_lb/{query_id}/{sequence_number}",
Expand All @@ -119,6 +120,7 @@ pub async fn start_http_server(
"/v1/statement/executing/{query_id}/{slug}/{token}",
delete(v1::statement::delete_trino_executing_statement),
)
.route("/ui/index.html", get(ui::index::get_ui_index))
.route("/ui/query.html", get(ui::query::get_ui_query))
.with_state(app_state);

Expand Down
Loading
Loading