Skip to content

Move replica banning to its own task #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ cd tests/ruby
sudo gem install bundler
bundle install
bundle exec ruby tests.rb || exit 1
bundle exec rspec *_spec.rb || exit 1
bundle exec rspec *_spec.rb --format documentation || exit 1
cd ../..

#
Expand Down
2 changes: 1 addition & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ where
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);
let banned = pool.is_banned(address, Some(address.role));
let banned = pool.is_banned(address);

res.put(data_row(&vec![
address.name(), // name
Expand Down
160 changes: 160 additions & 0 deletions src/bans.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use arc_swap::ArcSwap;

use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

use crate::config::get_ban_time;
use crate::config::Address;
use crate::pool::PoolIdentifier;
use tokio::time::Duration;
use tokio::time::Instant;
/// Why an address was placed on the ban list.
///
/// The reason is stored in the resulting `BanEntry` and selects the ban
/// duration in `ban`: every variant except `ManualBan` uses the configured
/// ban time, while `ManualBan` uses a fixed 86400 seconds.
#[derive(Debug, Clone, Copy)]
pub enum BanReason {
/// Recorded by `report_failed_healthcheck`.
FailedHealthCheck,
/// Recorded by `report_server_send_failed`.
MessageSendFailed,
/// Recorded by `report_server_receive_failed`.
MessageReceiveFailed,
/// Recorded by `report_failed_checkout`.
FailedCheckout,
/// Recorded by `report_statement_timeout`.
StatementTimeout,
/// Recorded by `report_manual_ban`.
// NOTE(review): presumably allowed as dead code because nothing calls
// `report_manual_ban` yet -- confirm before removing the attribute.
#[allow(dead_code)]
ManualBan,
}
/// A single ban record: why it was issued, when it started and how long it lasts.
///
/// The ban is considered active until more than `duration` has elapsed since `time`.
#[derive(Debug, Clone)]
pub struct BanEntry {
/// Reason supplied by the caller of `ban`.
reason: BanReason,
/// Instant at which the ban was recorded.
time: Instant,
/// Length of the ban, measured from `time`.
duration: Duration,
}
impl BanEntry {
    /// Returns `true` once strictly more than `duration` has passed since
    /// the ban was recorded.
    pub fn has_expired(&self) -> bool {
        let elapsed = Instant::now().duration_since(self.time);
        elapsed > self.duration
    }

    /// Returns `true` while the ban is still in force, i.e. it has not expired.
    pub fn is_active(&self) -> bool {
        let expired = self.has_expired();
        !expired
    }
}
/// Ban entries grouped first by pool and then by server address.
type BanList = HashMap<PoolIdentifier, HashMap<Address, BanEntry>>;
/// The currently published ban list.
///
/// Readers (`is_banned`, `banlist`) only call `load()`. Writers (`ban`,
/// `unban`) clone the loaded map, modify the copy and `store` it back.
static BANLIST: Lazy<ArcSwap<BanList>> = Lazy::new(|| ArcSwap::from_pointee(BanList::default()));
/// Lock taken by `ban` and `unban` around their load/clone/store sequence.
///
/// The `u8` payload is never read in this file; only the lock itself is used.
static BANLIST_MUTEX: Lazy<Mutex<u8>> = Lazy::new(|| Mutex::new(0));

/// Removes the ban on `address` within `pool_id`, if an active one exists.
///
/// Does nothing when the address has no active ban. An entry that exists but
/// has already expired counts as "not banned" and is left in place; `ban`
/// prunes such entries the next time it updates that pool's list.
///
/// A new ban list is published only when an entry was actually removed.
pub fn unban(pool_id: &PoolIdentifier, address: &Address) {
    // Cheap check without the lock: nothing to do if there is no active ban.
    if !is_banned(pool_id, address) {
        return;
    }

    // Writers take the mutex so their load/clone/store sequences do not interleave.
    let _guard = BANLIST_MUTEX.lock();
    if !is_banned(pool_id, address) {
        // It was unbanned (or expired) between the first check and taking the lock.
        return;
    }

    let mut global_banlist = (**BANLIST.load()).clone();

    // `address` is already a reference; pass it straight through as the key
    // (an extra `&` would make the lookup key type `&Address`).
    let removed = global_banlist
        .get_mut(pool_id)
        .and_then(|pool_banlist| pool_banlist.remove(address))
        .is_some();

    if removed {
        // The list changed, so publish the new version for readers.
        BANLIST.store(Arc::new(global_banlist));
    }
}

fn ban(pool_id: &PoolIdentifier, address: &Address, reason: BanReason) {
if is_banned(pool_id, address) {
// Already banned? No need to do any work
return;
}
let _guard = BANLIST_MUTEX.lock();
if is_banned(pool_id, address) {
// Maybe it was banned between our initial check and locking the mutex
// In that case, we don't need to do any work
return;
}

let ban_duration_from_conf = get_ban_time();
let ban_duration = match reason {
BanReason::FailedHealthCheck
| BanReason::MessageReceiveFailed
| BanReason::MessageSendFailed
| BanReason::FailedCheckout
| BanReason::StatementTimeout => {
Duration::from_secs(ban_duration_from_conf.try_into().unwrap())
}
BanReason::ManualBan => Duration::from_secs(86400),
};

let ban_time = Instant::now();
let mut global_banlist = (**BANLIST.load()).clone();
let pool_banlist = global_banlist
.entry(pool_id.clone())
.or_insert(HashMap::default());

let ban_entry = pool_banlist.entry(address.clone()).or_insert(BanEntry {
reason: reason,
time: ban_time,
duration: ban_duration,
});

let old_banned_until = ban_entry.time + ban_entry.duration;
let new_banned_until = ban_time + ban_duration;
if new_banned_until >= old_banned_until {
ban_entry.duration = ban_duration;
ban_entry.time = ban_time;
ban_entry.reason = reason;
}

// Clean up
pool_banlist.retain(|_k, v| v.is_active());

BANLIST.store(Arc::new(global_banlist));
}

/// Bans `address` in `pool_id` with reason `BanReason::FailedCheckout`.
pub fn report_failed_checkout(pool_id: &PoolIdentifier, address: &Address) {
ban(pool_id, address, BanReason::FailedCheckout);
}

/// Bans `address` in `pool_id` with reason `BanReason::FailedHealthCheck`.
pub fn report_failed_healthcheck(pool_id: &PoolIdentifier, address: &Address) {
ban(pool_id, address, BanReason::FailedHealthCheck);
}

/// Bans `address` in `pool_id` with reason `BanReason::MessageSendFailed`.
pub fn report_server_send_failed(pool_id: &PoolIdentifier, address: &Address) {
ban(pool_id, address, BanReason::MessageSendFailed);
}

/// Bans `address` in `pool_id` with reason `BanReason::MessageReceiveFailed`.
pub fn report_server_receive_failed(pool_id: &PoolIdentifier, address: &Address) {
ban(pool_id, address, BanReason::MessageReceiveFailed);
}

/// Bans `address` in `pool_id` with reason `BanReason::StatementTimeout`.
pub fn report_statement_timeout(pool_id: &PoolIdentifier, address: &Address) {
ban(pool_id, address, BanReason::StatementTimeout);
}

/// Bans `address` in `pool_id` with reason `BanReason::ManualBan`, which
/// `ban` maps to a fixed 86400-second duration.
// NOTE(review): marked dead code, presumably because no caller exists yet -- confirm.
#[allow(dead_code)]
pub fn report_manual_ban(pool_id: &PoolIdentifier, address: &Address) {
ban(pool_id, address, BanReason::ManualBan);
}

/// Returns a copy of the ban entries recorded for `pool_id`.
///
/// The result is an empty map when the pool has no entries. Entries are
/// copied as stored, so the map may include bans that have already expired.
pub fn banlist(pool_id: &PoolIdentifier) -> HashMap<Address, BanEntry> {
    let snapshot = BANLIST.load();
    snapshot.get(pool_id).cloned().unwrap_or_default()
}

/// Returns `true` when `address` has an entry in `pool_id`'s ban list that is
/// still active. A missing pool, a missing address, or an expired entry all
/// yield `false`.
pub fn is_banned(pool_id: &PoolIdentifier, address: &Address) -> bool {
    let snapshot = BANLIST.load();
    snapshot
        .get(pool_id)
        .and_then(|pool_banlist| pool_banlist.get(address))
        .map_or(false, |entry| entry.is_active())
}
9 changes: 5 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::Sender;

use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::bans::BanReason;
use crate::config::{get_config, Address, PoolMode};
use crate::constants::*;
use crate::errors::Error;
Expand Down Expand Up @@ -1037,7 +1038,7 @@ where
match server.send(message).await {
Ok(_) => Ok(()),
Err(err) => {
pool.ban(address, self.process_id);
pool.ban(address, self.process_id, BanReason::MessageSendFailed);
Err(err)
}
}
Expand All @@ -1059,7 +1060,7 @@ where
Ok(result) => match result {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, self.process_id);
pool.ban(address, self.process_id, BanReason::MessageReceiveFailed);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
Expand All @@ -1074,7 +1075,7 @@ where
address, pool.settings.user.username
);
server.mark_bad();
pool.ban(address, self.process_id);
pool.ban(address, self.process_id, BanReason::StatementTimeout);
error_response_terminal(&mut self.write, "pool statement timeout").await?;
Err(Error::StatementTimeout)
}
Expand All @@ -1083,7 +1084,7 @@ where
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, self.process_id);
pool.ban(address, self.process_id, BanReason::MessageReceiveFailed);
error_response_terminal(
&mut self.write,
&format!("error receiving data from server: {:?}", err),
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,10 @@ pub fn get_config() -> Config {
(*(*CONFIG.load())).clone()
}

/// Returns `general.ban_time` from the currently loaded configuration.
pub fn get_ban_time() -> i64 {
    let current = CONFIG.load();
    current.general.ban_time
}

/// Parse the configuration file located at the path.
pub async fn parse(path: &str) -> Result<(), Error> {
let mut contents = String::new();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod bans;
pub mod config;
pub mod constants;
pub mod errors;
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use std::sync::Arc;
use tokio::sync::broadcast;

mod admin;
mod bans;
mod client;
mod config;
mod constants;
Expand Down
Loading