From f0673224eb7676b9b45802e1a892a801f1226bd4 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 28 Feb 2025 10:26:08 +0100 Subject: [PATCH] feat: Add a new cluster state "Unhealthy" (#63) * feat: Add simple web-based dashboard for cluster states * changelog * fix: Only hand queries to clusters that are able to accept queries * fix bug * Use askama * clippy * WIP * changelog * Fix typo in changelog * Use backslash instead of
* Update trino-lb/src/scaling/mod.rs Co-authored-by: Techassi * Add rustdoc * Reword changelog * Reword rustdoc --------- Co-authored-by: Techassi --- CHANGELOG.md | 6 +++ trino-lb-core/src/trino_cluster.rs | 6 +++ .../src/maintenance/query_count_fetcher.rs | 42 +++++++++++++++---- trino-lb/src/scaling/mod.rs | 35 ++++++++++++---- 4 files changed, 73 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 914e2f4..37dbe7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ All notable changes to this project will be documented in this file. - Support configuring the scaler reconcile interval ([#61]). - Add simple web-based dashboard that shows the current state and query counts of all clusters. This makes it easier to debug state transitions of clusters ([#62]). +- Add `Unhealthy` cluster state. + This state is entered once the readiness check of a cluster in the `Ready` state fails. + The cluster will remain in the `Unhealthy` state until the scaler marks that cluster as `Ready` again. + `Unhealthy` clusters won't get any new queries; if all clusters are unhealthy, new queries will be queued. + The cluster health check interval can be configured using the scaler reconcile interval ([#63]). ### Changed @@ -21,6 +26,7 @@ All notable changes to this project will be documented in this file. [#57]: https://github.com/stackabletech/trino-lb/pull/57 [#61]: https://github.com/stackabletech/trino-lb/pull/61 [#62]: https://github.com/stackabletech/trino-lb/pull/62 +[#63]: https://github.com/stackabletech/trino-lb/pull/63 ## [0.3.2] - 2024-08-20 diff --git a/trino-lb-core/src/trino_cluster.rs b/trino-lb-core/src/trino_cluster.rs index 8bf4cef..7e5f445 100644 --- a/trino-lb-core/src/trino_cluster.rs +++ b/trino-lb-core/src/trino_cluster.rs @@ -12,6 +12,8 @@ pub enum ClusterState { Starting, /// Up and running, ready to get queries Ready, + /// Up, but not ready to accept queries. It should not be started or stopped, as it's healing itself. + Unhealthy, /// No new queries should be submitted. Once all running queries are finished and a certain time period has passed /// go to `Terminating` Draining { @@ -30,6 +32,7 @@ impl Display for ClusterState { ClusterState::Stopped => f.write_str("Stopped"), ClusterState::Starting => f.write_str("Starting"), ClusterState::Ready => f.write_str("Ready"), + ClusterState::Unhealthy => f.write_str("Unhealthy"), ClusterState::Draining { .. } => f.write_str("Draining"), ClusterState::Terminating => f.write_str("Terminating"), ClusterState::Deactivated => f.write_str("Deactivated"), @@ -45,6 +48,7 @@ impl ClusterState { | ClusterState::Starting | ClusterState::Terminating => ClusterState::Starting, ClusterState::Ready | ClusterState::Draining { .. } => ClusterState::Ready, + ClusterState::Unhealthy => ClusterState::Unhealthy, ClusterState::Deactivated => ClusterState::Deactivated, } } @@ -55,6 +59,7 @@ impl ClusterState { ClusterState::Unknown // No, because it is already started | ClusterState::Starting + | ClusterState::Unhealthy | ClusterState::Ready | ClusterState::Terminating | ClusterState::Deactivated => false, @@ -64,6 +69,7 @@ impl ClusterState { pub fn ready_to_accept_queries(&self) -> bool { match self { ClusterState::Unknown + | ClusterState::Unhealthy | ClusterState::Stopped | ClusterState::Draining { .. } | ClusterState::Terminating diff --git a/trino-lb/src/maintenance/query_count_fetcher.rs b/trino-lb/src/maintenance/query_count_fetcher.rs index 25f93d7..cae2cb4 100644 --- a/trino-lb/src/maintenance/query_count_fetcher.rs +++ b/trino-lb/src/maintenance/query_count_fetcher.rs @@ -136,11 +136,7 @@ impl QueryCountFetcher { self.clusters .iter() .zip(cluster_states) - .filter_map(|(cluster, state)| match state{ - ClusterState::Unknown | ClusterState::Stopped | ClusterState::Starting | ClusterState::Terminating | ClusterState::Deactivated => None, - ClusterState::Ready | ClusterState::Draining{ .. } => Some(cluster), - }) - .map(|cluster| self.process_cluster(cluster)), + .map(|(cluster, state)| self.process_cluster(cluster, state)) ) .await; @@ -152,8 +148,40 @@ impl QueryCountFetcher { } } - #[instrument(skip(self))] - async fn process_cluster(&self, cluster: &TrinoClusterConfig) { + /// Update the query count for the given cluster. + /// + /// - In case the cluster is reachable, fetch the current query count and store it. + /// - In case we know the cluster is not reachable (e.g. the cluster is turned off or currently + /// starting), store a query count of zero (0) to avoid dangling clusters with non-zero query + /// counts. + #[instrument(skip(self, cluster), fields(cluster_name = cluster.name))] + async fn process_cluster(&self, cluster: &TrinoClusterConfig, state: ClusterState) { + match state { + ClusterState::Ready | ClusterState::Unhealthy | ClusterState::Draining { .. } => { + self.fetch_and_store_query_count(cluster).await; + } + ClusterState::Unknown + | ClusterState::Stopped + | ClusterState::Starting + | ClusterState::Terminating + | ClusterState::Deactivated => { + if let Err(err) = self + .persistence + .set_cluster_query_count(&cluster.name, 0) + .await + { + error!( + cluster = cluster.name, + ?err, + "QueryCountFetcher: Failed to set current cluster query count to zero" + ); + } + } + } + } + + #[instrument(skip(self, cluster), fields(cluster_name = cluster.name))] + async fn fetch_and_store_query_count(&self, cluster: &TrinoClusterConfig) { let cluster_info = get_cluster_info(&cluster.endpoint, self.ignore_certs, &cluster.credentials).await; diff --git a/trino-lb/src/scaling/mod.rs b/trino-lb/src/scaling/mod.rs index 2d338b4..bfeef82 100644 --- a/trino-lb/src/scaling/mod.rs +++ b/trino-lb/src/scaling/mod.rs @@ -14,7 +14,7 @@ use tokio::{ task::{JoinError, JoinSet}, time, }; -use tracing::{debug, error, info, instrument, Instrument, Span}; +use tracing::{debug, error, info, instrument, warn, Instrument, Span}; use trino_lb_core::{ config::{Config, ScalerConfig, ScalerConfigImplementation}, trino_cluster::ClusterState, @@ -266,7 +266,7 @@ impl Scaler { Ok(()) } - #[instrument(name = "Scaler::reconcile_cluster_group", skip(self))] + #[instrument(name = "Scaler::reconcile_cluster_group", skip(self, clusters))] pub async fn reconcile_cluster_group( self: Arc, cluster_group: String, @@ -435,7 +435,7 @@ impl Scaler { Ok(()) } - #[instrument(name = "Scaler::get_current_state", skip(self))] + #[instrument(name = "Scaler::get_current_state", skip(self, scaling_config))] async fn get_current_cluster_state( self: Arc, cluster_name: TrinoClusterName, @@ -461,8 +461,8 @@ impl Scaler { // State not known in persistence, so let's determine current state match (activated, ready) { (true, true) => ClusterState::Ready, - // It could also be Terminating, but in that case it would need to be stored as Terminating - // in the persistence + // It could also be Terminating or Unhealthy, but in that case it would need to be stored as + // Terminating or Unhealthy in the persistence (true, false) => ClusterState::Starting, // This might happen for very short time periods. E.g. for the Stackable scaler, this can be // the case when spec.clusterOperation.stopped was just set to true, but trino-operator did @@ -480,8 +480,18 @@ impl Scaler { } } ClusterState::Ready => { - // In the future we might want to check if the cluster is healthy and have a state `Unhealthy`. - ClusterState::Ready + if ready { + ClusterState::Ready + } else { + ClusterState::Unhealthy + } + } + ClusterState::Unhealthy => { + if ready { + ClusterState::Ready + } else { + ClusterState::Unhealthy + } } ClusterState::Draining { last_time_seen_with_queries, @@ -540,7 +550,11 @@ impl Scaler { Ok((cluster_name, current_state)) } - #[instrument(name = "Scaler::apply_target_states", skip(self))] + #[instrument( + name = "Scaler::apply_cluster_target_state", + skip(self, cluster), + fields(%cluster.name) + )] async fn apply_cluster_target_state( self: Arc, cluster: TrinoCluster, @@ -555,7 +569,10 @@ impl Scaler { ClusterState::Stopped | ClusterState::Terminating => { scaler.deactivate(&cluster.name).await?; } - ClusterState::Starting | ClusterState::Ready | ClusterState::Draining { .. } => { + ClusterState::Starting + | ClusterState::Unhealthy + | ClusterState::Ready + | ClusterState::Draining { .. } => { scaler.activate(&cluster.name).await?; } ClusterState::Deactivated => {