Skip to content

Commit 0b01d70

Browse files
authored
Allow configuring routing decision when no shard is selected (#578)
The TL;DR for the change is that we allow QueryRouter to set the active shard to None. This signals to the Pool::get method that we have no shard selected. The get method follows a no_shard_specified_behavior config to know how to route the query. Original PR description: The ruby-pg library makes a startup query to SET client_encoding to ... if the Encoding.default_internal value is set (Code). This query is troublesome because we cannot possibly attach a routing comment to it. PgCat, by default, will route that query to the default shard. Everything is fine until shard 0 has issues; clients will then all attempt to send this query to shard 0, which increases the connection latency significantly for all clients, even those not interested in shard 0. This PR introduces no_shard_specified_behavior, which defines the behavior in case we have routing-by-comment enabled but we get a query without a comment. The allowed behaviors are: random: picks a shard at random; random_healthy: picks a shard at random, favoring shards with the least number of recent connection/checkout errors; shard_<number>: e.g. shard_0, shard_4, etc., picks a specific shard every time. In order to achieve this, this PR introduces an error_count on the Address object that tracks the number of errors since the last checkout and uses that metric to sort shards by error count before making a routing decision. I didn't want to use address stats, to avoid introducing a routing dependency on internal stats (we might do that in the future, but I prefer to avoid this for the time being). I also made changes to the test environment to replace Ruby's TOML reader library. It appears to be abandoned and does not support mixed arrays (which we use in the config TOML), and it also does not play nicely with single-quoted regular expressions. I opted for using yj, which is a CLI tool that can convert from TOML to JSON and back, so I refactored the tests to use that tool.
1 parent 33db0df commit 0b01d70

16 files changed

+448
-112
lines changed

.circleci/config.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99
# Specify the execution environment. You can specify an image from Dockerhub or use one of our Convenience Images from CircleCI's Developer Hub.
1010
# See: https://circleci.com/docs/2.0/configuration-reference/#docker-machine-macos-windows-executor
1111
docker:
12-
- image: ghcr.io/levkk/pgcat-ci:1.67
12+
- image: ghcr.io/postgresml/pgcat-ci:latest
1313
environment:
1414
RUST_LOG: info
1515
LLVM_PROFILE_FILE: /tmp/pgcat-%m-%p.profraw

pgcat.toml

+6
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ primary_reads_enabled = true
177177
# shard_id_regex = '/\* shard_id: (\d+) \*/'
178178
# regex_search_limit = 1000 # only look at the first 1000 characters of SQL statements
179179

180+
# Defines the behavior when no shard is selected in a sharded system.
181+
# `random`: picks a shard at random
182+
# `random_healthy`: picks a shard at random favoring shards with the least number of recent errors
183+
# `shard_<number>`: e.g. shard_0, shard_4, etc. picks a specific shard every time
184+
# default_shard = "shard_0"
185+
180186
# So what if you wanted to implement a different hashing function,
181187
# or you've already built one and you want this pooler to use it?
182188
# Current options:

src/client.rs

+27-20
Original file line numberDiff line numberDiff line change
@@ -1009,23 +1009,27 @@ where
10091009

10101010
// SET SHARD TO
10111011
Some((Command::SetShard, _)) => {
1012-
// Selected shard is not configured.
1013-
if query_router.shard() >= pool.shards() {
1014-
// Set the shard back to what it was.
1015-
query_router.set_shard(current_shard);
1016-
1017-
error_response(
1018-
&mut self.write,
1019-
&format!(
1020-
"shard {} is more than configured {}, staying on shard {} (shard numbers start at 0)",
1021-
query_router.shard(),
1022-
pool.shards(),
1023-
current_shard,
1024-
),
1025-
)
1026-
.await?;
1027-
} else {
1028-
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
1012+
match query_router.shard() {
1013+
None => (),
1014+
Some(selected_shard) => {
1015+
if selected_shard >= pool.shards() {
1016+
// Bad shard number, send error message to client.
1017+
query_router.set_shard(current_shard);
1018+
1019+
error_response(
1020+
&mut self.write,
1021+
&format!(
1022+
"shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)",
1023+
selected_shard,
1024+
pool.shards(),
1025+
current_shard,
1026+
),
1027+
)
1028+
.await?;
1029+
} else {
1030+
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
1031+
}
1032+
}
10291033
}
10301034
continue;
10311035
}
@@ -1093,8 +1097,11 @@ where
10931097
self.buffer.clear();
10941098
}
10951099

1096-
error_response(&mut self.write, "could not get connection from the pool")
1097-
.await?;
1100+
error_response(
1101+
&mut self.write,
1102+
format!("could not get connection from the pool - {}", err).as_str(),
1103+
)
1104+
.await?;
10981105

10991106
error!(
11001107
"Could not get connection from pool: \
@@ -1234,7 +1241,7 @@ where
12341241
{{ \
12351242
pool_name: {}, \
12361243
username: {}, \
1237-
shard: {}, \
1244+
shard: {:?}, \
12381245
role: \"{:?}\" \
12391246
}}",
12401247
self.pool_name,

src/config.rs

+81
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ use arc_swap::ArcSwap;
33
use log::{error, info};
44
use once_cell::sync::Lazy;
55
use regex::Regex;
6+
use serde::{Deserializer, Serializer};
67
use serde_derive::{Deserialize, Serialize};
8+
79
use std::collections::hash_map::DefaultHasher;
810
use std::collections::{BTreeMap, HashMap, HashSet};
911
use std::hash::{Hash, Hasher};
1012
use std::path::Path;
13+
use std::sync::atomic::{AtomicU64, Ordering};
1114
use std::sync::Arc;
1215
use tokio::fs::File;
1316
use tokio::io::AsyncReadExt;
@@ -101,6 +104,9 @@ pub struct Address {
101104

102105
/// Address stats
103106
pub stats: Arc<AddressStats>,
107+
108+
/// Number of errors encountered since last successful checkout
109+
pub error_count: Arc<AtomicU64>,
104110
}
105111

106112
impl Default for Address {
@@ -118,6 +124,7 @@ impl Default for Address {
118124
pool_name: String::from("pool_name"),
119125
mirrors: Vec::new(),
120126
stats: Arc::new(AddressStats::default()),
127+
error_count: Arc::new(AtomicU64::new(0)),
121128
}
122129
}
123130
}
@@ -182,6 +189,18 @@ impl Address {
182189
),
183190
}
184191
}
192+
193+
pub fn error_count(&self) -> u64 {
194+
self.error_count.load(Ordering::Relaxed)
195+
}
196+
197+
pub fn increment_error_count(&self) {
198+
self.error_count.fetch_add(1, Ordering::Relaxed);
199+
}
200+
201+
pub fn reset_error_count(&self) {
202+
self.error_count.store(0, Ordering::Relaxed);
203+
}
185204
}
186205

