Commit 1b24487
enhancement: add stream type to stream definition (#884)
This PR adds `stream_type=UserDefined` for user-defined streams and `stream_type=Internal` for internal streams. This lets clients such as the Console decide how to use a given stream. As other stream types are added, this segregation will keep the usage pattern for each type clear.
1 parent 6daf9e1 commit 1b24487
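
The `StreamType` enum itself is defined in `server/src/storage` (among the 13 changed files, though its diff is not shown below). Here is a minimal sketch of its likely shape, inferred only from the call sites in this commit: the two variants, an equality check (`*stream_type == StreamType::Internal`), and a string form (`StreamType::UserDefined.to_string()`); the exact derives are assumptions.

```rust
use std::fmt;

// Sketch, not the committed code: variants and trait impls are inferred
// from call sites in this diff; Clone/Default are assumed for convenience.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StreamType {
    #[default]
    UserDefined,
    Internal,
}

impl fmt::Display for StreamType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StreamType::UserDefined => write!(f, "UserDefined"),
            StreamType::Internal => write!(f, "Internal"),
        }
    }
}
```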

File tree

13 files changed (+205, −50 lines)


server/src/event.rs

Lines changed: 6 additions & 1 deletion

@@ -27,7 +27,7 @@ use std::sync::Arc;
 
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
-use crate::{handlers::http::ingest::PostError, metadata};
+use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType};
 use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
@@ -45,6 +45,7 @@ pub struct Event {
     pub parsed_timestamp: NaiveDateTime,
     pub time_partition: Option<String>,
     pub custom_partition_values: HashMap<String, String>,
+    pub stream_type: StreamType,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -75,6 +76,7 @@ impl Event {
             self.rb.clone(),
             self.parsed_timestamp,
             &self.custom_partition_values,
+            &self.stream_type,
         )?;
 
         metadata::STREAM_INFO.update_stats(
@@ -106,6 +108,7 @@ impl Event {
             self.rb.clone(),
             self.parsed_timestamp,
             &self.custom_partition_values,
+            &self.stream_type,
         )
         .map_err(PostError::Event)
     }
@@ -122,13 +125,15 @@ impl Event {
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), EventError> {
         STREAM_WRITERS.append_to_local(
             stream_name,
             schema_key,
             rb,
             parsed_timestamp,
             custom_partition_values.clone(),
+            stream_type,
         )?;
         Ok(())
     }

server/src/event/writer.rs

Lines changed: 11 additions & 4 deletions

@@ -26,8 +26,8 @@ use std::{
 };
 
 use crate::{
-    handlers::http::cluster::INTERNAL_STREAM_NAME,
     option::{Mode, CONFIG},
+    storage::StreamType,
     utils,
 };
 
@@ -92,6 +92,7 @@ impl WriterTable {
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = self.read().unwrap();
 
@@ -104,6 +105,7 @@ impl WriterTable {
                     record,
                     parsed_timestamp,
                     &custom_partition_values,
+                    stream_type,
                 )?;
             }
             None => {
@@ -118,12 +120,14 @@ impl WriterTable {
                     record,
                     parsed_timestamp,
                     &custom_partition_values,
+                    stream_type,
                 )?;
             }
         };
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn handle_existing_writer(
         &self,
         stream_writer: &Mutex<Writer>,
@@ -132,8 +136,9 @@ impl WriterTable {
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
-        if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+        if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
             stream_writer.lock().unwrap().push(
                 stream_name,
                 schema_key,
@@ -151,6 +156,7 @@ impl WriterTable {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn handle_missing_writer(
         &self,
         mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
@@ -159,10 +165,11 @@ impl WriterTable {
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
+        stream_type: &StreamType,
     ) -> Result<(), StreamWriterError> {
         match map.get(stream_name) {
             Some(writer) => {
-                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+                if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
                     writer.lock().unwrap().push(
                         stream_name,
                         schema_key,
@@ -175,7 +182,7 @@ impl WriterTable {
                 }
             }
             None => {
-                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
+                if CONFIG.parseable.mode != Mode::Query || *stream_type == StreamType::Internal {
                     let mut writer = Writer::default();
                     writer.push(
                         stream_name,
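
The condition that recurs in all three writer paths replaces a hard-coded name comparison with a type check. Restated as a standalone predicate (a hypothetical helper for illustration, not code from the PR; `Mode` and `StreamType` are the real types):

```rust
// A record is buffered in the local writer unless this node runs in Query
// mode; internal streams (e.g. cluster metrics) are exempt, so they are
// written locally on every node type.
fn should_write_locally(mode: &Mode, stream_type: &StreamType) -> bool {
    *mode != Mode::Query || *stream_type == StreamType::Internal
}
```

Matching on the stream's type rather than on the one reserved name means future internal streams get the same treatment without another edit to the writer.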

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions

@@ -34,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
 const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
+const STREAM_TYPE_KEY: &str = "x-p-stream-type";
 const OIDC_SCOPE: &str = "openid profile email";
 const COOKIE_AGE_DAYS: usize = 7;
 const SESSION_COOKIE_NAME: &str = "session";
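
The new constant suggests clients choose a stream's type at creation time via the `x-p-stream-type` header. A hypothetical reading of it in a stream-creation handler (that handler's diff is not part of this excerpt, so the real code may differ):

```rust
// Hypothetical: fall back to UserDefined when the header is absent.
let stream_type = req
    .headers()
    .get(STREAM_TYPE_KEY)
    .and_then(|value| value.to_str().ok())
    .unwrap_or("UserDefined");
```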

server/src/handlers/http/cluster/mod.rs

Lines changed: 10 additions & 8 deletions

@@ -30,11 +30,11 @@ use crate::stats::Stats;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
-use actix_web::http::header;
+use actix_web::http::header::{self, HeaderMap};
 use actix_web::{HttpRequest, Responder};
 use bytes::Bytes;
 use chrono::Utc;
-use http::StatusCode;
+use http::{header as http_header, StatusCode};
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
 use serde::de::Error;
@@ -96,10 +96,15 @@ pub async fn sync_cache_with_ingestors(
 
 // forward the request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
-    req: HttpRequest,
+    headers: HeaderMap,
     body: Bytes,
     stream_name: &str,
 ) -> Result<(), StreamError> {
+    let mut reqwest_headers = http_header::HeaderMap::new();
+
+    for (key, value) in headers.iter() {
+        reqwest_headers.insert(key.clone(), value.clone());
+    }
     let ingestor_infos = get_ingestor_info().await.map_err(|err| {
         log::error!("Fatal: failed to get ingestor info: {:?}", err);
         StreamError::Anyhow(err)
@@ -119,7 +124,7 @@ pub async fn sync_streams_with_ingestors(
         );
         let res = client
             .put(url)
-            .headers(req.headers().into())
+            .headers(reqwest_headers.clone())
             .header(header::AUTHORIZATION, &ingestor.token)
             .body(body.clone())
             .send()
@@ -572,7 +577,6 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
 
 pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
     log::info!("Setting up schedular for cluster metrics ingestion");
-
     let mut scheduler = AsyncScheduler::new();
     scheduler
         .every(CLUSTER_METRICS_INTERVAL_SECONDS)
@@ -583,11 +587,9 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
             if !metrics.is_empty() {
                 log::info!("Cluster metrics fetched successfully from all ingestors");
                 if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                    let stream_name = INTERNAL_STREAM_NAME;
-
                     if matches!(
                         ingest_internal_stream(
-                            stream_name.to_string(),
+                            INTERNAL_STREAM_NAME.to_string(),
                             bytes::Bytes::from(metrics_bytes),
                         )
                         .await,
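
Taking an owned `HeaderMap` instead of the whole `HttpRequest` detaches the sync call from actix's request object, and the small loop copies each header into an `http::HeaderMap` that the reqwest client accepts. A hypothetical call site in the stream-creation handler (not shown in this excerpt):

```rust
// Hypothetical: clone the incoming headers (including x-p-stream-type)
// and forward them to every ingestor along with the request body.
sync_streams_with_ingestors(req.headers().clone(), body.clone(), &stream_name).await?;
```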

server/src/handlers/http/ingest.rs

Lines changed: 17 additions & 11 deletions

@@ -16,8 +16,7 @@
  *
  */
 
-use super::cluster::INTERNAL_STREAM_NAME;
-use super::logstream::error::CreateStreamError;
+use super::logstream::error::{CreateStreamError, StreamError};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 use super::{kinesis, otel};
@@ -34,7 +33,7 @@ use crate::localcache::CacheError;
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::{self, STREAM_INFO};
 use crate::option::{Mode, CONFIG};
-use crate::storage::{LogStream, ObjectStorageError};
+use crate::storage::{LogStream, ObjectStorageError, StreamType};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -57,13 +56,14 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
         .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
     {
         let stream_name = stream_name.to_str().unwrap().to_owned();
-        if stream_name.eq(INTERNAL_STREAM_NAME) {
+        let internal_stream_names = STREAM_INFO.list_internal_streams();
+        if internal_stream_names.contains(&stream_name) {
             return Err(PostError::Invalid(anyhow::anyhow!(
-                "Stream {} is an internal stream and cannot be ingested into",
+                "The stream {} is reserved for internal use and cannot be ingested into",
                 stream_name
             )));
         }
-        create_stream_if_not_exists(&stream_name, false).await?;
+        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
 
         flatten_and_push_logs(req, body, stream_name).await?;
         Ok(HttpResponse::Ok().finish())
@@ -73,7 +73,6 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
 }
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    create_stream_if_not_exists(&stream_name, true).await?;
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
@@ -100,6 +99,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         parsed_timestamp,
         time_partition: None,
         custom_partition_values: HashMap::new(),
+        stream_type: StreamType::Internal,
     }
     .process()
     .await?;
@@ -116,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
         .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
     {
         let stream_name = stream_name.to_str().unwrap().to_owned();
-        create_stream_if_not_exists(&stream_name, false).await?;
+        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
 
         //flatten logs
         if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
@@ -176,7 +176,8 @@ async fn flatten_and_push_logs(
 // fails if the logstream does not exist
 pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    if stream_name.eq(INTERNAL_STREAM_NAME) {
+    let internal_stream_names = STREAM_INFO.list_internal_streams();
+    if internal_stream_names.contains(&stream_name) {
         return Err(PostError::Invalid(anyhow::anyhow!(
             "Stream {} is an internal stream and cannot be ingested into",
             stream_name
@@ -202,6 +203,7 @@ pub async fn push_logs_unchecked(
         time_partition: None,
         is_first_event: true, // NOTE: Maybe should be false
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
+        stream_type: StreamType::UserDefined,
     };
     unchecked_event.process_unchecked()?;
 
@@ -369,6 +371,7 @@ async fn create_process_record_batch(
         parsed_timestamp,
         time_partition: time_partition.clone(),
         custom_partition_values: custom_partition_values.clone(),
+        stream_type: StreamType::UserDefined,
     }
     .process()
     .await?;
@@ -413,7 +416,7 @@ fn into_event_batch(
 // Check if the stream exists and create a new stream if doesn't exist
 pub async fn create_stream_if_not_exists(
     stream_name: &str,
-    internal_stream: bool,
+    stream_type: &str,
 ) -> Result<(), PostError> {
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
@@ -427,7 +430,7 @@ pub async fn create_stream_if_not_exists(
             "",
             "",
             Arc::new(Schema::empty()),
-            internal_stream,
+            stream_type,
         )
         .await?;
     }
@@ -488,6 +491,8 @@ pub enum PostError {
     DashboardError(#[from] DashboardError),
     #[error("Error: {0}")]
     CacheError(#[from] CacheError),
+    #[error("Error: {0}")]
+    StreamError(#[from] StreamError),
 }
 
 impl actix_web::ResponseError for PostError {
@@ -509,6 +514,7 @@ impl actix_web::ResponseError for PostError {
             PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
 
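
`STREAM_INFO.list_internal_streams()` replaces the single `INTERNAL_STREAM_NAME` comparison, so any number of internal streams stay protected from external ingestion. The method is added in `server/src/metadata.rs` (changed in this commit but not shown here); a plausible sketch, with the lock handling and metadata field layout assumed:

```rust
// Hypothetical sketch: walk the in-memory stream map and keep the names of
// streams whose recorded type is Internal.
pub fn list_internal_streams(&self) -> Vec<String> {
    self.read()
        .expect("STREAM_INFO lock poisoned")
        .iter()
        .filter(|(_, meta)| meta.stream_type == StreamType::Internal.to_string())
        .map(|(name, _)| name.clone())
        .collect()
}
```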
