From 412bf4ed4fb4178ce25ae020dc69248656595c08 Mon Sep 17 00:00:00 2001
From: Thomas Hanley <thanley@notablehealth.com>
Date: Tue, 25 Feb 2025 12:49:05 -0800
Subject: [PATCH] refactor(server): improve statement timeout and parameter
 handling

- Add statement timeout configuration at pool level (pools.<pool_name>.statement_timeout)
- Add logging for statement timeout configuration during pool initialization
- Improve sync_parameters method to handle statement timeout consistently
- Document statement timeout configuration in CONFIG.md
- Change string initialization from String::from() to String::new()
- Maintain cleanup state until connection check-in for proper connection pooling

The statement timeout setting leverages PostgreSQL's native statement_timeout
parameter, allowing the database to handle query termination automatically.
This ensures reliable timeout enforcement directly at the database level,
rather than managing timeouts in the connection pooler. Pool-level settings
take precedence over user-level settings for consistent timeout enforcement
across all pool users.
---
 CONFIG.md                  | 10 ++++++++++
 examples/docker/pgcat.toml |  3 +++
 src/config.rs              | 18 ++++++++++++++++++
 src/server.rs              | 19 +++++++++++++------
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/CONFIG.md b/CONFIG.md
index b6e16a7c..d6c5e8b2 100644
--- a/CONFIG.md
+++ b/CONFIG.md
@@ -458,6 +458,16 @@ default: 3000
 
 Connect timeout can be overwritten in the pool
 
+### statement_timeout
+
+```
+path: pools.<pool_name>.statement_timeout
+default: 0
+```
+
+Statement Timeout.
+The `statement_timeout` setting controls the maximum amount of time (in milliseconds) that any query is allowed to run before being cancelled. When set, this allows Postgres to manage the statement_timeout. [See Postgres Docs](https://www.postgresql.org/docs/13/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-STATEMENT)
+
 ## `pools.<pool_name>.users.<user_index>` Section
 
 ### username
diff --git a/examples/docker/pgcat.toml b/examples/docker/pgcat.toml
index cfd94a1a..5fc874c6 100644
--- a/examples/docker/pgcat.toml
+++ b/examples/docker/pgcat.toml
@@ -90,6 +90,9 @@ primary_reads_enabled = true
 #
 sharding_function = "pg_bigint_hash"
 
+# Maximum query duration in milliseconds. This value is relayed to Postgres for managing query timeouts.
+statement_timeout = 30000 # 30 seconds
+
 # Credentials for users that may connect to this cluster
 [pools.postgres.users.0]
 username = "postgres"
diff --git a/src/config.rs b/src/config.rs
index 39eb1153..e2df2052 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -541,6 +541,9 @@ pub struct Pool {
     #[serde(default = "Pool::default_default_role")]
     pub default_role: String,
 
+    #[serde(default = "Pool::default_statement_timeout")]
+    pub statement_timeout: u64,
+
     #[serde(default)] // False
     pub query_parser_enabled: bool,
 
@@ -674,6 +677,10 @@ impl Pool {
         50
     }
 
+    pub fn default_statement_timeout() -> u64 {
+        0 // Default to no timeout
+    }
+
     pub fn validate(&mut self) -> Result<(), Error> {
         match self.default_role.as_ref() {
             "any" => (),
@@ -783,6 +790,7 @@ impl Default for Pool {
             pool_mode: Self::default_pool_mode(),
             load_balancing_mode: Self::default_load_balancing_mode(),
             default_role: String::from("any"),
+            statement_timeout: Self::default_statement_timeout(),
             query_parser_enabled: false,
             query_parser_max_length: None,
             query_parser_read_write_splitting: false,
@@ -1276,6 +1284,16 @@ impl Config {
                     .sum::<u32>()
                     .to_string()
             );
+            info!(
+                "[pool: {}] Statement timeout: {}{}",
+                pool_name,
+                pool_config.statement_timeout,
+                if pool_config.statement_timeout > 0 {
+                    "ms"
+                } else {
+                    ""
+                }
+            );
             info!(
                 "[pool: {}] Default pool mode: {}",
                 pool_name,
diff --git a/src/server.rs b/src/server.rs
index 882450ea..1bf94b43 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1259,18 +1259,25 @@ impl Server {
     }
 
     pub async fn sync_parameters(&mut self, parameters: &ServerParameters) -> Result<(), Error> {
-        let parameter_diff = self.server_parameters.compare_params(parameters);
+        let mut queries = Vec::new();
 
-        if parameter_diff.is_empty() {
-            return Ok(());
+        let config = get_config();
+        if let Some(pool) = config.pools.get(&self.address.database) {
+            if pool.statement_timeout > 0 {
+                queries.push(format!(
+                    "SET statement_timeout TO {}",
+                    pool.statement_timeout
+                ));
+                self.cleanup_state.needs_cleanup_set = true;
+            }
         }
 
-        let mut query = String::from("");
-
+        let parameter_diff = self.server_parameters.compare_params(parameters);
         for (key, value) in parameter_diff {
-            query.push_str(&format!("SET {} TO '{}';", key, value));
+            queries.push(format!("SET {} TO '{}'", key, value));
         }
 
+        let query = queries.join("; ");
         let res = self.query(&query).await;
 
         self.cleanup_state.reset();