Skip to content

Commit 52b1b43

Browse files
authored
Prewarmer (#435)
* Prewarmer * hmm * Tests * default * fix test * Correct configuration * Added minimal config example * remove connect_timeout
1 parent 0907f1b commit 52b1b43

15 files changed

+337
-284
lines changed

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pgcat"
3-
version = "1.0.2-alpha1"
3+
version = "1.0.2-alpha2"
44
edition = "2021"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

pgcat.minimal.toml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# This is an example of the most basic config
2+
# that will mimic what PgBouncer does in transaction mode with one server.
3+
4+
[general]
5+
6+
host = "0.0.0.0"
7+
port = 6433
8+
admin_username = "pgcat"
9+
admin_password = "pgcat"
10+
11+
[pools.pgml.users.0]
12+
username = "postgres"
13+
password = "postgres"
14+
pool_size = 10
15+
min_pool_size = 1
16+
pool_mode = "transaction"
17+
18+
[pools.pgml.shards.0]
19+
servers = [
20+
["127.0.0.1", 28815, "primary"]
21+
]
22+
database = "postgres"

pgcat.toml

+66-6
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,58 @@ admin_username = "admin_user"
7777
# Password to access the virtual administrative database
7878
admin_password = "admin_pass"
7979

80+
# Default plugins that are configured on all pools.
81+
[plugins]
82+
83+
# Prewarmer plugin that runs queries on server startup, before giving the connection
84+
# to the client.
85+
[plugins.prewarmer]
86+
enabled = false
87+
queries = [
88+
"SELECT pg_prewarm('pgbench_accounts')",
89+
]
90+
91+
# Log all queries to stdout.
92+
[plugins.query_logger]
93+
enabled = false
94+
95+
# Block access to tables that Postgres does not allow us to control.
96+
[plugins.table_access]
97+
enabled = false
98+
tables = [
99+
"pg_user",
100+
"pg_roles",
101+
"pg_database",
102+
]
103+
104+
# Intercept user queries and give a fake reply.
105+
[plugins.intercept]
106+
enabled = true
107+
108+
[plugins.intercept.queries.0]
109+
110+
query = "select current_database() as a, current_schemas(false) as b"
111+
schema = [
112+
["a", "text"],
113+
["b", "text"],
114+
]
115+
result = [
116+
["${DATABASE}", "{public}"],
117+
]
118+
119+
[plugins.intercept.queries.1]
120+
121+
query = "select current_database(), current_schema(), current_user"
122+
schema = [
123+
["current_database", "text"],
124+
["current_schema", "text"],
125+
["current_user", "text"],
126+
]
127+
result = [
128+
["${DATABASE}", "public", "${USER}"],
129+
]
130+
131+
80132
# pool configs are structured as pool.<pool_name>
81133
# the pool_name is what clients use as database name when connecting.
82134
# For a pool named `sharded_db`, clients access that pool using connection string like
@@ -154,23 +206,31 @@ connect_timeout = 3000
154206
# Specifies how often (in seconds) cached ip addresses for servers are rechecked (see `dns_cache_enabled`).
155207
# dns_max_ttl = 30
156208

157-
[plugins]
209+
# Plugins can be configured on a pool-per-pool basis. This overrides the global plugins setting,
210+
# so all plugins have to be configured here again.
211+
[pool.sharded_db.plugins]
158212

159-
[plugins.query_logger]
213+
[pools.sharded_db.plugins.prewarmer]
214+
enabled = true
215+
queries = [
216+
"SELECT pg_prewarm('pgbench_accounts')",
217+
]
218+
219+
[pools.sharded_db.plugins.query_logger]
160220
enabled = false
161221

162-
[plugins.table_access]
222+
[pools.sharded_db.plugins.table_access]
163223
enabled = false
164224
tables = [
165225
"pg_user",
166226
"pg_roles",
167227
"pg_database",
168228
]
169229

170-
[plugins.intercept]
230+
[pools.sharded_db.plugins.intercept]
171231
enabled = true
172232

173-
[plugins.intercept.queries.0]
233+
[pools.sharded_db.plugins.intercept.queries.0]
174234

175235
query = "select current_database() as a, current_schemas(false) as b"
176236
schema = [
@@ -181,7 +241,7 @@ result = [
181241
["${DATABASE}", "{public}"],
182242
]
183243

184-
[plugins.intercept.queries.1]
244+
[pools.sharded_db.plugins.intercept.queries.1]
185245

186246
query = "select current_database(), current_schema(), current_user"
187247
schema = [

src/config.rs

+73-5
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,16 @@ impl Default for Address {
122122
}
123123
}
124124

125+
impl std::fmt::Display for Address {
126+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
127+
write!(
128+
f,
129+
"[address: {}:{}][database: {}][user: {}]",
130+
self.host, self.port, self.database, self.username
131+
)
132+
}
133+
}
134+
125135
// We need to implement PartialEq by ourselves so we skip stats in the comparison
126136
impl PartialEq for Address {
127137
fn eq(&self, other: &Self) -> bool {
@@ -235,6 +245,8 @@ pub struct General {
235245
pub port: u16,
236246

237247
pub enable_prometheus_exporter: Option<bool>,
248+
249+
#[serde(default = "General::default_prometheus_exporter_port")]
238250
pub prometheus_exporter_port: i16,
239251

240252
#[serde(default = "General::default_connect_timeout")]
@@ -374,6 +386,10 @@ impl General {
374386
pub fn default_validate_config() -> bool {
375387
true
376388
}
389+
390+
pub fn default_prometheus_exporter_port() -> i16 {
391+
9930
392+
}
377393
}
378394

379395
impl Default for General {
@@ -462,6 +478,7 @@ pub struct Pool {
462478
#[serde(default = "Pool::default_load_balancing_mode")]
463479
pub load_balancing_mode: LoadBalancingMode,
464480

481+
#[serde(default = "Pool::default_default_role")]
465482
pub default_role: String,
466483

467484
#[serde(default)] // False
@@ -476,6 +493,7 @@ pub struct Pool {
476493

477494
pub server_lifetime: Option<u64>,
478495

496+
#[serde(default = "Pool::default_sharding_function")]
479497
pub sharding_function: ShardingFunction,
480498

481499
#[serde(default = "Pool::default_automatic_sharding_key")]
@@ -489,6 +507,7 @@ pub struct Pool {
489507
pub auth_query_user: Option<String>,
490508
pub auth_query_password: Option<String>,
491509

510+
pub plugins: Option<Plugins>,
492511
pub shards: BTreeMap<String, Shard>,
493512
pub users: BTreeMap<String, User>,
494513
// Note, don't put simple fields below these configs. There's a compatibility issue with TOML that makes it
@@ -521,6 +540,14 @@ impl Pool {
521540
None
522541
}
523542

543+
pub fn default_default_role() -> String {
544+
"any".into()
545+
}
546+
547+
pub fn default_sharding_function() -> ShardingFunction {
548+
ShardingFunction::PgBigintHash
549+
}
550+
524551
pub fn validate(&mut self) -> Result<(), Error> {
525552
match self.default_role.as_ref() {
526553
"any" => (),
@@ -609,6 +636,7 @@ impl Default for Pool {
609636
auth_query_user: None,
610637
auth_query_password: None,
611638
server_lifetime: None,
639+
plugins: None,
612640
}
613641
}
614642
}
@@ -687,30 +715,50 @@ impl Default for Shard {
687715
}
688716
}
689717

690-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
718+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
691719
pub struct Plugins {
692720
pub intercept: Option<Intercept>,
693721
pub table_access: Option<TableAccess>,
694722
pub query_logger: Option<QueryLogger>,
723+
pub prewarmer: Option<Prewarmer>,
695724
}
696725

697-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
726+
impl std::fmt::Display for Plugins {
727+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
728+
write!(
729+
f,
730+
"interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}",
731+
self.intercept.is_some(),
732+
self.table_access.is_some(),
733+
self.query_logger.is_some(),
734+
self.prewarmer.is_some(),
735+
)
736+
}
737+
}
738+
739+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
698740
pub struct Intercept {
699741
pub enabled: bool,
700742
pub queries: BTreeMap<String, Query>,
701743
}
702744

703-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
745+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
704746
pub struct TableAccess {
705747
pub enabled: bool,
706748
pub tables: Vec<String>,
707749
}
708750

709-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
751+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
710752
pub struct QueryLogger {
711753
pub enabled: bool,
712754
}
713755

756+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
757+
pub struct Prewarmer {
758+
pub enabled: bool,
759+
pub queries: Vec<String>,
760+
}
761+
714762
impl Intercept {
715763
pub fn substitute(&mut self, db: &str, user: &str) {
716764
for (_, query) in self.queries.iter_mut() {
@@ -720,7 +768,7 @@ impl Intercept {
720768
}
721769
}
722770

723-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
771+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
724772
pub struct Query {
725773
pub query: String,
726774
pub schema: Vec<Vec<String>>,
@@ -754,8 +802,13 @@ pub struct Config {
754802
#[serde(default = "Config::default_path")]
755803
pub path: String,
756804

805+
// General and global settings.
757806
pub general: General,
807+
808+
// Plugins that should run in all pools.
758809
pub plugins: Option<Plugins>,
810+
811+
// Connection pools.
759812
pub pools: HashMap<String, Pool>,
760813
}
761814

@@ -940,6 +993,13 @@ impl Config {
940993
"Server TLS certificate verification: {}",
941994
self.general.verify_server_certificate
942995
);
996+
info!(
997+
"Plugins: {}",
998+
match self.plugins {
999+
Some(ref plugins) => plugins.to_string(),
1000+
None => "not configured".into(),
1001+
}
1002+
);
9431003

9441004
for (pool_name, pool_config) in &self.pools {
9451005
// TODO: Make this output prettier (maybe a table?)
@@ -1006,6 +1066,14 @@ impl Config {
10061066
None => "default".to_string(),
10071067
}
10081068
);
1069+
info!(
1070+
"[pool: {}] Plugins: {}",
1071+
pool_name,
1072+
match pool_config.plugins {
1073+
Some(ref plugins) => plugins.to_string(),
1074+
None => "not configured".into(),
1075+
}
1076+
);
10091077

10101078
for user in &pool_config.users {
10111079
info!(

src/mirrors.rs

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ impl MirroredClient {
4343
ClientServerMap::default(),
4444
Arc::new(PoolStats::new(identifier, cfg.clone())),
4545
Arc::new(RwLock::new(None)),
46+
None,
4647
);
4748

4849
Pool::builder()

0 commit comments

Comments
 (0)