Skip to content

Commit e6f1e2a

Browse files
feat: hot-tier on query node for distributed set up (#852)
1 parent 356c03c commit e6f1e2a

File tree

16 files changed

+1193
-18
lines changed

16 files changed

+1193
-18
lines changed

server/src/cli.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ pub struct Cli {
101101

102102
/// CORS behaviour
103103
pub cors: bool,
104+
105+
/// The local hot_tier path is used for optimising the query performance in the distributed systems
106+
pub hot_tier_storage_path: Option<PathBuf>,
107+
108+
///maximum disk usage allowed
109+
pub max_disk_usage: f64,
104110
}
105111

106112
impl Cli {
@@ -134,6 +140,8 @@ impl Cli {
134140
pub const DEFAULT_PASSWORD: &'static str = "admin";
135141
pub const FLIGHT_PORT: &'static str = "flight-port";
136142
pub const CORS: &'static str = "cors";
143+
pub const HOT_TIER_PATH: &'static str = "hot-tier-path";
144+
pub const MAX_DISK_USAGE: &'static str = "max-disk-usage";
137145

138146
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
139147
self.local_staging_path.join(stream_name)
@@ -395,7 +403,27 @@ impl Cli {
395403
"lz4",
396404
"zstd"])
397405
.help("Parquet compression algorithm"),
398-
).group(
406+
)
407+
.arg(
408+
Arg::new(Self::HOT_TIER_PATH)
409+
.long(Self::HOT_TIER_PATH)
410+
.env("P_HOT_TIER_DIR")
411+
.value_name("DIR")
412+
.value_parser(validation::canonicalize_path)
413+
.help("Local path on this device to be used for hot tier data")
414+
.next_line_help(true),
415+
)
416+
.arg(
417+
Arg::new(Self::MAX_DISK_USAGE)
418+
.long(Self::MAX_DISK_USAGE)
419+
.env("P_MAX_DISK_USAGE_PERCENT")
420+
.value_name("percentage")
421+
.default_value("80.0")
422+
.value_parser(validation::validate_disk_usage)
423+
.help("Maximum allowed disk usage in percentage e.g 90.0 for 90%")
424+
.next_line_help(true),
425+
)
426+
.group(
399427
ArgGroup::new("oidc")
400428
.args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
401429
.requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
@@ -532,6 +560,12 @@ impl FromArgMatches for Cli {
532560
_ => unreachable!(),
533561
};
534562

563+
self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER_PATH).cloned();
564+
self.max_disk_usage = m
565+
.get_one::<f64>(Self::MAX_DISK_USAGE)
566+
.cloned()
567+
.expect("default for max disk usage");
568+
535569
Ok(())
536570
}
537571
}

server/src/handlers/http/logstream.rs

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::handlers::{
2727
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
2828
UPDATE_STREAM_KEY,
2929
};
30+
use crate::hottier::{HotTierManager, StreamHotTier};
3031
use crate::metadata::STREAM_INFO;
3132
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
3233
use crate::option::{Mode, CONFIG};
@@ -37,6 +38,7 @@ use crate::{
3738
catalog::{self, remove_manifest_from_snapshot},
3839
event, stats,
3940
};
41+
4042
use crate::{metadata, validator};
4143
use actix_web::http::StatusCode;
4244
use actix_web::{web, HttpRequest, Responder};
@@ -919,6 +921,122 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
919921
Ok((web::Json(stream_info), StatusCode::OK))
920922
}
921923

