17
17
18
18
//! "crypto" DataFusion functions
19
19
20
- use arrow:: array:: { Array , ArrayRef , BinaryArray , OffsetSizeTrait } ;
20
+ use arrow:: array:: {
21
+ Array , ArrayRef , BinaryArray , BinaryArrayType , BinaryViewArray , GenericBinaryArray ,
22
+ OffsetSizeTrait ,
23
+ } ;
21
24
use arrow:: array:: { AsArray , GenericStringArray , StringArray , StringViewArray } ;
22
25
use arrow:: datatypes:: DataType ;
23
26
use blake2:: { Blake2b512 , Blake2s256 , Digest } ;
@@ -26,8 +29,8 @@ use datafusion_common::cast::as_binary_array;
26
29
27
30
use arrow:: compute:: StringArrayType ;
28
31
use datafusion_common:: {
29
- cast :: as_generic_binary_array , exec_err, internal_err, plan_err,
30
- utils :: take_function_args , DataFusionError , Result , ScalarValue ,
32
+ exec_err, internal_err, plan_err, utils :: take_function_args , DataFusionError , Result ,
33
+ ScalarValue ,
31
34
} ;
32
35
use datafusion_expr:: ColumnarValue ;
33
36
use md5:: Md5 ;
@@ -203,6 +206,7 @@ pub fn utf8_or_binary_to_binary_type(
203
206
| DataType :: LargeUtf8
204
207
| DataType :: Utf8
205
208
| DataType :: Binary
209
+ | DataType :: BinaryView
206
210
| DataType :: LargeBinary => DataType :: Binary ,
207
211
DataType :: Null => DataType :: Null ,
208
212
_ => {
@@ -251,27 +255,17 @@ impl DigestAlgorithm {
251
255
where
252
256
T : OffsetSizeTrait ,
253
257
{
254
- let input_value = as_generic_binary_array :: < T > ( value) ?;
255
- let array: ArrayRef = match self {
256
- Self :: Md5 => digest_to_array ! ( Md5 , input_value) ,
257
- Self :: Sha224 => digest_to_array ! ( Sha224 , input_value) ,
258
- Self :: Sha256 => digest_to_array ! ( Sha256 , input_value) ,
259
- Self :: Sha384 => digest_to_array ! ( Sha384 , input_value) ,
260
- Self :: Sha512 => digest_to_array ! ( Sha512 , input_value) ,
261
- Self :: Blake2b => digest_to_array ! ( Blake2b512 , input_value) ,
262
- Self :: Blake2s => digest_to_array ! ( Blake2s256 , input_value) ,
263
- Self :: Blake3 => {
264
- let binary_array: BinaryArray = input_value
265
- . iter ( )
266
- . map ( |opt| {
267
- opt. map ( |x| {
268
- let mut digest = Blake3 :: default ( ) ;
269
- digest. update ( x) ;
270
- Blake3 :: finalize ( & digest) . as_bytes ( ) . to_vec ( )
271
- } )
272
- } )
273
- . collect ( ) ;
274
- Arc :: new ( binary_array)
258
+ let array = match value. data_type ( ) {
259
+ DataType :: Binary | DataType :: LargeBinary => {
260
+ let v = value. as_binary :: < T > ( ) ;
261
+ self . digest_binary_array_impl :: < & GenericBinaryArray < T > > ( v)
262
+ }
263
+ DataType :: BinaryView => {
264
+ let v = value. as_binary_view ( ) ;
265
+ self . digest_binary_array_impl :: < & BinaryViewArray > ( v)
266
+ }
267
+ other => {
268
+ return exec_err ! ( "unsupported type for digest_utf_array: {other:?}" )
275
269
}
276
270
} ;
277
271
Ok ( ColumnarValue :: Array ( array) )
@@ -328,6 +322,37 @@ impl DigestAlgorithm {
328
322
}
329
323
}
330
324
}
325
+
326
+ pub fn digest_binary_array_impl < ' a , BinaryArrType > (
327
+ self ,
328
+ input_value : BinaryArrType ,
329
+ ) -> ArrayRef
330
+ where
331
+ BinaryArrType : BinaryArrayType < ' a > ,
332
+ {
333
+ match self {
334
+ Self :: Md5 => digest_to_array ! ( Md5 , input_value) ,
335
+ Self :: Sha224 => digest_to_array ! ( Sha224 , input_value) ,
336
+ Self :: Sha256 => digest_to_array ! ( Sha256 , input_value) ,
337
+ Self :: Sha384 => digest_to_array ! ( Sha384 , input_value) ,
338
+ Self :: Sha512 => digest_to_array ! ( Sha512 , input_value) ,
339
+ Self :: Blake2b => digest_to_array ! ( Blake2b512 , input_value) ,
340
+ Self :: Blake2s => digest_to_array ! ( Blake2s256 , input_value) ,
341
+ Self :: Blake3 => {
342
+ let binary_array: BinaryArray = input_value
343
+ . iter ( )
344
+ . map ( |opt| {
345
+ opt. map ( |x| {
346
+ let mut digest = Blake3 :: default ( ) ;
347
+ digest. update ( x) ;
348
+ Blake3 :: finalize ( & digest) . as_bytes ( ) . to_vec ( )
349
+ } )
350
+ } )
351
+ . collect ( ) ;
352
+ Arc :: new ( binary_array)
353
+ }
354
+ }
355
+ }
331
356
}
332
357
pub fn digest_process (
333
358
value : & ColumnarValue ,
@@ -342,22 +367,27 @@ pub fn digest_process(
342
367
DataType :: LargeBinary => {
343
368
digest_algorithm. digest_binary_array :: < i64 > ( a. as_ref ( ) )
344
369
}
345
- other => exec_err ! (
346
- "Unsupported data type {other:?} for function {digest_algorithm}"
347
- ) ,
348
- } ,
349
- ColumnarValue :: Scalar ( scalar) => match scalar {
350
- ScalarValue :: Utf8View ( a)
351
- | ScalarValue :: Utf8 ( a)
352
- | ScalarValue :: LargeUtf8 ( a) => {
353
- Ok ( digest_algorithm
354
- . digest_scalar ( a. as_ref ( ) . map ( |s : & String | s. as_bytes ( ) ) ) )
370
+ DataType :: BinaryView => {
371
+ digest_algorithm. digest_binary_array :: < i32 > ( a. as_ref ( ) )
355
372
}
356
- ScalarValue :: Binary ( a) | ScalarValue :: LargeBinary ( a) => Ok ( digest_algorithm
357
- . digest_scalar ( a. as_ref ( ) . map ( |v : & Vec < u8 > | v. as_slice ( ) ) ) ) ,
358
373
other => exec_err ! (
359
374
"Unsupported data type {other:?} for function {digest_algorithm}"
360
375
) ,
361
376
} ,
377
+ ColumnarValue :: Scalar ( scalar) => {
378
+ match scalar {
379
+ ScalarValue :: Utf8View ( a)
380
+ | ScalarValue :: Utf8 ( a)
381
+ | ScalarValue :: LargeUtf8 ( a) => Ok ( digest_algorithm
382
+ . digest_scalar ( a. as_ref ( ) . map ( |s : & String | s. as_bytes ( ) ) ) ) ,
383
+ ScalarValue :: Binary ( a)
384
+ | ScalarValue :: LargeBinary ( a)
385
+ | ScalarValue :: BinaryView ( a) => Ok ( digest_algorithm
386
+ . digest_scalar ( a. as_ref ( ) . map ( |v : & Vec < u8 > | v. as_slice ( ) ) ) ) ,
387
+ other => exec_err ! (
388
+ "Unsupported data type {other:?} for function {digest_algorithm}"
389
+ ) ,
390
+ }
391
+ }
362
392
}
363
393
}
0 commit comments