Skip to content

Commit aa89e35

Browse files
authored
PgCat Query Mirroring (#341)
This is an implementation of Query mirroring in PgCat (outlined here #302) In configs, we match mirror hosts with the servers handling the traffic. A mirror host will receive the same protocol messages as the main server it was matched with. This is done by creating an async task for each mirror server, it communicates with the main server through two channels, one for the protocol messages and one for the exit signal. The mirror server sends the protocol packets to the underlying PostgreSQL server. We receive from the underlying PostgreSQL server as soon as the data is available and we immediately discard it. We use bb8 to manage the life cycle of the connection, not for pooling since each mirror server handler is more or less single-threaded. We don't have any connection pooling in the mirrors. Matching each mirror connection to an actual server connection guarantees that we will not have more connections to any of the mirrors than the parent pool would allow.
1 parent c0855bf commit aa89e35

17 files changed

+370
-23
lines changed

Diff for: .circleci/config.yml

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,28 @@ jobs:
1818
RUSTFLAGS: "-Zprofile -Ccodegen-units=1 -Copt-level=0 -Clink-dead-code -Coverflow-checks=off -Zpanic_abort_tests -Cpanic=abort -Cinstrument-coverage"
1919
RUSTDOCFLAGS: "-Cpanic=abort"
2020
- image: postgres:14
21-
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements"]
21+
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
2222
environment:
2323
POSTGRES_USER: postgres
2424
POSTGRES_DB: postgres
2525
POSTGRES_PASSWORD: postgres
2626
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
2727
- image: postgres:14
28-
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements"]
28+
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
2929
environment:
3030
POSTGRES_USER: postgres
3131
POSTGRES_DB: postgres
3232
POSTGRES_PASSWORD: postgres
3333
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
3434
- image: postgres:14
35-
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements"]
35+
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
3636
environment:
3737
POSTGRES_USER: postgres
3838
POSTGRES_DB: postgres
3939
POSTGRES_PASSWORD: postgres
4040
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
4141
- image: postgres:14
42-
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements"]
42+
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
4343
environment:
4444
POSTGRES_USER: postgres
4545
POSTGRES_DB: postgres

Diff for: .circleci/run_tests.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,12 @@ sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml
9292
kill -SIGHUP $(pgrep pgcat) # Reload config again
9393

9494
#
95-
# ActiveRecord tests
95+
# Integration tests and ActiveRecord tests
9696
#
9797
cd tests/ruby
9898
sudo bundle install
99-
bundle exec ruby tests.rb || exit 1
100-
bundle exec rspec *_spec.rb || exit 1
99+
bundle exec ruby tests.rb --format documentation || exit 1
100+
bundle exec rspec *_spec.rb --format documentation || exit 1
101101
cd ../..
102102

103103
#

Diff for: .gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
/target
33
*.deb
44
.vscode
5-
.profraw
5+
*.profraw
66
cov/
77
lcov.info
88

Diff for: dev/docker-compose.yaml

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,29 @@ services:
3333
<<: *common-env-pg
3434
POSTGRES_INITDB_ARGS: --auth-local=md5 --auth-host=md5 --auth=md5
3535
PGPORT: 5432
36-
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "5432"]
36+
command: ["postgres", "-p", "5432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
3737

3838
pg2:
3939
<<: *common-definition-pg
4040
environment:
4141
<<: *common-env-pg
4242
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
4343
PGPORT: 7432
44-
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "7432"]
44+
command: ["postgres", "-p", "7432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
4545
pg3:
4646
<<: *common-definition-pg
4747
environment:
4848
<<: *common-env-pg
4949
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
5050
PGPORT: 8432
51-
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "8432"]
51+
command: ["postgres", "-p", "8432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
5252
pg4:
5353
<<: *common-definition-pg
5454
environment:
5555
<<: *common-env-pg
5656
POSTGRES_INITDB_ARGS: --auth-local=scram-sha-256 --auth-host=scram-sha-256 --auth=scram-sha-256
5757
PGPORT: 9432
58-
command: ["postgres", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-p", "9432"]
58+
command: ["postgres", "-p", "9432", "-c", "shared_preload_libraries=pg_stat_statements", "-c", "pg_stat_statements.track=all", "-c", "pg_stat_statements.max=100000"]
5959

6060
toxiproxy:
6161
build: .

Diff for: src/admin.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::config::Role;
21
use crate::pool::BanReason;
32
/// Admin database.
43
use bytes::{Buf, BufMut, BytesMut};

Diff for: src/config.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@ pub enum Role {
2929
Primary,
3030
#[serde(alias = "replica", alias = "Replica")]
3131
Replica,
32+
#[serde(alias = "mirror", alias = "Mirror")]
33+
Mirror,
3234
}
3335

3436
impl ToString for Role {
3537
fn to_string(&self) -> String {
3638
match *self {
3739
Role::Primary => "primary".to_string(),
3840
Role::Replica => "replica".to_string(),
41+
Role::Mirror => "mirror".to_string(),
3942
}
4043
}
4144
}
@@ -90,6 +93,9 @@ pub struct Address {
9093

9194
/// The name of this pool (i.e. database name visible to the client).
9295
pub pool_name: String,
96+
97+
/// List of addresses to receive mirrored traffic.
98+
pub mirrors: Vec<Address>,
9399
}
94100

95101
impl Default for Address {
@@ -105,6 +111,7 @@ impl Default for Address {
105111
role: Role::Replica,
106112
username: String::from("username"),
107113
pool_name: String::from("pool_name"),
114+
mirrors: Vec::new(),
108115
}
109116
}
110117
}
@@ -114,11 +121,14 @@ impl Address {
114121
pub fn name(&self) -> String {
115122
match self.role {
116123
Role::Primary => format!("{}_shard_{}_primary", self.pool_name, self.shard),
117-
118124
Role::Replica => format!(
119125
"{}_shard_{}_replica_{}",
120126
self.pool_name, self.shard, self.replica_number
121127
),
128+
Role::Mirror => format!(
129+
"{}_shard_{}_mirror_{}",
130+
self.pool_name, self.shard, self.replica_number
131+
),
122132
}
123133
}
124134
}
@@ -465,11 +475,19 @@ pub struct ServerConfig {
465475
pub role: Role,
466476
}
467477

478+
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug, Hash, Eq)]
479+
pub struct MirrorServerConfig {
480+
pub host: String,
481+
pub port: u16,
482+
pub mirroring_target_index: usize,
483+
}
484+
468485
/// Shard configuration.
469486
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Hash, Eq)]
470487
pub struct Shard {
471488
pub database: String,
472489
pub servers: Vec<ServerConfig>,
490+
pub mirrors: Option<Vec<MirrorServerConfig>>,
473491
}
474492