924+
pub async fn put_stream_hot_tier(
925+
req: HttpRequest,
926+
body: web::Json<serde_json::Value>,
927+
) -> Result<impl Responder, StreamError> {
928+
if CONFIG.parseable.mode != Mode::Query {
929+
return Err(StreamError::Custom {
930+
msg: "Hot tier can only be enabled in query mode".to_string(),
931+
status: StatusCode::BAD_REQUEST,
932+
});
933+
}
934+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
935+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
936+
return Err(StreamError::StreamNotFound(stream_name));
937+
}
938+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
939+
return Err(StreamError::HotTierNotEnabled(stream_name));
940+
}
941+
942+
if STREAM_INFO
943+
.get_time_partition(&stream_name)
944+
.unwrap()
945+
.is_some()
946+
{
947+
return Err(StreamError::Custom {
948+
msg: "Hot tier can not be enabled for stream with time partition".to_string(),
949+
status: StatusCode::BAD_REQUEST,
950+
});
951+
}
952+
953+
let body = body.into_inner();
954+
let mut hottier: StreamHotTier = match serde_json::from_value(body) {
955+
Ok(hottier) => hottier,
956+
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
957+
};
958+
959+
validator::hot_tier(&hottier.size.to_string())?;
960+
961+
STREAM_INFO.set_hot_tier(&stream_name, true)?;
962+
if let Some(hot_tier_manager) = HotTierManager::global() {
963+
hot_tier_manager
964+
.validate_hot_tier_size(&stream_name, &hottier.size)
965+
.await?;
966+
hottier.used_size = Some("0GiB".to_string());
967+
hottier.available_size = Some(hottier.size.clone());
968+
hot_tier_manager
969+
.put_hot_tier(&stream_name, &mut hottier)
970+
.await?;
971+
let storage = CONFIG.storage().get_object_store();
972+
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
973+
stream_metadata.hot_tier_enabled = Some(true);
974+
storage
975+
.put_stream_manifest(&stream_name, &stream_metadata)
976+
.await?;
977+
}
978+
979+
Ok((
980+
format!("hot tier set for stream {stream_name}"),
981+
StatusCode::OK,
982+
))
983+
}
984+
985+
pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
986+
if CONFIG.parseable.mode != Mode::Query {
987+
return Err(StreamError::Custom {
988+
msg: "Hot tier can only be enabled in query mode".to_string(),
989+
status: StatusCode::BAD_REQUEST,
990+
});
991+
}
992+
993+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
994+
995+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
996+
return Err(StreamError::StreamNotFound(stream_name));
997+
}
998+
999+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
1000+
return Err(StreamError::HotTierNotEnabled(stream_name));
1001+
}
1002+
1003+
if let Some(hot_tier_manager) = HotTierManager::global() {
1004+
let hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
1005+
Ok((web::Json(hot_tier), StatusCode::OK))
1006+
} else {
1007+
Err(StreamError::Custom {
1008+
msg: format!("hot tier not initialised for stream {}", stream_name),
1009+
status: (StatusCode::BAD_REQUEST),
1010+
})
1011+
}
1012+
}
1013+
1014+
pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
1015+
if CONFIG.parseable.mode != Mode::Query {
1016+
return Err(StreamError::Custom {
1017+
msg: "Hot tier can only be enabled in query mode".to_string(),
1018+
status: StatusCode::BAD_REQUEST,
1019+
});
1020+
}
1021+
1022+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
1023+
1024+
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
1025+
return Err(StreamError::StreamNotFound(stream_name));
1026+
}
1027+
1028+
if CONFIG.parseable.hot_tier_storage_path.is_none() {
1029+
return Err(StreamError::HotTierNotEnabled(stream_name));
1030+
}
1031+
1032+
if let Some(hot_tier_manager) = HotTierManager::global() {
1033+
hot_tier_manager.delete_hot_tier(&stream_name).await?;
1034+
}
1035+
Ok((
1036+
format!("hot tier deleted for stream {stream_name}"),
1037+
StatusCode::OK,
1038+
))
1039+
}
9221040
#[allow(unused)]
9231041
fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
9241042
match kind {
@@ -935,9 +1053,12 @@ pub mod error {
9351053
use http::StatusCode;
9361054

9371055
use crate::{
1056+
hottier::HotTierError,
9381057
metadata::error::stream_info::MetadataError,
9391058
storage::ObjectStorageError,
940-
validator::error::{AlertValidationError, StreamNameValidationError},
1059+
validator::error::{
1060+
AlertValidationError, HotTierValidationError, StreamNameValidationError,
1061+
},
9411062
};
9421063

9431064
#[allow(unused)]
@@ -997,6 +1118,16 @@ pub mod error {
9971118
Network(#[from] reqwest::Error),
9981119
#[error("Could not deserialize into JSON object, {0}")]
9991120
SerdeError(#[from] serde_json::Error),
1121+
#[error(
1122+
"Hot tier is not enabled at the server config, cannot enable hot tier for stream {0}"
1123+
)]
1124+
HotTierNotEnabled(String),
1125+
#[error("failed to enable hottier due to err: {0}")]
1126+
InvalidHotTierConfig(serde_json::Error),
1127+
#[error("Hot tier validation failed due to {0}")]
1128+
HotTierValidation(#[from] HotTierValidationError),
1129+
#[error("{0}")]
1130+
HotTierError(#[from] HotTierError),
10001131
}
10011132

10021133
impl actix_web::ResponseError for StreamError {
@@ -1030,6 +1161,10 @@ pub mod error {
10301161
StreamError::Network(err) => {
10311162
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
10321163
}
1164+
StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST,
1165+
StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST,
1166+
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
1167+
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
10331168
}
10341169
}
10351170

server/src/handlers/http/modal/query_server.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::handlers::airplane;
2020
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
2121
use crate::handlers::http::middleware::RouteExt;
2222
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
23-
23+
use crate::hottier::HotTierManager;
2424
use crate::rbac::role::Action;
2525
use crate::sync;
2626
use crate::users::dashboards::DASHBOARDS;
@@ -188,6 +188,10 @@ impl QueryServer {
188188
if matches!(init_cluster_metrics_schedular(), Ok(())) {
189189
log::info!("Cluster metrics scheduler started successfully");
190190
}
191+
192+
if let Some(hot_tier_manager) = HotTierManager::global() {
193+
hot_tier_manager.download_from_s3()?;
194+
};
191195
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
192196
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
193197
sync::object_store_sync();

server/src/handlers/http/modal/server.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,25 @@ impl Server {
343343
.to(logstream::get_cache_enabled)
344344
.authorize_for_stream(Action::GetCacheEnabled),
345345
),
346+
)
347+
.service(
348+
web::resource("/hottier")
349+
// PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream
350+
.route(
351+
web::put()
352+
.to(logstream::put_stream_hot_tier)
353+
.authorize_for_stream(Action::PutHotTierEnabled),
354+
)
355+
.route(
356+
web::get()
357+
.to(logstream::get_stream_hot_tier)
358+
.authorize_for_stream(Action::GetHotTierEnabled),
359+
)
360+
.route(
361+
web::delete()
362+
.to(logstream::delete_stream_hot_tier)
363+
.authorize_for_stream(Action::DeleteHotTierEnabled),
364+
),
346365
),
347366
)
348367
}

0 commit comments

Comments
 (0)