Skip to content

Commit

Permalink
Merge pull request #68 from rainshowerLabs/ready-live
Browse files Browse the repository at this point in the history
Add proper k8s liveness checks
  • Loading branch information
makemake-kbo authored Mar 13, 2024
2 parents cbb4e7b + 44534a8 commit 4bc33a9
Show file tree
Hide file tree
Showing 17 changed files with 674 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "blutgang"
version = "0.3.2"
version = "0.3.3"
edition = "2021"
authors = ["makemake <[email protected]>, Rainshower Labs, contributors"]
license-file = "LICENSE"
Expand Down
20 changes: 13 additions & 7 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
pkgs = import nixpkgs {
inherit system overlays;
};
cargoMeta = builtins.fromTOML (builtins.readFile ./Cargo.toml);
in {
cargoMeta = builtins.fromTOML (builtins.readFile ./Cargo.toml);
in
{
packages.default = pkgs.rustPlatform.buildRustPackage {
pname = cargoMeta.package.name;
version = cargoMeta.package.version;
Expand All @@ -33,7 +34,6 @@
extensions = [ "rust-src" "rustfmt-preview" "rust-analyzer" ];
})
];

cargoBuildFlags = [ "--profile maxperf" ];
};

Expand All @@ -43,8 +43,10 @@
pkg-config
openssl
systemd
(rust-bin.stable.latest.default.override {
extensions = [ "rust-src" "rustfmt-preview" "rust-analyzer"];
clang
gdb
(rust-bin.stable.latest.default.override {
extensions = [ "rust-src" "rustfmt-preview" "rust-analyzer" ];
})
];

Expand All @@ -53,13 +55,17 @@
export RUSTC_WRAPPER=$(which sccache)
export OLD_PS1="$PS1" # Preserve the original PS1
export PS1="nix-shell:blutgang $PS1" # Customize this line as needed
# Set NIX_LD and NIX_LD_LIBRARY_PATH for rust-analyzer
export NIX_LD_LIBRARY_PATH="${pkgs.lib.makeLibraryPath [ pkgs.glibc pkgs.gcc-unwrapped.lib ]}"
export NIX_LD="${pkgs.stdenv.cc}/nix-support/dynamic-linker"
'';

# reser PS1
# reset ps1
shellExitHook = ''
export PS1="$OLD_PS1"
'';
};
}
);
}
}
1 change: 0 additions & 1 deletion shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pkgs.mkShell {
pkgs.pkg-config
pkgs.openssl
pkgs.systemdLibs
pkgs.cargo2nix
];

shellHook = ''
Expand Down
16 changes: 15 additions & 1 deletion src/admin/accept.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use crate::log_info;
use crate::{
admin::liveready::{
accept_health_request,
accept_readiness_request,
LiveReadyRequestSnd,
},
log_info,
};
use http_body_util::Full;
use hyper::{
body::Bytes,
Expand Down Expand Up @@ -131,7 +138,14 @@ pub async fn accept_admin_request(
poverty_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
cache: Arc<Db>,
config: Arc<RwLock<Settings>>,
liveness_request_tx: LiveReadyRequestSnd,
) -> Result<hyper::Response<Full<Bytes>>, Infallible> {
if tx.uri().path() == "/ready" {
return accept_readiness_request(liveness_request_tx).await;
} else if tx.uri().path() == "/health" {
return accept_health_request(liveness_request_tx).await;
}

let mut tx = incoming_to_value(tx).await.unwrap();

// If we have JWT enabled check that tx is valid
Expand Down
74 changes: 58 additions & 16 deletions src/admin/listener.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
use std::sync::{
Arc,
RwLock,
use std::{
net::SocketAddr,
sync::{
Arc,
RwLock,
},
};

use sled::Db;

use crate::{
admin::accept::accept_admin_request,
admin::{
accept::accept_admin_request,
liveready::{
liveness_monitor,
LiveReadyRequestSnd,
LiveReadyUpdateRecv,
},
},
log_info,
Rpc,
Settings,
Expand All @@ -17,7 +27,10 @@ use hyper::{
service::service_fn,
};
use hyper_util_blutgang::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::{
net::TcpListener,
sync::mpsc,
};

macro_rules! accept_admin {
(
Expand All @@ -26,6 +39,7 @@ macro_rules! accept_admin {
$poverty_list_rwlock:expr,
$cache:expr,
$config:expr,
$liveness_request_tx:expr,
) => {
// Bind the incoming connection to our service
if let Err(err) = http1::Builder::new()
Expand All @@ -39,6 +53,7 @@ macro_rules! accept_admin {
Arc::clone($poverty_list_rwlock),
Arc::clone($cache),
Arc::clone($config),
$liveness_request_tx.clone(),
);
response
}),
Expand All @@ -50,24 +65,17 @@ macro_rules! accept_admin {
};
}

// Used for listening to admin requests as its own tokio task.
//
// Similar to what you'd find in main/balancer
pub async fn listen_for_admin_requests(
async fn admin_api_server(
rpc_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
poverty_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
cache: Arc<Db>,
config: Arc<RwLock<Settings>>,
address: SocketAddr,
liveness_request_tx: LiveReadyRequestSnd,
) -> Result<(), Box<dyn std::error::Error>> {
let address;
{
let config_guard = config.read().unwrap();
address = config_guard.admin.address;
}

// Create a listener and bind to it
let listener = TcpListener::bind(address).await?;
log_info!("Bound admin to: {}", address);
log_info!("Bound admin API to: {}", address);

loop {
let (stream, socketaddr) = listener.accept().await?;
Expand All @@ -81,6 +89,7 @@ pub async fn listen_for_admin_requests(
let poverty_list_rwlock_clone = Arc::clone(&poverty_list_rwlock);
let cache_clone = Arc::clone(&cache);
let config_clone = Arc::clone(&config);
let liveness_request_tx_clone = liveness_request_tx.clone();

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
Expand All @@ -90,7 +99,40 @@ pub async fn listen_for_admin_requests(
&poverty_list_rwlock_clone,
&cache_clone,
&config_clone,
&liveness_request_tx_clone,
);
});
}
}

// Used for listening to admin requests as its own tokio task.
// Also used for k8s liveness/readiness probes.
//
// Similar to what you'd find in main/balancer
pub async fn listen_for_admin_requests(
rpc_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
poverty_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
cache: Arc<Db>,
config: Arc<RwLock<Settings>>,
liveness_receiver: LiveReadyUpdateRecv,
) -> Result<(), Box<dyn std::error::Error>> {
let address;
{
let config_guard = config.read().unwrap();
address = config_guard.admin.address;
}

// Spawn thread for monitoring the current liveness status of Blutgang
let (liveness_request_tx, liveness_request_rx) = mpsc::channel(16);
tokio::spawn(liveness_monitor(liveness_receiver, liveness_request_rx));

admin_api_server(
rpc_list_rwlock,
poverty_list_rwlock,
cache,
config,
address,
liveness_request_tx,
)
.await
}
Loading

0 comments on commit 4bc33a9

Please sign in to comment.