Skip to content

Commit 9937193

Browse files
authored
Allow pause/resuming all pools (#566)
support pausing all pools
1 parent baa00ff commit 9937193

File tree

2 files changed

+124
-68
lines changed

2 files changed

+124
-68
lines changed

src/admin.rs

+100-68
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ where
7474
}
7575
"PAUSE" => {
7676
trace!("PAUSE");
77-
pause(stream, query_parts[1]).await
77+
pause(stream, query_parts).await
7878
}
7979
"RESUME" => {
8080
trace!("RESUME");
81-
resume(stream, query_parts[1]).await
81+
resume(stream, query_parts).await
8282
}
8383
"SHUTDOWN" => {
8484
trace!("SHUTDOWN");
@@ -797,96 +797,128 @@ where
797797
}
798798

799799
/// Pause a pool. It won't pass any more queries to the backends.
800-
async fn pause<T>(stream: &mut T, query: &str) -> Result<(), Error>
800+
async fn pause<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
801801
where
802802
T: tokio::io::AsyncWrite + std::marker::Unpin,
803803
{
804-
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
805-
806-
if parts.len() != 2 {
807-
error_response(
808-
stream,
809-
"PAUSE requires a database and a user, e.g. PAUSE my_db,my_user",
810-
)
811-
.await
812-
} else {
813-
let database = parts[0];
814-
let user = parts[1];
815-
816-
match get_pool(database, user) {
817-
Some(pool) => {
818-
pool.pause();
804+
let parts: Vec<&str> = match tokens.len() == 2 {
805+
true => tokens[1].split(",").map(|part| part.trim()).collect(),
806+
false => Vec::new(),
807+
};
819808

820-
let mut res = BytesMut::new();
809+
match parts.len() {
810+
0 => {
811+
for (_, pool) in get_all_pools() {
812+
pool.pause();
813+
}
821814

822-
res.put(command_complete(&format!("PAUSE {},{}", database, user)));
815+
let mut res = BytesMut::new();
823816

824-
// ReadyForQuery
825-
res.put_u8(b'Z');
826-
res.put_i32(5);
827-
res.put_u8(b'I');
817+
res.put(command_complete("PAUSE"));
828818

829-
write_all_half(stream, &res).await
830-
}
819+
// ReadyForQuery
820+
res.put_u8(b'Z');
821+
res.put_i32(5);
822+
res.put_u8(b'I');
831823

832-
None => {
833-
error_response(
834-
stream,
835-
&format!(
836-
"No pool configured for database: {}, user: {}",
837-
database, user
838-
),
839-
)
840-
.await
824+
write_all_half(stream, &res).await
825+
}
826+
2 => {
827+
let database = parts[0];
828+
let user = parts[1];
829+
830+
match get_pool(database, user) {
831+
Some(pool) => {
832+
pool.pause();
833+
834+
let mut res = BytesMut::new();
835+
836+
res.put(command_complete(&format!("PAUSE {},{}", database, user)));
837+
838+
// ReadyForQuery
839+
res.put_u8(b'Z');
840+
res.put_i32(5);
841+
res.put_u8(b'I');
842+
843+
write_all_half(stream, &res).await
844+
}
845+
846+
None => {
847+
error_response(
848+
stream,
849+
&format!(
850+
"No pool configured for database: {}, user: {}",
851+
database, user
852+
),
853+
)
854+
.await
855+
}
841856
}
842857
}
858+
_ => error_response(stream, "usage: PAUSE [db, user]").await,
843859
}
844860
}
845861

846862
/// Resume a pool. Queries are allowed again.
847-
async fn resume<T>(stream: &mut T, query: &str) -> Result<(), Error>
863+
async fn resume<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
848864
where
849865
T: tokio::io::AsyncWrite + std::marker::Unpin,
850866
{
851-
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();
852-
853-
if parts.len() != 2 {
854-
error_response(
855-
stream,
856-
"RESUME requires a database and a user, e.g. RESUME my_db,my_user",
857-
)
858-
.await
859-
} else {
860-
let database = parts[0];
861-
let user = parts[1];
862-
863-
match get_pool(database, user) {
864-
Some(pool) => {
865-
pool.resume();
867+
let parts: Vec<&str> = match tokens.len() == 2 {
868+
true => tokens[1].split(",").map(|part| part.trim()).collect(),
869+
false => Vec::new(),
870+
};
866871

867-
let mut res = BytesMut::new();
872+
match parts.len() {
873+
0 => {
874+
for (_, pool) in get_all_pools() {
875+
pool.resume();
876+
}
868877

869-
res.put(command_complete(&format!("RESUME {},{}", database, user)));
878+
let mut res = BytesMut::new();
870879

871-
// ReadyForQuery
872-
res.put_u8(b'Z');
873-
res.put_i32(5);
874-
res.put_u8(b'I');
880+
res.put(command_complete("RESUME"));
875881

876-
write_all_half(stream, &res).await
877-
}
882+
// ReadyForQuery
883+
res.put_u8(b'Z');
884+
res.put_i32(5);
885+
res.put_u8(b'I');
878886

879-
None => {
880-
error_response(
881-
stream,
882-
&format!(
883-
"No pool configured for database: {}, user: {}",
884-
database, user
885-
),
886-
)
887-
.await
887+
write_all_half(stream, &res).await
888+
}
889+
2 => {
890+
let database = parts[0];
891+
let user = parts[1];
892+
893+
match get_pool(database, user) {
894+
Some(pool) => {
895+
pool.resume();
896+
897+
let mut res = BytesMut::new();
898+
899+
res.put(command_complete(&format!("RESUME {},{}", database, user)));
900+
901+
// ReadyForQuery
902+
res.put_u8(b'Z');
903+
res.put_i32(5);
904+
res.put_u8(b'I');
905+
906+
write_all_half(stream, &res).await
907+
}
908+
909+
None => {
910+
error_response(
911+
stream,
912+
&format!(
913+
"No pool configured for database: {}, user: {}",
914+
database, user
915+
),
916+
)
917+
.await
918+
}
888919
}
889920
}
921+
_ => error_response(stream, "usage: RESUME [db, user]").await,
890922
}
891923
}
892924

tests/ruby/admin_spec.rb

+24
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,28 @@
9090
expect(results["pool_mode"]).to eq("transaction")
9191
end
9292
end
93+
94+
describe "PAUSE" do
95+
it "pauses all pools" do
96+
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
97+
results = admin_conn.async_exec("SHOW DATABASES").to_a
98+
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
99+
100+
admin_conn.async_exec("PAUSE")
101+
102+
results = admin_conn.async_exec("SHOW DATABASES").to_a
103+
expect(results.map{ |r| r["paused"] }.uniq).to eq(["1"])
104+
105+
admin_conn.async_exec("RESUME")
106+
107+
results = admin_conn.async_exec("SHOW DATABASES").to_a
108+
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
109+
end
110+
111+
it "handles errors" do
112+
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
113+
expect { admin_conn.async_exec("PAUSE foo").to_a }.to raise_error(PG::SystemError)
114+
expect { admin_conn.async_exec("PAUSE foo,bar").to_a }.to raise_error(PG::SystemError)
115+
end
116+
end
93117
end

0 commit comments

Comments
 (0)