Commit

feat: Add a new cluster state "Unhealthy" (#63)
* feat: Add simple web-based dashboard for cluster states

* changelog

* fix: Only hand queries to clusters that are able to accept queries

* fix bug

* Use askama

* clippy

* WIP

* changelog

* Fix typo in changelog

* Use backslash instead of <br/>

* Update trino-lb/src/scaling/mod.rs

Co-authored-by: Techassi <[email protected]>

* Add rustdoc

* Reword changelog

* Reword rustdoc

---------

Co-authored-by: Techassi <[email protected]>
sbernauer and Techassi authored Feb 28, 2025
1 parent 6eff077 commit f067322
Showing 4 changed files with 73 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,11 @@ All notable changes to this project will be documented in this file.
- Support configuring the scaler reconcile interval ([#61]).
- Add simple web-based dashboard that shows the current state and query counts of all clusters.
This makes it easier to debug state transitions of clusters ([#62]).
- Add `Unhealthy` cluster state.
This state is entered once the readiness check of a cluster in the `Ready` state fails.
The cluster will remain in the `Unhealthy` state until the scaler marks that cluster as `Ready` again.
`Unhealthy` clusters won't get any new queries; if all clusters are unhealthy, new queries will be queued.
The cluster health check interval can be configured using the scaler reconcile interval ([#63]).

### Changed

@@ -21,6 +26,7 @@ All notable changes to this project will be documented in this file.
[#57]: https://github.com/stackabletech/trino-lb/pull/57
[#61]: https://github.com/stackabletech/trino-lb/pull/61
[#62]: https://github.com/stackabletech/trino-lb/pull/62
[#63]: https://github.com/stackabletech/trino-lb/pull/63

## [0.3.2] - 2024-08-20

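To make the new transition rule concrete, here is a minimal, self-contained Rust sketch of the behaviour the changelog entry describes: a `Ready` cluster whose readiness check fails becomes `Unhealthy`, and an `Unhealthy` cluster only returns to `Ready` once the check passes again. The two-variant enum and the `next_state` helper are simplified stand-ins for illustration, not trino-lb's actual types.

```rust
/// Simplified stand-in for trino-lb's cluster state (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SimpleState {
    Ready,
    Unhealthy,
}

/// Apply the rule from the changelog entry: a failed readiness check demotes
/// a `Ready` cluster, a successful one promotes an `Unhealthy` cluster back
/// to `Ready`; everything else stays unchanged.
fn next_state(current: SimpleState, readiness_check_passed: bool) -> SimpleState {
    match (current, readiness_check_passed) {
        (SimpleState::Ready, false) => SimpleState::Unhealthy,
        (SimpleState::Unhealthy, true) => SimpleState::Ready,
        (state, _) => state,
    }
}

fn main() {
    assert_eq!(next_state(SimpleState::Ready, false), SimpleState::Unhealthy);
    assert_eq!(next_state(SimpleState::Unhealthy, false), SimpleState::Unhealthy);
    assert_eq!(next_state(SimpleState::Unhealthy, true), SimpleState::Ready);
}
```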
6 changes: 6 additions & 0 deletions trino-lb-core/src/trino_cluster.rs
@@ -12,6 +12,8 @@ pub enum ClusterState {
Starting,
/// Up and running, ready to get queries
Ready,
/// Up, but not ready to accept queries. It should not be started or stopped, as it's healing itself.
Unhealthy,
/// No new queries should be submitted. Once all running queries are finished and a certain time period has passed,
/// go to `Terminating`.
Draining {
@@ -30,6 +32,7 @@ impl Display for ClusterState {
ClusterState::Stopped => f.write_str("Stopped"),
ClusterState::Starting => f.write_str("Starting"),
ClusterState::Ready => f.write_str("Ready"),
ClusterState::Unhealthy => f.write_str("Unhealthy"),
ClusterState::Draining { .. } => f.write_str("Draining"),
ClusterState::Terminating => f.write_str("Terminating"),
ClusterState::Deactivated => f.write_str("Deactivated"),
@@ -45,6 +48,7 @@ impl ClusterState {
| ClusterState::Starting
| ClusterState::Terminating => ClusterState::Starting,
ClusterState::Ready | ClusterState::Draining { .. } => ClusterState::Ready,
ClusterState::Unhealthy => ClusterState::Unhealthy,
ClusterState::Deactivated => ClusterState::Deactivated,
}
}
@@ -55,6 +59,7 @@ impl ClusterState {
ClusterState::Unknown
// No, because it is already started
| ClusterState::Starting
| ClusterState::Unhealthy
| ClusterState::Ready
| ClusterState::Terminating
| ClusterState::Deactivated => false,
@@ -64,6 +69,7 @@ impl ClusterState {
pub fn ready_to_accept_queries(&self) -> bool {
match self {
ClusterState::Unknown
| ClusterState::Unhealthy
| ClusterState::Stopped
| ClusterState::Draining { .. }
| ClusterState::Terminating
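As a usage illustration of the predicate above, the sketch below shows how a load balancer could filter routing targets with `ready_to_accept_queries()` and fall back to queuing when no cluster qualifies. The enum and method are re-declared here in simplified form (only `Ready` accepts queries, which matches the visible arms of the diff but is an assumption for the hidden ones), and `pick_target` is an invented helper, not trino-lb's real router.

```rust
/// Simplified re-declaration for this sketch; not the crate's real enum.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Unknown,
    Stopped,
    Starting,
    Ready,
    Unhealthy,
    Draining,
    Terminating,
    Deactivated,
}

impl State {
    /// Assumption for this sketch: only `Ready` clusters accept new queries.
    fn ready_to_accept_queries(&self) -> bool {
        matches!(self, State::Ready)
    }
}

/// Invented helper: pick the first cluster that may receive new queries.
/// `None` means the query has to wait in the queue until some cluster is `Ready` again.
fn pick_target<'a>(clusters: &'a [(&'a str, State)]) -> Option<&'a str> {
    clusters
        .iter()
        .find(|(_, state)| state.ready_to_accept_queries())
        .map(|(name, _)| *name)
}

fn main() {
    let clusters = [("trino-1", State::Unhealthy), ("trino-2", State::Ready)];
    assert_eq!(pick_target(&clusters), Some("trino-2"));

    // All clusters unhealthy: no target, so new queries would be queued.
    let all_unhealthy = [("trino-1", State::Unhealthy), ("trino-2", State::Unhealthy)];
    assert_eq!(pick_target(&all_unhealthy), None);
}
```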
42 changes: 35 additions & 7 deletions trino-lb/src/maintenance/query_count_fetcher.rs
@@ -136,11 +136,7 @@ impl QueryCountFetcher {
self.clusters
.iter()
.zip(cluster_states)
.filter_map(|(cluster, state)| match state{
ClusterState::Unknown | ClusterState::Stopped | ClusterState::Starting | ClusterState::Terminating | ClusterState::Deactivated => None,
ClusterState::Ready | ClusterState::Draining{ .. } => Some(cluster),
})
.map(|cluster| self.process_cluster(cluster)),
.map(|(cluster, state)| self.process_cluster(cluster, state))
)
.await;

@@ -152,8 +148,40 @@ impl QueryCountFetcher {
}
}

#[instrument(skip(self))]
async fn process_cluster(&self, cluster: &TrinoClusterConfig) {
/// Update the query count for the given cluster.
///
/// - In case the cluster is reachable, fetch the current query count and store it.
/// - In case we know the cluster is not reachable (e.g. the cluster is turned off or currently
/// starting), store a query count of zero (0) to avoid dangling clusters with non-zero query
/// counts.
#[instrument(skip(self, cluster), fields(cluster_name = cluster.name))]
async fn process_cluster(&self, cluster: &TrinoClusterConfig, state: ClusterState) {
match state {
ClusterState::Ready | ClusterState::Unhealthy | ClusterState::Draining { .. } => {
self.fetch_and_store_query_count(cluster).await;
}
ClusterState::Unknown
| ClusterState::Stopped
| ClusterState::Starting
| ClusterState::Terminating
| ClusterState::Deactivated => {
if let Err(err) = self
.persistence
.set_cluster_query_count(&cluster.name, 0)
.await
{
error!(
cluster = cluster.name,
?err,
"QueryCountFetcher: Failed to set current cluster query count to zero"
);
}
}
}
}

#[instrument(skip(self, cluster), fields(cluster_name = cluster.name))]
async fn fetch_and_store_query_count(&self, cluster: &TrinoClusterConfig) {
let cluster_info =
get_cluster_info(&cluster.endpoint, self.ignore_certs, &cluster.credentials).await;

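The rustdoc on `process_cluster` above boils down to a simple rule: states in which the cluster is reachable get a freshly fetched query count, all other states are reset to zero. Below is a minimal sketch of that rule in isolation; the `fetched` closure and the in-memory map stand in for the real HTTP fetch and the persistence layer.

```rust
use std::collections::HashMap;

/// Simplified re-declaration for this sketch; not the crate's real enum.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum State {
    Unknown,
    Stopped,
    Starting,
    Ready,
    Unhealthy,
    Draining,
    Terminating,
    Deactivated,
}

/// Decide what query count to persist: reachable states store the fetched
/// value, everything else is pinned to zero so no stale count lingers.
fn query_count_to_store(state: State, fetched: impl FnOnce() -> u64) -> u64 {
    match state {
        State::Ready | State::Unhealthy | State::Draining => fetched(),
        State::Unknown
        | State::Stopped
        | State::Starting
        | State::Terminating
        | State::Deactivated => 0,
    }
}

fn main() {
    let mut persistence: HashMap<&str, u64> = HashMap::new();
    persistence.insert("trino-1", query_count_to_store(State::Unhealthy, || 42));
    persistence.insert("trino-2", query_count_to_store(State::Stopped, || 42));
    assert_eq!(persistence["trino-1"], 42);
    assert_eq!(persistence["trino-2"], 0);
}
```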
35 changes: 26 additions & 9 deletions trino-lb/src/scaling/mod.rs
@@ -14,7 +14,7 @@ use tokio::{
task::{JoinError, JoinSet},
time,
};
use tracing::{debug, error, info, instrument, Instrument, Span};
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use trino_lb_core::{
config::{Config, ScalerConfig, ScalerConfigImplementation},
trino_cluster::ClusterState,
@@ -266,7 +266,7 @@ impl Scaler {
Ok(())
}

#[instrument(name = "Scaler::reconcile_cluster_group", skip(self))]
#[instrument(name = "Scaler::reconcile_cluster_group", skip(self, clusters))]
pub async fn reconcile_cluster_group(
self: Arc<Self>,
cluster_group: String,
@@ -435,7 +435,7 @@ impl Scaler {
Ok(())
}

#[instrument(name = "Scaler::get_current_state", skip(self))]
#[instrument(name = "Scaler::get_current_state", skip(self, scaling_config))]
async fn get_current_cluster_state(
self: Arc<Self>,
cluster_name: TrinoClusterName,
@@ -461,8 +461,8 @@ impl Scaler {
// State not known in persistence, so let's determine current state
match (activated, ready) {
(true, true) => ClusterState::Ready,
// It could also be Terminating, but in that case it would need to be stored as Terminating
// in the persistence
// It could also be Terminating or Unhealthy, but in that case it would need to be stored as
// Terminating or Unhealthy in the persistence
(true, false) => ClusterState::Starting,
// This might happen for very short time periods. E.g. for the Stackable scaler, this can be
// the case when spec.clusterOperation.stopped was just set to true, but trino-operator did
@@ -480,8 +480,18 @@ impl Scaler {
}
}
ClusterState::Ready => {
// In the future we might want to check if the cluster is healthy and have a state `Unhealthy`.
ClusterState::Ready
if ready {
ClusterState::Ready
} else {
ClusterState::Unhealthy
}
}
ClusterState::Unhealthy => {
if ready {
ClusterState::Ready
} else {
ClusterState::Unhealthy
}
}
ClusterState::Draining {
last_time_seen_with_queries,
@@ -540,7 +550,11 @@ impl Scaler {
Ok((cluster_name, current_state))
}

#[instrument(name = "Scaler::apply_target_states", skip(self))]
#[instrument(
name = "Scaler::apply_cluster_target_state",
skip(self, cluster),
fields(%cluster.name)
)]
async fn apply_cluster_target_state(
self: Arc<Self>,
cluster: TrinoCluster,
@@ -555,7 +569,10 @@ impl Scaler {
ClusterState::Stopped | ClusterState::Terminating => {
scaler.deactivate(&cluster.name).await?;
}
ClusterState::Starting | ClusterState::Ready | ClusterState::Draining { .. } => {
ClusterState::Starting
| ClusterState::Unhealthy
| ClusterState::Ready
| ClusterState::Draining { .. } => {
scaler.activate(&cluster.name).await?;
}
ClusterState::Deactivated => {
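Finally, the changelog notes that the health-check interval is simply the scaler reconcile interval. The sketch below shows one way such a periodic loop could look, re-evaluating a cluster's state on every tick. It assumes tokio with the `time`, `rt`, and `macros` features (which trino-lb already depends on); `check_readiness` is an invented stub, and this is not trino-lb's actual `Scaler`, which additionally persists states and applies target states as shown in the diff above.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Ready,
    Unhealthy,
}

/// Invented stub: a real implementation would probe the cluster's endpoint.
async fn check_readiness(_cluster: &str) -> bool {
    true
}

#[tokio::main]
async fn main() {
    // The reconcile interval doubles as the health-check interval.
    let reconcile_interval = Duration::from_secs(5);
    let mut ticker = tokio::time::interval(reconcile_interval);
    let mut state = State::Unhealthy;

    // A couple of rounds for demonstration; a real scaler loops forever.
    for _ in 0..2 {
        ticker.tick().await;
        // With only Ready and Unhealthy in play, the transition rule collapses
        // to "follow the readiness check".
        state = if check_readiness("trino-1").await {
            State::Ready
        } else {
            State::Unhealthy
        };
        println!("trino-1 is now {state:?}");
    }
}
```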