187206
/// PostgreSQL user.
@@ -540,6 +559,9 @@ pub struct Pool {
540559
pub shard_id_regex: Option<String>,
541560
pub regex_search_limit: Option<usize>,
542561

562+
#[serde(default = "Pool::default_default_shard")]
563+
pub default_shard: DefaultShard,
564+
543565
pub auth_query: Option<String>,
544566
pub auth_query_user: Option<String>,
545567
pub auth_query_password: Option<String>,
@@ -575,6 +597,10 @@ impl Pool {
575597
PoolMode::Transaction
576598
}
577599

600+
pub fn default_default_shard() -> DefaultShard {
601+
DefaultShard::default()
602+
}
603+
578604
pub fn default_load_balancing_mode() -> LoadBalancingMode {
579605
LoadBalancingMode::Random
580606
}
@@ -666,6 +692,16 @@ impl Pool {
666692
None => None,
667693
};
668694

695+
match self.default_shard {
696+
DefaultShard::Shard(shard_number) => {
697+
if shard_number >= self.shards.len() {
698+
error!("Invalid shard {:?}", shard_number);
699+
return Err(Error::BadConfig);
700+
}
701+
}
702+
_ => (),
703+
}
704+
669705
for (_, user) in &self.users {
670706
user.validate()?;
671707
}
@@ -693,6 +729,7 @@ impl Default for Pool {
693729
sharding_key_regex: None,
694730
shard_id_regex: None,
695731
regex_search_limit: Some(1000),
732+
default_shard: Self::default_default_shard(),
696733
auth_query: None,
697734
auth_query_user: None,
698735
auth_query_password: None,
@@ -711,6 +748,50 @@ pub struct ServerConfig {
711748
pub role: Role,
712749
}
713750

751+
// No Shard Specified handling.
752+
#[derive(Debug, PartialEq, Clone, Eq, Hash, Copy)]
753+
pub enum DefaultShard {
754+
Shard(usize),
755+
Random,
756+
RandomHealthy,
757+
}
758+
impl Default for DefaultShard {
759+
fn default() -> Self {
760+
DefaultShard::Shard(0)
761+
}
762+
}
763+
impl serde::Serialize for DefaultShard {
764+
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
765+
match self {
766+
DefaultShard::Shard(shard) => {
767+
serializer.serialize_str(&format!("shard_{}", &shard.to_string()))
768+
}
769+
DefaultShard::Random => serializer.serialize_str("random"),
770+
DefaultShard::RandomHealthy => serializer.serialize_str("random_healthy"),
771+
}
772+
}
773+
}
774+
impl<'de> serde::Deserialize<'de> for DefaultShard {
775+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
776+
where
777+
D: Deserializer<'de>,
778+
{
779+
let s = String::deserialize(deserializer)?;
780+
if s.starts_with("shard_") {
781+
let shard = s[6..].parse::<usize>().map_err(serde::de::Error::custom)?;
782+
return Ok(DefaultShard::Shard(shard));
783+
}
784+
785+
match s.as_str() {
786+
"random" => Ok(DefaultShard::Random),
787+
"random_healthy" => Ok(DefaultShard::RandomHealthy),
788+
_ => Err(serde::de::Error::custom(
789+
"invalid value for no_shard_specified_behavior",
790+
)),
791+
}
792+
}
793+
}
794+
714795
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)]
715796
pub struct MirrorServerConfig {
716797
pub host: String,

src/errors.rs

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub enum Error {
2828
UnsupportedStatement,
2929
QueryRouterParserError(String),
3030
QueryRouterError(String),
31+
InvalidShardId(usize),
3132
}
3233

3334
#[derive(Clone, PartialEq, Debug)]

0 commit comments

Comments
 (0)