Skip to content

Commit

Permalink
feat: Support configuring the scaler reconcile interval (#61)
Browse files Browse the repository at this point in the history
* feat: Support configuring the scaler reconcile interval

* changelog

* Remove unneeded braces

* Move into let
  • Loading branch information
sbernauer authored Feb 21, 2025
1 parent ac08608 commit e0d4e3b
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Support configuring the scaler reconcile interval ([#61]).

### Changed

- Set defaults to oci ([#57]).
Expand All @@ -13,6 +17,7 @@ All notable changes to this project will be documented in this file.
- Reduce max poll delay from 10s to 3s to have better client responsiveness

[#57]: https://github.com/stackabletech/trino-lb/pull/57
[#61]: https://github.com/stackabletech/trino-lb/pull/61

## [0.3.2] - 2024-08-20

Expand Down
13 changes: 7 additions & 6 deletions deploy/helm/trino-lb/configs/trino-lb-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ trinoClusterGroups:
- name: trino-m-2
endpoint: https://trino-m-2-coordinator-default.default.svc.cluster.local:8443
credentials: *common-credentials
oidc:
maxRunningQueries: 3
trinoClusters:
- name: trino-oidc-1
endpoint: https://5.250.182.203:8443
credentials: *common-credentials
# oidc:
# maxRunningQueries: 3
# trinoClusters:
# - name: trino-oidc-1
# endpoint: https://5.250.182.203:8443
# credentials: *common-credentials
trinoClusterGroupsIgnoreCert: true

routers:
Expand Down Expand Up @@ -100,6 +100,7 @@ routers:
routingFallback: m

clusterAutoscaler:
reconcileInterval: 1s
stackable:
clusters:
trino-s-1:
Expand Down
4 changes: 2 additions & 2 deletions deploy/helm/trino-lb/templates/trinos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
namespace: default
spec:
image:
productVersion: "451"
productVersion: "455"
clusterConfig:
catalogLabelSelector:
matchLabels:
Expand Down Expand Up @@ -40,7 +40,7 @@ metadata:
namespace: default
spec:
image:
productVersion: "451"
productVersion: "455"
clusterConfig:
catalogLabelSelector:
matchLabels:
Expand Down
20 changes: 18 additions & 2 deletions trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Default for TrinoLbPortsConfig {
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub enum PersistenceConfig {
InMemory {},
InMemory,
Redis(RedisConfig),
Postgres(PostgresConfig),
}
Expand Down Expand Up @@ -298,7 +298,23 @@ pub enum TagMatchingStrategy {

#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub enum ScalerConfig {
/// Configuration of the cluster autoscaler: a reconcile interval plus the
/// settings of the selected scaler implementation.
///
/// NOTE(review): the container attributes preceding this struct include
/// `deny_unknown_fields`, while `implementation` below uses `flatten`. serde
/// documents that combination as unsupported - confirm that unknown keys are
/// still rejected the way this config is expected to behave.
pub struct ScalerConfig {
/// Interval used for the scaler's reconcile loop timer.
///
/// Deserialized through `humantime_serde`; when the key is absent the value
/// comes from `default_scaler_reconcile_interval`.
#[serde(
with = "humantime_serde",
default = "default_scaler_reconcile_interval"
)]
pub reconcile_interval: Duration,
/// The selected scaler implementation. Flattened, so the implementation key
/// is read from the same level as the reconcile interval rather than from a
/// nested `implementation` key.
#[serde(flatten)]
pub implementation: ScalerConfigImplementation,
}

/// Returns the reconcile interval used when the configuration does not
/// specify one (five seconds).
fn default_scaler_reconcile_interval() -> Duration {
    const DEFAULT_RECONCILE_SECS: u64 = 5;
    Duration::from_secs(DEFAULT_RECONCILE_SECS)
}

/// The available scaler implementations and their implementation-specific
/// configuration.
///
/// Variant names are renamed to camelCase for deserialization, and unknown
/// fields are rejected.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub enum ScalerConfigImplementation {
/// Scaler configured by a [`StackableScalerConfig`].
Stackable(StackableScalerConfig),
}

Expand Down
2 changes: 1 addition & 1 deletion trino-lb/src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum Error {
#[snafu(display(
"In case https is used the `tls.certPemFile` and `tls.keyPemFile` options must be set"
))]
CertsMissing {},
CertsMissing,
}

pub struct AppState {
Expand Down
2 changes: 1 addition & 1 deletion trino-lb/src/http_server/ui/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::http_server::AppState;
#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("Query ID missing. It needs to be specified as query parameter such as https://127.0.0.1:8443/ui/query.html?trino_lb_20231227_122313_2JzDa3bT"))]
QueryIdMissing {},
QueryIdMissing,

#[snafu(display("Query with ID {query_id:?} not found. Maybe the query is not queued any more but was handed over to a Trino cluster."))]
QueryIdNotFound {
Expand Down
7 changes: 5 additions & 2 deletions trino-lb/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mod trino_client;
#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("Failed to install rustls crypto provider"))]
InstallRustlsCryptoProvider {},
InstallRustlsCryptoProvider,

#[snafu(display("Failed to set up tracing"))]
SetUpTracing { source: tracing::Error },
Expand Down Expand Up @@ -62,6 +62,9 @@ pub enum Error {
#[snafu(display("Failed to create scaler"))]
CreateScaler { source: scaling::Error },

#[snafu(display("Failed to start scaler"))]
StartScaler { source: scaling::Error },

#[snafu(display("Failed to start HTTP server"))]
StartHttpServer { source: http_server::Error },
}
Expand Down Expand Up @@ -157,7 +160,7 @@ async fn start() -> Result<(), MainError> {
let scaler = Scaler::new(&config, Arc::clone(&persistence))
.await
.context(CreateScalerSnafu)?;
scaler.start_loop();
scaler.start_loop().context(StartScalerSnafu)?;

let query_count_fetcher = QueryCountFetcher::new(
Arc::clone(&persistence),
Expand Down
2 changes: 1 addition & 1 deletion trino-lb/src/scaling/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub enum Error {
InvalidTimeRange { time_range: String },

#[snafu(display("Any weekdays other tha \"Mon - Son\" are not supported yet"))]
WeekdaysNotSupportedYet {},
WeekdaysNotSupportedYet,

#[snafu(display(
"Please configure a drainIdleDurationBeforeShutdown of at least {min_duration:?}"
Expand Down
28 changes: 21 additions & 7 deletions trino-lb/src/scaling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{
};
use tracing::{debug, error, info, instrument, Instrument, Span};
use trino_lb_core::{
config::{Config, ScalerConfig},
config::{Config, ScalerConfig, ScalerConfigImplementation},
trino_cluster::ClusterState,
TrinoClusterName,
};
Expand Down Expand Up @@ -109,7 +109,10 @@ pub enum Error {
JoinGetCurrentClusterStateTask { source: JoinError },

#[snafu(display("The variable \"scaler\" is None. This should never happen, as we only run the reconciliation when a scaler is configured!"))]
ScalerVariableIsNone {},
ScalerVariableIsNone,

#[snafu(display("The scaler config is missing. This is a bug in trino-lb, as it should exist at this particular code path"))]
ScalerConfigMissing,
}

/// The scaler periodically
Expand All @@ -118,6 +121,8 @@ pub enum Error {
/// 2. In case scaling is enabled for a cluster group it supervises the load and scales the number of clusters
/// accordingly
pub struct Scaler {
/// The original config passed by the user
config: Option<ScalerConfig>,
/// In case this is [`None`], no scaling at all is configured.
scaler: Option<ScalerImplementation>,
persistence: Arc<PersistenceImplementation>,
Expand Down Expand Up @@ -153,14 +158,15 @@ impl Scaler {
// Cluster groups that don't need scaling are missing from the `scaling_config`.
}

Some(match scaler {
ScalerConfig::Stackable(scaler_config) => {
let scaler = match &scaler.implementation {
ScalerConfigImplementation::Stackable(scaler_config) => {
StackableScaler::new(scaler_config, &config.trino_cluster_groups)
.await
.context(CreateStackableAutoscalerSnafu)?
.into()
}
})
};
Some(scaler)
}
};

Expand Down Expand Up @@ -192,13 +198,19 @@ impl Scaler {
persistence,
groups,
scaling_config,
config: config.cluster_autoscaler.clone(),
})
}

pub fn start_loop(self) {
pub fn start_loop(self) -> Result<(), Error> {
if self.scaler.is_some() {
// As there is a scaler configured, let's start it normally.
let mut interval = time::interval(Duration::from_secs(10));
let interval = self
.config
.as_ref()
.context(ScalerConfigMissingSnafu)?
.reconcile_interval;
let mut interval = time::interval(interval);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let me = Arc::new(self);
Expand Down Expand Up @@ -230,6 +242,8 @@ impl Scaler {
}
});
}

Ok(())
}

#[instrument(name = "Scaler::reconcile", skip(self))]
Expand Down

0 comments on commit e0d4e3b

Please sign in to comment.