@@ -183,14 +183,18 @@ impl UDFFlightClient {
183
183
let flight_info = self . inner . get_flight_info ( request) . await ?. into_inner ( ) ;
184
184
let schema = flight_info
185
185
. try_decode_schema ( )
186
- . map_err ( |err| ErrorCode :: UDFDataError ( format ! ( "Decode UDF schema error: {err}" ) ) )
186
+ . map_err ( |err| {
187
+ ErrorCode :: UDFDataError ( format ! (
188
+ "Decode UDF schema failed on UDF function {func_name}: {err}"
189
+ ) )
190
+ } )
187
191
. and_then ( |schema| DataSchema :: try_from ( & schema) ) ?;
188
192
189
193
let fields_num = schema. fields ( ) . len ( ) ;
190
194
if fields_num == 0 {
191
- return Err ( ErrorCode :: UDFSchemaMismatch (
192
- "UDF Server should return at least one column" ,
193
- ) ) ;
195
+ return Err ( ErrorCode :: UDFSchemaMismatch ( format ! (
196
+ "UDF Server should return at least one column on UDF function {func_name}"
197
+ ) ) ) ;
194
198
}
195
199
196
200
let ( input_fields, output_fields) = schema. fields ( ) . split_at ( fields_num - 1 ) ;
@@ -204,7 +208,8 @@ impl UDFFlightClient {
204
208
. collect :: < Vec < _ > > ( ) ;
205
209
if remote_arg_types != arg_types {
206
210
return Err ( ErrorCode :: UDFSchemaMismatch ( format ! (
207
- "UDF arg types mismatch, remote arg types: ({:?}), defined arg types: ({:?})" ,
211
+ "UDF arg types mismatch on UDF function {}, remote arg types: ({:?}), defined arg types: ({:?})" ,
212
+ func_name,
208
213
remote_arg_types
209
214
. iter( )
210
215
. map( ToString :: to_string)
@@ -220,8 +225,10 @@ impl UDFFlightClient {
220
225
221
226
if & expect_return_type[ 0 ] != return_type {
222
227
return Err ( ErrorCode :: UDFSchemaMismatch ( format ! (
223
- "UDF return type mismatch, actual return type: {}" ,
224
- expect_return_type[ 0 ]
228
+ "UDF return type mismatch on UDF function {}, expected return type: {}, actual return type: {}" ,
229
+ func_name,
230
+ expect_return_type[ 0 ] ,
231
+ return_type
225
232
) ) ) ;
226
233
}
227
234
@@ -251,13 +258,17 @@ impl UDFFlightClient {
251
258
let record_batch_stream = FlightRecordBatchStream :: new_from_flight_data (
252
259
flight_data_stream. map_err ( |err| err. into ( ) ) ,
253
260
)
254
- . map_err ( |err| ErrorCode :: UDFDataError ( format ! ( "Decode record batch error: {err}" ) ) ) ;
261
+ . map_err ( |err| {
262
+ ErrorCode :: UDFDataError ( format ! (
263
+ "Decode record batch failed on UDF function {func_name}: {err}"
264
+ ) )
265
+ } ) ;
255
266
256
267
let batches: Vec < RecordBatch > = record_batch_stream. try_collect ( ) . await ?;
257
268
if batches. is_empty ( ) {
258
- return Err ( ErrorCode :: EmptyDataFromServer (
259
- "Get empty data from UDF Server" ,
260
- ) ) ;
269
+ return Err ( ErrorCode :: EmptyDataFromServer ( format ! (
270
+ "Get empty data from UDF Server on UDF function {func_name}"
271
+ ) ) ) ;
261
272
}
262
273
263
274
let schema = batches[ 0 ] . schema ( ) ;
0 commit comments