Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(udf): tune the log message about udf decode error #17414

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions src/query/expression/src/utils/udf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,18 @@ impl UDFFlightClient {
let flight_info = self.inner.get_flight_info(request).await?.into_inner();
let schema = flight_info
.try_decode_schema()
.map_err(|err| ErrorCode::UDFDataError(format!("Decode UDF schema error: {err}")))
.map_err(|err| {
ErrorCode::UDFDataError(format!(
"Decode UDF schema failed on UDF function {func_name}: {err}"
))
})
.and_then(|schema| DataSchema::try_from(&schema))?;

let fields_num = schema.fields().len();
if fields_num == 0 {
return Err(ErrorCode::UDFSchemaMismatch(
"UDF Server should return at least one column",
));
return Err(ErrorCode::UDFSchemaMismatch(format!(
"UDF Server should return at least one column on UDF function {func_name}"
)));
}

let (input_fields, output_fields) = schema.fields().split_at(fields_num - 1);
Expand All @@ -204,7 +208,8 @@ impl UDFFlightClient {
.collect::<Vec<_>>();
if remote_arg_types != arg_types {
return Err(ErrorCode::UDFSchemaMismatch(format!(
"UDF arg types mismatch, remote arg types: ({:?}), defined arg types: ({:?})",
"UDF arg types mismatch on UDF function {}, remote arg types: ({:?}), defined arg types: ({:?})",
func_name,
remote_arg_types
.iter()
.map(ToString::to_string)
Expand All @@ -220,8 +225,10 @@ impl UDFFlightClient {

if &expect_return_type[0] != return_type {
return Err(ErrorCode::UDFSchemaMismatch(format!(
"UDF return type mismatch, actual return type: {}",
expect_return_type[0]
"UDF return type mismatch on UDF function {}, expected return type: {}, actual return type: {}",
func_name,
expect_return_type[0],
return_type
)));
}

Expand Down Expand Up @@ -251,13 +258,17 @@ impl UDFFlightClient {
let record_batch_stream = FlightRecordBatchStream::new_from_flight_data(
flight_data_stream.map_err(|err| err.into()),
)
.map_err(|err| ErrorCode::UDFDataError(format!("Decode record batch error: {err}")));
.map_err(|err| {
ErrorCode::UDFDataError(format!(
"Decode record batch failed on UDF function {func_name}: {err}"
))
});

let batches: Vec<RecordBatch> = record_batch_stream.try_collect().await?;
if batches.is_empty() {
return Err(ErrorCode::EmptyDataFromServer(
"Get empty data from UDF Server",
));
return Err(ErrorCode::EmptyDataFromServer(format!(
"Get empty data from UDF Server on UDF function {func_name}"
)));
}

let schema = batches[0].schema();
Expand Down