Skip to content

Commit 0df41a7

Browse files
feat: hot tier for pmeta (#883)
Enable hot tier for the internal stream with a fixed size of 10 MiB. This automatically happens, only if hot tier is configured globally Also, it is restricted to update size for hot tier for an internal stream.
1 parent 1b24487 commit 0df41a7

File tree

3 files changed

+26
-0
lines changed

3 files changed

+26
-0
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,13 @@ pub async fn put_stream_hot_tier(
957957
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
958958
return Err(StreamError::StreamNotFound(stream_name));
959959
}
960+
961+
if STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::Internal.to_string() {
962+
return Err(StreamError::Custom {
963+
msg: "Hot tier can not be updated for internal stream".to_string(),
964+
status: StatusCode::BAD_REQUEST,
965+
});
966+
}
960967
if CONFIG.parseable.hot_tier_storage_path.is_none() {
961968
return Err(StreamError::HotTierNotEnabled(stream_name));
962969
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ impl QueryServer {
193193
log::info!("Cluster metrics scheduler started successfully");
194194
}
195195
if let Some(hot_tier_manager) = HotTierManager::global() {
196+
hot_tier_manager.put_internal_stream_hot_tier().await?;
196197
hot_tier_manager.download_from_s3()?;
197198
};
198199
let (localsync_handler, mut localsync_outbox, localsync_inbox) =

server/src/hottier.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::{
2525

2626
use crate::{
2727
catalog::manifest::{File, Manifest},
28+
handlers::http::cluster::INTERNAL_STREAM_NAME,
2829
metadata::{error::stream_info::MetadataError, STREAM_INFO},
2930
option::{
3031
validation::{bytes_to_human_size, human_size_to_bytes},
@@ -51,6 +52,7 @@ use tokio_stream::wrappers::ReadDirStream;
5152
pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json";
5253
pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB
5354
const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1);
55+
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB
5456

5557
#[derive(Debug, serde::Deserialize, serde::Serialize)]
5658
pub struct StreamHotTier {
@@ -685,6 +687,22 @@ impl HotTierManager {
685687

686688
Ok(None)
687689
}
690+
691+
pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> {
692+
if CONFIG.parseable.hot_tier_storage_path.is_some()
693+
&& !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME)
694+
{
695+
let mut stream_hot_tier = StreamHotTier {
696+
size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
697+
used_size: Some("0".to_string()),
698+
available_size: Some(INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string()),
699+
oldest_date_time_entry: None,
700+
};
701+
self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier)
702+
.await?;
703+
}
704+
Ok(())
705+
}
688706
}
689707

690708
/// get the hot tier file path for the stream

0 commit comments

Comments
 (0)