From d17f113d3b03a52bb14201801327d14f05c2d32e Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 30 Mar 2023 20:37:05 +0200 Subject: [PATCH 1/3] allow for multiple sharding keys --- src/config.rs | 36 ++++++++++++------------- src/pool.rs | 6 ++--- src/query_router.rs | 66 +++++++++++++++++++++++++++++++++------------ 3 files changed, 70 insertions(+), 38 deletions(-) diff --git a/src/config.rs b/src/config.rs index 9228b9bb..6255e258 100644 --- a/src/config.rs +++ b/src/config.rs @@ -529,7 +529,7 @@ pub struct Pool { pub sharding_function: ShardingFunction, #[serde(default = "Pool::default_automatic_sharding_key")] - pub automatic_sharding_key: Option, + pub automatic_sharding_keys: Option>, pub sharding_key_regex: Option, pub shard_id_regex: Option, @@ -571,7 +571,7 @@ impl Pool { LoadBalancingMode::Random } - pub fn default_automatic_sharding_key() -> Option { + pub fn default_automatic_sharding_key() -> Option> { None } @@ -627,23 +627,23 @@ impl Pool { } } - self.automatic_sharding_key = match &self.automatic_sharding_key { - Some(key) => { - // No quotes in the key so we don't have to compare quoted - // to unquoted idents. - let key = key.replace("\"", ""); - - if key.split(".").count() != 2 { - error!( - "automatic_sharding_key '{}' must be fully qualified, e.g. t.{}`", - key, key - ); - return Err(Error::BadConfig); + match &mut self.automatic_sharding_keys { + Some(keys) => { + for key in keys { + // No quotes in the key so we don't have to compare quoted + // to unquoted idents. + let key = key.replace("\"", ""); + + if key.split(".").count() != 2 { + error!( + "automatic_sharding_key '{}' must be fully qualified, e.g. t.{}`", + key, key + ); + return Err(Error::BadConfig); + } } - - Some(key) } - None => None, + None => (), }; for (_, user) in &self.users { @@ -665,7 +665,7 @@ impl Default for Pool { query_parser_enabled: false, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, - automatic_sharding_key: None, + automatic_sharding_keys: None, connect_timeout: None, idle_timeout: None, sharding_key_regex: None, diff --git a/src/pool.rs b/src/pool.rs index b9293521..a50c830e 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -118,7 +118,7 @@ pub struct PoolSettings { pub sharding_function: ShardingFunction, // Sharding key - pub automatic_sharding_key: Option, + pub automatic_sharding_keys: Option>, // Health check timeout pub healthcheck_timeout: u64, @@ -159,7 +159,7 @@ impl Default for PoolSettings { query_parser_enabled: false, primary_reads_enabled: true, sharding_function: ShardingFunction::PgBigintHash, - automatic_sharding_key: None, + automatic_sharding_keys: None, healthcheck_delay: General::default_healthcheck_delay(), healthcheck_timeout: General::default_healthcheck_timeout(), ban_time: General::default_ban_time(), @@ -458,7 +458,7 @@ impl ConnectionPool { query_parser_enabled: pool_config.query_parser_enabled, primary_reads_enabled: pool_config.primary_reads_enabled, sharding_function: pool_config.sharding_function, - automatic_sharding_key: pool_config.automatic_sharding_key.clone(), + automatic_sharding_keys: pool_config.automatic_sharding_keys.clone(), healthcheck_delay: config.general.healthcheck_delay, healthcheck_timeout: config.general.healthcheck_timeout, ban_time: config.general.ban_time, diff --git a/src/query_router.rs b/src/query_router.rs index 126b8138..973f2988 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -390,7 +390,7 @@ impl QueryRouter { // Likely a read-only query Query(query) => { - match &self.pool_settings.automatic_sharding_key { + match &self.pool_settings.automatic_sharding_keys { Some(_) => { // TODO: if we have multiple queries in the same message, // we can either split them and execute them individually @@ -571,17 +571,23 @@ impl QueryRouter { let mut result = Vec::new(); let mut found = false; - let sharding_key = self + let sharding_keys = self .pool_settings - .automatic_sharding_key + .automatic_sharding_keys .as_ref() .unwrap() - .split(".") - .map(|ident| Ident::new(ident)) - .collect::>(); - - // Sharding key must be always fully qualified - assert_eq!(sharding_key.len(), 2); + .iter() + .map(|x| { + x.split(".") + .map(|ident| Ident::new(ident)) + .collect::>() + }) + .collect::>>(); + + for sharding_key in sharding_keys.iter() { + // Sharding key must be always fully qualified + assert_eq!(sharding_key.len(), 2); + } // This parses `sharding_key = 5`. But it's technically // legal to write `5 = sharding_key`. I don't judge the people @@ -593,7 +599,10 @@ impl QueryRouter { Expr::Identifier(ident) => { // Only if we're dealing with only one table // and there is no ambiguity - if &ident.value == &sharding_key[1].value { + if let Some(sharding_key) = sharding_keys + .iter() + .find(|key| &ident.value == &key[1].value) + { // Sharding key is unique enough, don't worry about // table names. if &sharding_key[0].value == "*" { @@ -624,8 +633,13 @@ impl QueryRouter { // The key is fully qualified in the query, // it will exist or Postgres will throw an error. if idents.len() == 2 { - found = &sharding_key[0].value == &idents[0].value - && &sharding_key[1].value == &idents[1].value; + found = sharding_keys + .iter() + .find(|key| { + &key[0].value == &idents[0].value + && &key[1].value == &idents[1].value + }) + .is_some(); } // TODO: key can have schema as well, e.g. public.data.id (len == 3) } @@ -1166,7 +1180,7 @@ mod test { query_parser_enabled: true, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, - automatic_sharding_key: Some(String::from("test.id")), + automatic_sharding_keys: Some(vec![String::from("test.id")]), healthcheck_delay: PoolSettings::default().healthcheck_delay, healthcheck_timeout: PoolSettings::default().healthcheck_timeout, ban_time: PoolSettings::default().ban_time, @@ -1241,7 +1255,7 @@ mod test { query_parser_enabled: true, primary_reads_enabled: false, sharding_function: ShardingFunction::PgBigintHash, - automatic_sharding_key: None, + automatic_sharding_keys: None, healthcheck_delay: PoolSettings::default().healthcheck_delay, healthcheck_timeout: PoolSettings::default().healthcheck_timeout, ban_time: PoolSettings::default().ban_time, @@ -1282,7 +1296,8 @@ mod test { QueryRouter::setup(); let mut qr = QueryRouter::new(); - qr.pool_settings.automatic_sharding_key = Some("data.id".to_string()); + qr.pool_settings.automatic_sharding_keys = + Some(vec!["data.id".to_string(), "derived.data_id".to_string()]); qr.pool_settings.shards = 3; assert!(qr @@ -1290,6 +1305,12 @@ mod test { .is_ok()); assert_eq!(qr.shard(), 2); + assert!(qr.infer(&QueryRouter::parse(&simple_query("SELECT * FROM derived WHERE data_id = 5")).unwrap()).is_ok()); + assert_eq!(qr.shard(), 2); + + assert!(qr.infer(&QueryRouter::parse(&simple_query("SELECT * FROM derived WHERE data_id = 5")).unwrap()).is_ok()); + assert_eq!(qr.shard(), 2); + assert!(qr .infer( &QueryRouter::parse(&simple_query( @@ -1300,6 +1321,16 @@ mod test { .is_ok()); assert_eq!(qr.shard(), 0); + assert!(qr.infer(&QueryRouter::parse(&simple_query( + "SELECT one, two, three FROM public.derived WHERE data_id = 6" + )).unwrap()).is_ok()); + assert_eq!(qr.shard(), 0); + + assert!(qr.infer(&QueryRouter::parse(&simple_query( + "SELECT one, two, three FROM public.derived WHERE data_id = 6" + )).unwrap()).is_ok()); + assert_eq!(qr.shard(), 0); + assert!(qr .infer( &QueryRouter::parse(&simple_query( @@ -1346,7 +1377,8 @@ mod test { assert_eq!(qr.shard(), 2); // Super unique sharding key - qr.pool_settings.automatic_sharding_key = Some("*.unique_enough_column_name".to_string()); + qr.pool_settings.automatic_sharding_keys = + Some(vec!["*.unique_enough_column_name".to_string()]); assert!(qr .infer( &QueryRouter::parse(&simple_query( @@ -1383,7 +1415,7 @@ mod test { bind.put(payload); let mut qr = QueryRouter::new(); - qr.pool_settings.automatic_sharding_key = Some("data.id".to_string()); + qr.pool_settings.automatic_sharding_keys = Some(vec!["data.id".to_string()]); qr.pool_settings.shards = 3; assert!(qr From 72ac5592269bd0abcd292a67155555bf9d508891 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 3 Apr 2023 09:06:43 +0200 Subject: [PATCH 2/3] rename config to automatic_sharding_keys --- .circleci/pgcat.toml | 2 ++ pgcat.toml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index 377680a0..5b16833f 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -89,6 +89,8 @@ primary_reads_enabled = true # sharding_function = "pg_bigint_hash" +automatic_sharding_keys = ["data.id"] + # Credentials for users that may connect to this cluster [pools.sharded_db.users.0] username = "sharding_user" diff --git a/pgcat.toml b/pgcat.toml index 3e8801b6..4ba602d4 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -196,7 +196,7 @@ sharding_function = "pg_bigint_hash" # auth_query_password = "sharding_user" # Automatically parse this from queries and route queries to the right shard! -# automatic_sharding_key = "data.id" +# automatic_sharding_keys = ["data.id"] # Idle timeout can be overwritten in the pool idle_timeout = 40000 From 26f4b8aaaa82afb0433583de25c328ea3cefe8ef Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Tue, 25 Jul 2023 21:02:04 +0200 Subject: [PATCH 3/3] cargo fmt --- src/query_router.rs | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/src/query_router.rs b/src/query_router.rs index 973f2988..668dc675 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -1305,10 +1305,20 @@ mod test { .is_ok()); assert_eq!(qr.shard(), 2); - assert!(qr.infer(&QueryRouter::parse(&simple_query("SELECT * FROM derived WHERE data_id = 5")).unwrap()).is_ok()); + assert!(qr + .infer( + &QueryRouter::parse(&simple_query("SELECT * FROM derived WHERE data_id = 5")) + .unwrap() + ) + .is_ok()); assert_eq!(qr.shard(), 2); - assert!(qr.infer(&QueryRouter::parse(&simple_query("SELECT * FROM derived WHERE data_id = 5")).unwrap()).is_ok()); + assert!(qr + .infer( + &QueryRouter::parse(&simple_query("SELECT * FROM derived WHERE data_id = 5")) + .unwrap() + ) + .is_ok()); assert_eq!(qr.shard(), 2); assert!(qr @@ -1321,14 +1331,24 @@ mod test { .is_ok()); assert_eq!(qr.shard(), 0); - assert!(qr.infer(&QueryRouter::parse(&simple_query( - "SELECT one, two, three FROM public.derived WHERE data_id = 6" - )).unwrap()).is_ok()); + assert!(qr + .infer( + &QueryRouter::parse(&simple_query( + "SELECT one, two, three FROM public.derived WHERE data_id = 6" + )) + .unwrap() + ) + .is_ok()); assert_eq!(qr.shard(), 0); - assert!(qr.infer(&QueryRouter::parse(&simple_query( - "SELECT one, two, three FROM public.derived WHERE data_id = 6" - )).unwrap()).is_ok()); + assert!(qr + .infer( + &QueryRouter::parse(&simple_query( + "SELECT one, two, three FROM public.derived WHERE data_id = 6" + )) + .unwrap() + ) + .is_ok()); assert_eq!(qr.shard(), 0); assert!(qr