Skip to content

Commit 003539b

Browse files
restrict otel ingestion (#1263)
log ingestion is not allowed if stream is already associated with otel metrics or traces metrics ingestion is not allowed if stream is already associated with otel traces or any log formats similarly, traces ingestion is not allowed if stream is already associated with otel metrics or any log formats otel logs can be ingested with other log formats
1 parent 0f7e92c commit 003539b

File tree

2 files changed

+90
-12
lines changed

2 files changed

+90
-12
lines changed

src/handlers/http/ingest.rs

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,32 @@ pub async fn ingest(
8989
};
9090

9191
let log_source_entry = LogSourceEntry::new(log_source.clone(), fields);
92-
let p_custom_fields = get_custom_fields_from_header(req);
93-
92+
9493
PARSEABLE
9594
.create_stream_if_not_exists(
9695
&stream_name,
9796
StreamType::UserDefined,
98-
vec![log_source_entry],
97+
vec![log_source_entry.clone()],
9998
)
10099
.await?;
101100

101+
//if stream exists, fetch the stream log source
102+
//return error if the stream log source is otel traces or otel metrics
103+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
104+
stream
105+
.get_log_source()
106+
.iter()
107+
.find(|&stream_log_source_entry| {
108+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
109+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
110+
})
111+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
112+
}
113+
114+
PARSEABLE
115+
.add_update_log_source(&stream_name, log_source_entry)
116+
.await?;
117+
let p_custom_fields = get_custom_fields_from_header(req);
102118
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
103119

104120
Ok(HttpResponse::Ok().finish())
@@ -159,9 +175,27 @@ pub async fn handle_otel_logs_ingestion(
159175
.create_stream_if_not_exists(
160176
&stream_name,
161177
StreamType::UserDefined,
162-
vec![log_source_entry],
178+
vec![log_source_entry.clone()],
163179
)
164180
.await?;
181+
182+
//if stream exists, fetch the stream log source
183+
//return error if the stream log source is otel traces or otel metrics
184+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
185+
stream
186+
.get_log_source()
187+
.iter()
188+
.find(|&stream_log_source_entry| {
189+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
190+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
191+
})
192+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
193+
}
194+
195+
PARSEABLE
196+
.add_update_log_source(&stream_name, log_source_entry)
197+
.await?;
198+
165199
let p_custom_fields = get_custom_fields_from_header(req);
166200

167201
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -188,6 +222,7 @@ pub async fn handle_otel_metrics_ingestion(
188222
}
189223

190224
let stream_name = stream_name.to_str().unwrap().to_owned();
225+
191226
let log_source_entry = LogSourceEntry::new(
192227
log_source.clone(),
193228
OTEL_METRICS_KNOWN_FIELD_LIST
@@ -199,10 +234,26 @@ pub async fn handle_otel_metrics_ingestion(
199234
.create_stream_if_not_exists(
200235
&stream_name,
201236
StreamType::UserDefined,
202-
vec![log_source_entry],
237+
vec![log_source_entry.clone()],
203238
)
204239
.await?;
205240

241+
//if stream exists, fetch the stream log source
242+
//return error if the stream log source is not otel metrics
243+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
244+
stream
245+
.get_log_source()
246+
.iter()
247+
.find(|&stream_log_source_entry| {
248+
stream_log_source_entry.log_source_format == log_source.clone()
249+
})
250+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
251+
}
252+
253+
PARSEABLE
254+
.add_update_log_source(&stream_name, log_source_entry)
255+
.await?;
256+
206257
let p_custom_fields = get_custom_fields_from_header(req);
207258

208259
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -229,6 +280,7 @@ pub async fn handle_otel_traces_ingestion(
229280
return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
230281
}
231282
let stream_name = stream_name.to_str().unwrap().to_owned();
283+
232284
let log_source_entry = LogSourceEntry::new(
233285
log_source.clone(),
234286
OTEL_TRACES_KNOWN_FIELD_LIST
@@ -241,10 +293,26 @@ pub async fn handle_otel_traces_ingestion(
241293
.create_stream_if_not_exists(
242294
&stream_name,
243295
StreamType::UserDefined,
244-
vec![log_source_entry],
296+
vec![log_source_entry.clone()],
245297
)
246298
.await?;
247299

300+
//if stream exists, fetch the stream log source
301+
//return error if the stream log source is not otel traces
302+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
303+
stream
304+
.get_log_source()
305+
.iter()
306+
.find(|&stream_log_source_entry| {
307+
stream_log_source_entry.log_source_format == log_source.clone()
308+
})
309+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
310+
}
311+
312+
PARSEABLE
313+
.add_update_log_source(&stream_name, log_source_entry)
314+
.await?;
315+
248316
let p_custom_fields = get_custom_fields_from_header(req);
249317

250318
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
@@ -304,6 +372,18 @@ pub async fn post_event(
304372
_ => {}
305373
}
306374

375+
//if stream exists, fetch the stream log source
376+
//return error if the stream log source is otel traces or otel metrics
377+
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
378+
stream
379+
.get_log_source()
380+
.iter()
381+
.find(|&stream_log_source_entry| {
382+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
383+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
384+
})
385+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
386+
}
307387
let p_custom_fields = get_custom_fields_from_header(req);
308388
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
309389

@@ -373,6 +453,8 @@ pub enum PostError {
373453
MissingTimePartition(String),
374454
#[error("{0}")]
375455
KnownFormat(#[from] known_schema::Error),
456+
#[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
457+
IncorrectLogFormat(String),
376458
}
377459

378460
impl actix_web::ResponseError for PostError {
@@ -400,6 +482,7 @@ impl actix_web::ResponseError for PostError {
400482
PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST,
401483
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
402484
PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
485+
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
403486
}
404487
}
405488

src/parseable/mod.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -427,11 +427,6 @@ impl Parseable {
427427
log_source: Vec<LogSourceEntry>,
428428
) -> Result<bool, PostError> {
429429
if self.streams.contains(stream_name) {
430-
for stream_log_source in log_source {
431-
self.add_update_log_source(stream_name, stream_log_source)
432-
.await?;
433-
}
434-
435430
return Ok(true);
436431
}
437432

@@ -443,7 +438,7 @@ impl Parseable {
443438
.create_stream_and_schema_from_storage(stream_name)
444439
.await?
445440
{
446-
return Ok(false);
441+
return Ok(true);
447442
}
448443

449444
self.create_stream(

0 commit comments

Comments
 (0)