Skip to content

Commit 4506159

Browse files
fix for create and update stream validation (#857)
added validation - time partition cannot be part of custom partition
1 parent 03b3226 commit 4506159

File tree

4 files changed

+37
-3
lines changed

4 files changed

+37
-3
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
307307
}
308308

309309
fn get_parsed_timestamp(body: &Value, time_partition: &Option<String>) -> NaiveDateTime {
310-
let body_timestamp = body.get(&time_partition.clone().unwrap().to_string());
310+
let body_timestamp = body.get(time_partition.clone().unwrap().to_string());
311311
let parsed_timestamp = body_timestamp
312312
.unwrap()
313313
.to_owned()

server/src/handlers/http/logstream.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,23 @@ fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamE
246246
Ok(())
247247
}
248248

249+
fn validate_time_with_custom_partition(
250+
time_partition: &str,
251+
custom_partition: &str,
252+
) -> Result<(), CreateStreamError> {
253+
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
254+
if custom_partition_list.contains(&time_partition) {
255+
return Err(CreateStreamError::Custom {
256+
msg: format!(
257+
"time partition {} cannot be set as custom partition",
258+
time_partition
259+
),
260+
status: StatusCode::BAD_REQUEST,
261+
});
262+
}
263+
Ok(())
264+
}
265+
249266
fn validate_static_schema(
250267
body: &Bytes,
251268
stream_name: &str,
@@ -338,6 +355,10 @@ async fn create_update_stream(
338355
validate_custom_partition(&custom_partition)?;
339356
}
340357

358+
if !time_partition.is_empty() && !custom_partition.is_empty() {
359+
validate_time_with_custom_partition(&time_partition, &custom_partition)?;
360+
}
361+
341362
let schema = validate_static_schema(
342363
body,
343364
stream_name,
@@ -741,6 +762,7 @@ pub async fn update_custom_partition_in_stream(
741762
custom_partition: &str,
742763
) -> Result<(), CreateStreamError> {
743764
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap();
765+
let time_partition = STREAM_INFO.get_time_partition(&stream_name).unwrap();
744766
if static_schema_flag.is_some() {
745767
let schema = STREAM_INFO.schema(&stream_name).unwrap();
746768

@@ -766,6 +788,18 @@ pub async fn update_custom_partition_in_stream(
766788
status: StatusCode::BAD_REQUEST,
767789
});
768790
}
791+
792+
if let Some(time_partition) = time_partition.clone() {
793+
if time_partition == *partition {
794+
return Err(CreateStreamError::Custom {
795+
msg: format!(
796+
"time partition {} cannot be set as custom partition",
797+
partition
798+
),
799+
status: StatusCode::BAD_REQUEST,
800+
});
801+
}
802+
}
769803
}
770804
}
771805
}

server/src/query/stream_schema_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ impl TableProvider for StandardTableProvider {
427427

428428
let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
429429
let remote_exec = create_parquet_physical_plan(
430-
ObjectStoreUrl::parse(&glob_storage.store_url()).unwrap(),
430+
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
431431
partitioned_files,
432432
statistics,
433433
self.schema.clone(),

server/src/utils/json/flatten.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ pub fn validate_time_partition(
156156
} else {
157157
30
158158
};
159-
let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
159+
let body_timestamp = value.get(time_partition.clone().unwrap().to_string());
160160
if body_timestamp.is_some() && body_timestamp.unwrap().to_owned().as_str().is_some() {
161161
if body_timestamp
162162
.unwrap()

0 commit comments

Comments
 (0)