feat: Change health connection to SSE #209

Merged
merged 2 commits on Jan 29, 2025
8 changes: 7 additions & 1 deletion README.md
@@ -399,7 +399,7 @@ Date: Mon, 27 Jun 2022 14:26:45 GMT

As part of making this API performant, all reading endpoints support long-polling as an efficient alternative to regular (repeated) polling. Using this feature requires the following parameters:

- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results avalible all of them will be returned.
- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results available all of them will be returned.
- `wait_time`: ... or this time has passed (if not stated differently, e.g., by adding 'm', 'h', 'ms', ..., this is interpreted as seconds), whichever comes first.

For example, retrieving a task's results:
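The README's concrete example is collapsed in this diff view; as a rough, hypothetical sketch only (URL, task id, and the authorization header format are placeholders, not taken from the PR), such a long-polling call could look like this:

```rust
use serde_json::Value;

// Hypothetical long-polling request: block until 5 results are available or
// 30 seconds have passed, whichever comes first. The URL, task id, and
// Authorization header are placeholders for illustration.
async fn poll_results(client: &reqwest::Client, task_id: &str) -> reqwest::Result<Value> {
    client
        .get(format!("http://localhost:8081/v1/tasks/{task_id}/results"))
        .query(&[("wait_count", "5"), ("wait_time", "30s")])
        .header("Authorization", "ApiKey app.proxy.broker <secret>")
        .send()
        .await?
        .json::<Value>()
        .await
}
```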
@@ -629,6 +629,12 @@ Samply.Beam encrypts all information in the `body` fields of both Tasks and Results

The data is symmetrically encrypted using the Authenticated Encryption with Associated Data (AEAD) algorithm "XChaCha20Poly1305", a widespread algorithm (e.g., mandatory for the TLS protocol) regarded as highly secure by experts. The used [chacha20poly1305 library](https://docs.rs/chacha20poly1305/latest/chacha20poly1305/) was subjected to a [security audit](https://research.nccgroup.com/2020/02/26/public-report-rustcrypto-aes-gcm-and-chacha20poly1305-implementation-review/), with no significant findings. The randomly generated symmetric keys are encapsulated in an RSA-encrypted ciphertext using OAEP padding. This ensures that only the intended recipients can decrypt the key and subsequently the transferred data.
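As a minimal sketch of this pattern using the same `chacha20poly1305` crate (not Beam's actual implementation; the function shape and the omitted RSA-OAEP key-wrapping step are assumptions):

```rust
use chacha20poly1305::{
    aead::{Aead, AeadCore, KeyInit, OsRng},
    XChaCha20Poly1305,
};

// Encrypt a message body with a freshly generated symmetric key.
// In Beam, this key would additionally be wrapped with RSA-OAEP for each
// intended recipient; that step is omitted here.
fn encrypt_body(plaintext: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) {
    // Random 256-bit key, generated per message.
    let key = XChaCha20Poly1305::generate_key(&mut OsRng);
    let cipher = XChaCha20Poly1305::new(&key);
    // 192-bit random nonce; the extended nonce makes random generation safe.
    let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
    let ciphertext = cipher
        .encrypt(&nonce, plaintext)
        .expect("XChaCha20Poly1305 encryption failed");
    (ciphertext, nonce.to_vec(), key.to_vec())
}
```

The returned key would then be encrypted once per recipient with RSA-OAEP, so that only those recipients can recover it and decrypt the body.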

### Health check connection

The beam proxy tries to keep a permanent connection to the broker, which makes it possible to see which sites are currently connected.
This also allows us to detect invalid connection states, such as multiple proxies with the same proxy id connecting simultaneously.
In that case, the second proxy trying to connect will receive a 409 status code and shut down.
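For illustration, a hedged sketch of what such a proxy-side loop might look like (this is not the proxy code from this PR; the endpoint path, authentication, and crate choices are assumptions):

```rust
use futures_util::StreamExt;
use reqwest::StatusCode;

// Hypothetical proxy-side loop: hold one long-lived SSE connection to the
// broker and reconnect when it drops. A 409 means another proxy with the same
// proxy id already holds the connection, so this instance shuts down.
async fn keep_health_connection(client: reqwest::Client, broker_url: &str) -> anyhow::Result<()> {
    loop {
        // Path and authentication are placeholders, not taken from the PR.
        let resp = client.get(format!("{broker_url}/v1/control")).send().await?;
        match resp.status() {
            StatusCode::CONFLICT => anyhow::bail!("proxy id already connected elsewhere"),
            status if status.is_success() => {
                // Drain the SSE stream; when it ends, the connection dropped.
                let mut events = resp.bytes_stream();
                while let Some(_chunk) = events.next().await {}
            }
            status => eprintln!("unexpected status from broker: {status}"),
        }
        // Brief pause before reconnecting.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}
```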

## Roadmap

- [X] API Key authentication of local applications
2 changes: 1 addition & 1 deletion broker/src/crypto.rs
@@ -9,7 +9,7 @@ use std::time::Duration;
use tokio::{sync::RwLock, time::timeout};
use tracing::{debug, error, warn, info};

use crate::health::{self, Health, VaultStatus};
use crate::serve_health::{Health, VaultStatus};

pub struct GetCertsFromPki {
pki_realm: String,
85 changes: 0 additions & 85 deletions broker/src/health.rs

This file was deleted.

7 changes: 3 additions & 4 deletions broker/src/main.rs
@@ -2,7 +2,6 @@

mod banner;
mod crypto;
mod health;
mod serve;
mod serve_health;
mod serve_pki;
@@ -15,7 +14,7 @@ mod compare_client_server_version;
use std::{collections::HashMap, sync::Arc, time::Duration};

use crypto::GetCertsFromPki;
use health::{Health, InitStatus};
use serve_health::{Health, InitStatus};
use once_cell::sync::Lazy;
use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError};
use tokio::sync::RwLock;
@@ -45,8 +44,8 @@ pub async fn main() -> anyhow::Result<()> {

async fn init_broker_ca_chain(health: Arc<RwLock<Health>>) {
{
health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert
health.write().await.initstatus = InitStatus::FetchingIntermediateCert
}
shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain");
health.write().await.initstatus = health::InitStatus::Done;
health.write().await.initstatus = InitStatus::Done;
}
2 changes: 1 addition & 1 deletion broker/src/serve.rs
@@ -20,7 +20,7 @@ use tokio::{
};
use tracing::{debug, info, trace, warn};

use crate::{banner, crypto, health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version};
use crate::{banner, crypto, serve_health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version};

pub(crate) async fn serve(health: Arc<RwLock<Health>>) -> anyhow::Result<()> {
let app = serve_tasks::router()
129 changes: 87 additions & 42 deletions broker/src/serve_health.rs
@@ -1,13 +1,14 @@
use std::{sync::Arc, time::{Duration, SystemTime}};
use std::{collections::HashMap, convert::Infallible, marker::PhantomData, sync::Arc, time::{Duration, SystemTime}};

use axum::{extract::{State, Path}, http::StatusCode, routing::get, Json, Router, response::Response};
use axum::{extract::{Path, State}, http::StatusCode, response::{sse::{Event, KeepAlive}, Response, Sse}, routing::get, Json, Router};
use axum_extra::{headers::{authorization::Basic, Authorization}, TypedHeader};
use beam_lib::ProxyId;
use futures_core::Stream;
use serde::{Serialize, Deserialize};
use shared::{crypto_jwt::Authorized, Msg, config::CONFIG_CENTRAL};
use tokio::sync::RwLock;
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};

use crate::{health::{Health, VaultStatus, Verdict, ProxyStatus, InitStatus}, compare_client_server_version::log_version_mismatch};
use crate::compare_client_server_version::log_version_mismatch;

#[derive(Serialize)]
struct HealthOutput {
@@ -16,6 +17,58 @@ struct HealthOutput {
init_status: InitStatus
}

#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Verdict {
Healthy,
Unhealthy,
Unknown,
}

impl Default for Verdict {
fn default() -> Self {
Verdict::Unknown
}
}

#[derive(Debug, Serialize, Clone, Copy, Default)]
#[serde(rename_all = "lowercase")]
pub enum VaultStatus {
Ok,
#[default]
Unknown,
OtherError,
LockedOrSealed,
Unreachable,
}

#[derive(Debug, Serialize, Clone, Copy, Default)]
#[serde(rename_all = "lowercase")]
pub enum InitStatus {
#[default]
Unknown,
FetchingIntermediateCert,
Done
}

#[derive(Debug, Default)]
pub struct Health {
pub vault: VaultStatus,
pub initstatus: InitStatus,
proxies: HashMap<ProxyId, ProxyStatus>
}

#[derive(Debug, Clone, Default)]
struct ProxyStatus {
online_guard: Arc<Mutex<Option<SystemTime>>>
}

impl ProxyStatus {
pub fn is_online(&self) -> bool {
self.online_guard.try_lock().is_err()
}
}

pub(crate) fn router(health: Arc<RwLock<Health>>) -> Router {
Router::new()
.route("/v1/health", get(handler))
@@ -46,14 +99,14 @@ async fn handler(
}

async fn get_all_proxies(State(state): State<Arc<RwLock<Health>>>) -> Json<Vec<ProxyId>> {
Json(state.read().await.proxies.keys().cloned().collect())
Json(state.read().await.proxies.iter().filter(|(_, v)| v.is_online()).map(|(k, _)| k).cloned().collect())
}

async fn proxy_health(
State(state): State<Arc<RwLock<Health>>>,
Path(proxy): Path<ProxyId>,
auth: TypedHeader<Authorization<Basic>>
) -> Result<(StatusCode, Json<ProxyStatus>), StatusCode> {
) -> Result<(StatusCode, Json<serde_json::Value>), StatusCode> {
let Some(ref monitoring_key) = CONFIG_CENTRAL.monitoring_api_key else {
return Err(StatusCode::NOT_IMPLEMENTED);
};
@@ -63,10 +116,12 @@
}

if let Some(reported_back) = state.read().await.proxies.get(&proxy) {
if reported_back.online() {
Err(StatusCode::OK)
if let Ok(last_disconnect) = reported_back.online_guard.try_lock().as_deref().copied() {
Ok((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({
"last_disconnect": last_disconnect
}))))
} else {
Ok((StatusCode::SERVICE_UNAVAILABLE, Json(reported_back.clone())))
Err(StatusCode::OK)
}
} else {
Err(StatusCode::NOT_FOUND)
@@ -76,48 +131,38 @@
async fn get_control_tasks(
State(state): State<Arc<RwLock<Health>>>,
proxy_auth: Authorized,
) -> StatusCode {
) -> Result<Sse<ForeverStream>, StatusCode> {
let proxy_id = proxy_auth.get_from().proxy_id();
// Once this is freed the connection will be removed from the map of connected proxies again
// This ensures that when the connection is dropped and therefore this response future the status of this proxy will be updated
let _connection_remover = ConnectedGuard::connect(&proxy_id, &state).await;

// In the future, this will wait for control tasks for the given proxy
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
let status_mutex = state
.write()
.await
.proxies
.entry(proxy_id)
.or_default()
.online_guard
.clone();
let Ok(connect_guard) = tokio::time::timeout(Duration::from_secs(60), status_mutex.lock_owned()).await
else {
return Err(StatusCode::CONFLICT);
};

StatusCode::OK
Ok(Sse::new(ForeverStream(connect_guard)).keep_alive(KeepAlive::new()))
}

struct ConnectedGuard<'a> {
proxy: &'a ProxyId,
state: &'a Arc<RwLock<Health>>
}
struct ForeverStream(OwnedMutexGuard<Option<SystemTime>>);

impl<'a> ConnectedGuard<'a> {
async fn connect(proxy: &'a ProxyId, state: &'a Arc<RwLock<Health>>) -> ConnectedGuard<'a> {
{
state.write().await.proxies
.entry(proxy.clone())
.and_modify(ProxyStatus::connect)
.or_insert(ProxyStatus::new());
}
Self { proxy, state }
impl Stream for ForeverStream {
type Item = Result<Event, Infallible>;

fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
std::task::Poll::Pending
}
}

impl<'a> Drop for ConnectedGuard<'a> {
impl Drop for ForeverStream {
fn drop(&mut self) {
let proxy_id = self.proxy.clone();
let map = self.state.clone();
tokio::spawn(async move {
// We wait here for one second to give the client a bit of time to reconnect incrementing the connection count so that it will be one again after the decrement
tokio::time::sleep(Duration::from_secs(1)).await;
map.write()
.await
.proxies
.get_mut(&proxy_id)
.expect("Has to exist as we don't remove items and the constructor of this type inserts the entry")
.disconnect();
});
*self.0 = Some(SystemTime::now());
}
}
}
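To make the new `online_guard` mechanism above easier to follow, here is a small standalone sketch of the same mutex-as-presence pattern (assumed names, not the PR's code): a proxy counts as online while its connection future holds the lock, and the stored `SystemTime` records the last disconnect.

```rust
use std::{sync::Arc, time::SystemTime};
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // None = never connected; Some(t) = time of the last disconnect.
    let online_guard: Arc<Mutex<Option<SystemTime>>> = Arc::new(Mutex::new(None));

    // "Connect": take the lock and keep it for the lifetime of the connection.
    let mut guard = online_guard.clone().lock_owned().await;

    // While the guard is held, try_lock() fails, so the proxy reads as online.
    assert!(online_guard.try_lock().is_err());

    // "Disconnect": record the time and release the lock by dropping the guard.
    *guard = Some(SystemTime::now());
    drop(guard);

    // Now the proxy reads as offline and exposes a last_disconnect timestamp.
    assert!(online_guard.try_lock().unwrap().is_some());
}
```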