475493
impl Shard {
@@ -518,6 +536,7 @@ impl Default for Shard {
518536
port: 5432,
519537
role: Role::Primary,
520538
}],
539+
mirrors: None,
521540
database: String::from("postgres"),
522541
}
523542
}

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod config;
22
pub mod constants;
33
pub mod errors;
44
pub mod messages;
5+
pub mod mirrors;
56
pub mod multi_logger;
67
pub mod pool;
78
pub mod scram;

Diff for: src/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ mod config;
6666
mod constants;
6767
mod errors;
6868
mod messages;
69+
mod mirrors;
6970
mod multi_logger;
7071
mod pool;
7172
mod prometheus;

Diff for: src/mirrors.rs

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/// A mirrored PostgreSQL client.
2+
/// Packets arrive to us through a channel from the main client and we send them to the server.
3+
use bb8::Pool;
4+
use bytes::{Bytes, BytesMut};
5+
6+
use crate::config::{get_config, Address, Role, User};
7+
use crate::pool::{ClientServerMap, ServerPool};
8+
use crate::stats::get_reporter;
9+
use log::{error, info, trace, warn};
10+
use tokio::sync::mpsc::{channel, Receiver, Sender};
11+
12+
pub struct MirroredClient {
13+
address: Address,
14+
user: User,
15+
database: String,
16+
bytes_rx: Receiver<Bytes>,
17+
disconnect_rx: Receiver<()>,
18+
}
19+
20+
impl MirroredClient {
21+
async fn create_pool(&self) -> Pool<ServerPool> {
22+
let config = get_config();
23+
let default = std::time::Duration::from_millis(10_000).as_millis() as u64;
24+
let (connection_timeout, idle_timeout) = match config.pools.get(&self.address.pool_name) {
25+
Some(cfg) => (
26+
cfg.connect_timeout.unwrap_or(default),
27+
cfg.idle_timeout.unwrap_or(default),
28+
),
29+
None => (default, default),
30+
};
31+
32+
let manager = ServerPool::new(
33+
self.address.clone(),
34+
self.user.clone(),
35+
self.database.as_str(),
36+
ClientServerMap::default(),
37+
get_reporter(),
38+
);
39+
40+
Pool::builder()
41+
.max_size(1)
42+
.connection_timeout(std::time::Duration::from_millis(connection_timeout))
43+
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
44+
.test_on_check_out(false)
45+
.build(manager)
46+
.await
47+
.unwrap()
48+
}
49+
50+
pub fn start(mut self) {
51+
tokio::spawn(async move {
52+
let pool = self.create_pool().await;
53+
let address = self.address.clone();
54+
loop {
55+
let mut server = match pool.get().await {
56+
Ok(server) => server,
57+
Err(err) => {
58+
error!(
59+
"Failed to get connection from pool, Discarding message {:?}, {:?}",
60+
err,
61+
address.clone()
62+
);
63+
continue;
64+
}
65+
};
66+
67+
tokio::select! {
68+
// Exit channel events
69+
_ = self.disconnect_rx.recv() => {
70+
info!("Got mirror exit signal, exiting {:?}", address.clone());
71+
break;
72+
}
73+
74+
// Incoming data from server (we read to clear the socket buffer and discard the data)
75+
recv_result = server.recv() => {
76+
match recv_result {
77+
Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()),
78+
Err(err) => {
79+
server.mark_bad();
80+
error!("Failed to receive from mirror {:?} {:?}", err, address.clone());
81+
}
82+
}
83+
}
84+
85+
// Messages to send to the server
86+
message = self.bytes_rx.recv() => {
87+
match message {
88+
Some(bytes) => {
89+
match server.send(&BytesMut::from(&bytes[..])).await {
90+
Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()),
91+
Err(err) => {
92+
server.mark_bad();
93+
error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone())
94+
}
95+
}
96+
}
97+
None => {
98+
info!("Mirror channel closed, exiting {:?}", address.clone());
99+
break;
100+
},
101+
}
102+
}
103+
}
104+
}
105+
});
106+
}
107+
}
108+
pub struct MirroringManager {
109+
pub byte_senders: Vec<Sender<Bytes>>,
110+
pub disconnect_senders: Vec<Sender<()>>,
111+
}
112+
impl MirroringManager {
113+
pub fn from_addresses(
114+
user: User,
115+
database: String,
116+
addresses: Vec<Address>,
117+
) -> MirroringManager {
118+
let mut byte_senders: Vec<Sender<Bytes>> = vec![];
119+
let mut exit_senders: Vec<Sender<()>> = vec![];
120+
121+
addresses.iter().for_each(|mirror| {
122+
let (bytes_tx, bytes_rx) = channel::<Bytes>(500);
123+
let (exit_tx, exit_rx) = channel::<()>(1);
124+
let mut addr = mirror.clone();
125+
addr.role = Role::Mirror;
126+
let client = MirroredClient {
127+
user: user.clone(),
128+
database: database.to_owned(),
129+
address: addr,
130+
bytes_rx,
131+
disconnect_rx: exit_rx,
132+
};
133+
exit_senders.push(exit_tx.clone());
134+
byte_senders.push(bytes_tx.clone());
135+
client.start();
136+
});
137+
138+
Self {
139+
byte_senders: byte_senders,
140+
disconnect_senders: exit_senders,
141+
}
142+
}
143+
144+
pub fn send(self: &mut Self, bytes: &BytesMut) {
145+
let cpy = bytes.clone().freeze();
146+
self.byte_senders
147+
.iter_mut()
148+
.for_each(|sender| match sender.try_send(cpy.clone()) {
149+
Ok(_) => {}
150+
Err(err) => {
151+
warn!("Failed to send bytes to a mirror channel {}", err);
152+
}
153+
});
154+
}
155+
156+
pub fn disconnect(self: &mut Self) {
157+
self.disconnect_senders
158+
.iter_mut()
159+
.for_each(|sender| match sender.try_send(()) {
160+
Ok(_) => {}
161+
Err(err) => {
162+
warn!(
163+
"Failed to send disconnect signal to a mirror channel {}",
164+
err
165+
);
166+
}
167+
});
168+
}
169+
}

0 commit comments

Comments
 (0)