Skip to content

Commit 29338a8

Browse files
allow specifying otel index id in header (#5503)
1 parent 6b4acd0 commit 29338a8

File tree

1 file changed

+65
-19
lines changed

1 file changed

+65
-19
lines changed

quickwit/quickwit-serve/src/otlp_api/rest_handler.rs

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use quickwit_common::rate_limited_error;
21-
use quickwit_opentelemetry::otlp::{
22-
OtelSignal, OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID,
23-
OTEL_TRACES_INDEX_ID,
24-
};
21+
use quickwit_opentelemetry::otlp::{OtelSignal, OtlpGrpcLogsService, OtlpGrpcTracesService};
2522
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService;
2623
use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
2724
ExportLogsServiceRequest, ExportLogsServiceResponse,
@@ -80,12 +77,18 @@ pub(crate) fn otlp_default_logs_handler(
8077
"content-type",
8178
"application/x-protobuf",
8279
))
80+
.and(warp::header::optional::<String>(
81+
OtelSignal::Logs.header_name(),
82+
))
8383
.and(warp::post())
8484
.and(get_body_bytes())
85-
.then(|otlp_logs_service, body| async move {
86-
// TODO get index id from header if available
87-
otlp_ingest_logs(otlp_logs_service, OTEL_LOGS_INDEX_ID.to_string(), body).await
88-
})
85+
.then(
86+
|otlp_logs_service, index_id: Option<String>, body| async move {
87+
let index_id =
88+
index_id.unwrap_or_else(|| OtelSignal::Logs.default_index_id().to_string());
89+
otlp_ingest_logs(otlp_logs_service, index_id, body).await
90+
},
91+
)
8992
.and(with_arg(BodyFormat::default()))
9093
.map(into_rest_api_response)
9194
}
@@ -134,12 +137,18 @@ pub(crate) fn otlp_default_traces_handler(
134137
"content-type",
135138
"application/x-protobuf",
136139
))
140+
.and(warp::header::optional::<String>(
141+
OtelSignal::Traces.header_name(),
142+
))
137143
.and(warp::post())
138144
.and(get_body_bytes())
139-
.then(|otlp_traces_service, body| async move {
140-
// TODO get index id from header if available
141-
otlp_ingest_traces(otlp_traces_service, OTEL_TRACES_INDEX_ID.to_string(), body).await
142-
})
145+
.then(
146+
|otlp_traces_service, index_id: Option<String>, body| async move {
147+
let index_id =
148+
index_id.unwrap_or_else(|| OtelSignal::Traces.default_index_id().to_string());
149+
otlp_ingest_traces(otlp_traces_service, index_id, body).await
150+
},
151+
)
143152
.and(with_arg(BodyFormat::default()))
144153
.map(into_rest_api_response)
145154
}
@@ -194,7 +203,6 @@ async fn otlp_ingest_logs(
194203
index_id: IndexId,
195204
body: Body,
196205
) -> Result<ExportLogsServiceResponse, OtlpApiError> {
197-
// TODO: use index ID.
198206
let export_logs_request: ExportLogsServiceRequest =
199207
prost::Message::decode(&body.content[..])
200208
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
@@ -295,14 +303,14 @@ mod tests {
295303
});
296304
mock_ingest_router
297305
.expect_ingest()
298-
.times(1)
306+
.times(2)
299307
.withf(|request| {
300308
if request.subrequests.len() == 1 {
301309
let subrequest = &request.subrequests[0];
302310
subrequest.doc_batch.is_some()
303311
// && request.commit == CommitType::Auto as i32
304312
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1
305-
&& subrequest.index_id == "otel-traces-v0_6"
313+
&& subrequest.index_id == "otel-logs-v0_6"
306314
} else {
307315
false
308316
}
@@ -391,9 +399,31 @@ mod tests {
391399
);
392400
}
393401
{
394-
// Test endpoint with given index ID.
402+
// Test endpoint with index ID through header
403+
let resp = warp::test::request()
404+
.path("/otlp/v1/logs")
405+
.method("POST")
406+
.header("content-type", "application/x-protobuf")
407+
.header("qw-otel-logs-index", "otel-logs-v0_6")
408+
.body(body.clone())
409+
.reply(&otlp_traces_api_handler)
410+
.await;
411+
assert_eq!(resp.status(), 200);
412+
let actual_response: ExportLogsServiceResponse =
413+
serde_json::from_slice(resp.body()).unwrap();
414+
assert!(actual_response.partial_success.is_some());
415+
assert_eq!(
416+
actual_response
417+
.partial_success
418+
.unwrap()
419+
.rejected_log_records,
420+
0
421+
);
422+
}
423+
{
424+
// Test endpoint with given index ID through path.
395425
let resp = warp::test::request()
396-
.path("/otel-traces-v0_6/otlp/v1/logs")
426+
.path("/otel-logs-v0_6/otlp/v1/logs")
397427
.method("POST")
398428
.header("content-type", "application/x-protobuf")
399429
.body(body.clone())
@@ -441,7 +471,7 @@ mod tests {
441471
});
442472
mock_ingest_router
443473
.expect_ingest()
444-
.times(1)
474+
.times(2)
445475
.withf(|request| {
446476
if request.subrequests.len() == 1 {
447477
let subrequest = &request.subrequests[0];
@@ -503,7 +533,23 @@ mod tests {
503533
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
504534
}
505535
{
506-
// Test endpoint with given index ID.
536+
// Test endpoint with given index ID through header.
537+
let resp = warp::test::request()
538+
.path("/otlp/v1/traces")
539+
.method("POST")
540+
.header("content-type", "application/x-protobuf")
541+
.header("qw-otel-traces-index", "otel-traces-v0_6")
542+
.body(body.clone())
543+
.reply(&otlp_traces_api_handler)
544+
.await;
545+
assert_eq!(resp.status(), 200);
546+
let actual_response: ExportTraceServiceResponse =
547+
serde_json::from_slice(resp.body()).unwrap();
548+
assert!(actual_response.partial_success.is_some());
549+
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
550+
}
551+
{
552+
// Test endpoint with given index ID through path.
507553
let resp = warp::test::request()
508554
.path("/otel-traces-v0_6/otlp/v1/traces")
509555
.method("POST")

0 commit comments

Comments
 (0)