Skip to content

Commit 47eaf48

Browse files
fix: S3 calls optimization and server fixes (#818)
* S3 calls optimization and server fixes; improved error logging
* otel - parse intValue and doubleValue as string in attributes
* fixed deleted events and deleted event ingestion size label names
1 parent 32c9456 commit 47eaf48

20 files changed

+432
-427
lines changed

server/src/catalog.rs

Lines changed: 88 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ use std::{io::ErrorKind, sync::Arc};
2020

2121
use self::{column::Column, snapshot::ManifestItem};
2222
use crate::handlers::http::base_path_without_preceding_slash;
23+
use crate::metadata::STREAM_INFO;
2324
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
2425
use crate::option::CONFIG;
25-
use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
26+
use crate::stats::{
27+
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
28+
};
2629
use crate::{
2730
catalog::manifest::Manifest,
2831
event::DEFAULT_TIMESTAMP_KEY,
@@ -103,9 +106,8 @@ pub async fn update_snapshot(
103106
change: manifest::File,
104107
) -> Result<(), ObjectStorageError> {
105108
let mut meta = storage.get_object_store_format(stream_name).await?;
106-
let meta_clone = meta.clone();
107109
let manifests = &mut meta.snapshot.manifest_list;
108-
let time_partition = &meta_clone.time_partition;
110+
let time_partition = &meta.time_partition;
109111
let lower_bound = match time_partition {
110112
Some(time_partition) => {
111113
let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string());
@@ -174,12 +176,17 @@ pub async fn update_snapshot(
174176
}
175177
}
176178

177-
meta.snapshot.manifest_list = manifests.to_vec();
178-
storage.put_snapshot(stream_name, meta.snapshot).await?;
179179
if ch {
180180
if let Some(mut manifest) = storage.get_manifest(&path).await? {
181181
manifest.apply_change(change);
182182
storage.put_manifest(&path, manifest).await?;
183+
let stats = get_current_stats(stream_name, "json");
184+
if let Some(stats) = stats {
185+
meta.stats = stats;
186+
}
187+
meta.snapshot.manifest_list = manifests.to_vec();
188+
189+
storage.put_stream_manifest(stream_name, &meta).await?;
183190
} else {
184191
//instead of returning an error, create a new manifest (otherwise local to storage sync fails)
185192
//but don't update the snapshot
@@ -189,7 +196,7 @@ pub async fn update_snapshot(
189196
storage.clone(),
190197
stream_name,
191198
false,
192-
meta_clone,
199+
meta,
193200
events_ingested,
194201
ingestion_size,
195202
storage_size,
@@ -203,7 +210,7 @@ pub async fn update_snapshot(
203210
storage.clone(),
204211
stream_name,
205212
true,
206-
meta_clone,
213+
meta,
207214
events_ingested,
208215
ingestion_size,
209216
storage_size,
@@ -217,7 +224,7 @@ pub async fn update_snapshot(
217224
storage.clone(),
218225
stream_name,
219226
true,
220-
meta_clone,
227+
meta,
221228
events_ingested,
222229
ingestion_size,
223230
storage_size,
@@ -256,6 +263,30 @@ async fn create_manifest(
256263
files: vec![change],
257264
..Manifest::default()
258265
};
266+
let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?;
267+
if first_event_at.is_none() {
268+
if let Some(first_event) = manifest.files.first() {
269+
let time_partition = &meta.time_partition;
270+
let lower_bound = match time_partition {
271+
Some(time_partition) => {
272+
let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
273+
lower_bound
274+
}
275+
None => {
276+
let (lower_bound, _) =
277+
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
278+
lower_bound
279+
}
280+
};
281+
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
282+
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
283+
log::error!(
284+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
285+
stream_name
286+
);
287+
}
288+
}
289+
}
259290

260291
let mainfest_file_name = manifest_path("").to_string();
261292
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
@@ -275,7 +306,12 @@ async fn create_manifest(
275306
};
276307
manifests.push(new_snapshot_entry);
277308
meta.snapshot.manifest_list = manifests;
278-
storage.put_snapshot(stream_name, meta.snapshot).await?;
309+
let stats = get_current_stats(stream_name, "json");
310+
if let Some(stats) = stats {
311+
meta.stats = stats;
312+
}
313+
meta.first_event_at = first_event_at;
314+
storage.put_stream_manifest(stream_name, &meta).await?;
279315
}
280316

281317
Ok(())
@@ -294,6 +330,8 @@ pub async fn remove_manifest_from_snapshot(
294330
let manifests = &mut meta.snapshot.manifest_list;
295331
// Filter out items whose manifest_path contains any of the dates_to_delete
296332
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
333+
meta.first_event_at = None;
334+
STREAM_INFO.set_first_event_at(stream_name, None)?;
297335
storage.put_snapshot(stream_name, meta.snapshot).await?;
298336
}
299337
match CONFIG.parseable.mode {
@@ -313,39 +351,48 @@ pub async fn get_first_event(
313351
match CONFIG.parseable.mode {
314352
Mode::All | Mode::Ingest => {
315353
// get current snapshot
316-
let mut meta = storage.get_object_store_format(stream_name).await?;
317-
let manifests = &mut meta.snapshot.manifest_list;
318-
let time_partition = meta.time_partition;
319-
if manifests.is_empty() {
320-
log::info!("No manifest found for stream {stream_name}");
321-
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
322-
}
323-
let manifest = &manifests[0];
324-
let path = partition_path(
325-
stream_name,
326-
manifest.time_lower_bound,
327-
manifest.time_upper_bound,
328-
);
329-
let Some(manifest) = storage.get_manifest(&path).await? else {
330-
return Err(ObjectStorageError::UnhandledError(
331-
"Manifest found in snapshot but not in object-storage"
332-
.to_string()
333-
.into(),
334-
));
335-
};
336-
if let Some(first_event) = manifest.files.first() {
337-
let lower_bound = match time_partition {
338-
Some(time_partition) => {
339-
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
340-
lower_bound
341-
}
342-
None => {
343-
let (lower_bound, _) =
344-
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
345-
lower_bound
346-
}
354+
let stream_first_event = STREAM_INFO.get_first_event(stream_name)?;
355+
if stream_first_event.is_some() {
356+
first_event_at = stream_first_event.unwrap();
357+
} else {
358+
let mut meta = storage.get_object_store_format(stream_name).await?;
359+
let meta_clone = meta.clone();
360+
let manifests = meta_clone.snapshot.manifest_list;
361+
let time_partition = meta_clone.time_partition;
362+
if manifests.is_empty() {
363+
log::info!("No manifest found for stream {stream_name}");
364+
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
365+
}
366+
let manifest = &manifests[0];
367+
let path = partition_path(
368+
stream_name,
369+
manifest.time_lower_bound,
370+
manifest.time_upper_bound,
371+
);
372+
let Some(manifest) = storage.get_manifest(&path).await? else {
373+
return Err(ObjectStorageError::UnhandledError(
374+
"Manifest found in snapshot but not in object-storage"
375+
.to_string()
376+
.into(),
377+
));
347378
};
348-
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
379+
if let Some(first_event) = manifest.files.first() {
380+
let lower_bound = match time_partition {
381+
Some(time_partition) => {
382+
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
383+
lower_bound
384+
}
385+
None => {
386+
let (lower_bound, _) =
387+
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
388+
lower_bound
389+
}
390+
};
391+
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
392+
meta.first_event_at = Some(first_event_at.clone());
393+
storage.put_stream_manifest(stream_name, &meta).await?;
394+
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
395+
}
349396
}
350397
}
351398
Mode::Query => {

server/src/event/format/json.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
221221
}
222222
}
223223
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
224-
_ => unreachable!(),
224+
_ => {
225+
log::error!("Unsupported datatype {:?}, value {:?}", data_type, value);
226+
unreachable!()
227+
}
225228
}
226229
}

server/src/handlers/http/ingest.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::handlers::{
3131
STREAM_NAME_HEADER_KEY,
3232
};
3333
use crate::localcache::CacheError;
34+
use crate::metadata::error::stream_info::MetadataError;
3435
use crate::metadata::{self, STREAM_INFO};
3536
use crate::option::{Mode, CONFIG};
3637
use crate::storage::{LogStream, ObjectStorageError};
@@ -62,7 +63,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
6263
stream_name
6364
)));
6465
}
65-
create_stream_if_not_exists(&stream_name).await?;
66+
create_stream_if_not_exists(&stream_name, false).await?;
6667

6768
flatten_and_push_logs(req, body, stream_name).await?;
6869
Ok(HttpResponse::Ok().finish())
@@ -72,7 +73,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
7273
}
7374

7475
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
75-
create_stream_if_not_exists(&stream_name).await?;
76+
create_stream_if_not_exists(&stream_name, true).await?;
7677
let size: usize = body.len();
7778
let parsed_timestamp = Utc::now().naive_utc();
7879
let (rb, is_first) = {
@@ -115,7 +116,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
115116
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
116117
{
117118
let stream_name = stream_name.to_str().unwrap().to_owned();
118-
create_stream_if_not_exists(&stream_name).await?;
119+
create_stream_if_not_exists(&stream_name, false).await?;
119120

120121
//flatten logs
121122
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
@@ -128,7 +129,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
128129
push_logs(stream_name.to_string(), req.clone(), body).await?;
129130
}
130131
} else {
131-
log::warn!("Unknown log source: {}", log_source);
132132
return Err(PostError::CustomError("Unknown log source".to_string()));
133133
}
134134
} else {
@@ -206,16 +206,10 @@ pub async fn push_logs_unchecked(
206206
}
207207

208208
async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
209-
let glob_storage = CONFIG.storage().get_object_store();
210-
let object_store_format = glob_storage
211-
.get_object_store_format(&stream_name)
212-
.await
213-
.map_err(|_| PostError::StreamNotFound(stream_name.clone()))?;
214-
215-
let time_partition = object_store_format.time_partition;
216-
let time_partition_limit = object_store_format.time_partition_limit;
217-
let static_schema_flag = object_store_format.static_schema_flag;
218-
let custom_partition = object_store_format.custom_partition;
209+
let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
210+
let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?;
211+
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
212+
let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?;
219213
let body_val: Value = serde_json::from_slice(&body)?;
220214
let size: usize = body.len();
221215
let mut parsed_timestamp = Utc::now().naive_utc();
@@ -414,7 +408,10 @@ fn into_event_batch(
414408
}
415409

416410
// Check if the stream exists and create a new stream if it doesn't exist
417-
pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
411+
pub async fn create_stream_if_not_exists(
412+
stream_name: &str,
413+
internal_stream: bool,
414+
) -> Result<(), PostError> {
418415
if STREAM_INFO.stream_exists(stream_name) {
419416
return Ok(());
420417
}
@@ -427,6 +424,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
427424
"",
428425
"",
429426
Arc::new(Schema::empty()),
427+
internal_stream,
430428
)
431429
.await?;
432430
}
@@ -440,7 +438,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
440438
}) {
441439
log::error!("Stream {} not found", stream_name);
442440
return Err(PostError::Invalid(anyhow::anyhow!(
443-
"Stream {} not found. Has it been created?",
441+
"Stream `{}` not found. Please create it using the Query server.",
444442
stream_name
445443
)));
446444
}
@@ -472,6 +470,8 @@ pub enum PostError {
472470
Invalid(#[from] anyhow::Error),
473471
#[error("{0}")]
474472
CreateStream(#[from] CreateStreamError),
473+
#[error("Error: {0}")]
474+
MetadataStreamError(#[from] MetadataError),
475475
#[allow(unused)]
476476
#[error("Error: {0}")]
477477
CustomError(String),
@@ -498,6 +498,7 @@ impl actix_web::ResponseError for PostError {
498498
StatusCode::BAD_REQUEST
499499
}
500500
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
501+
PostError::MetadataStreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
501502
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
502503
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
503504
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,

0 commit comments

Comments
 (0)