12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use std:: collections:: BTreeMap ;
15
16
use std:: collections:: HashMap ;
16
17
use std:: ops:: ControlFlow ;
18
+ use std:: ops:: Deref ;
17
19
use std:: sync:: Arc ;
18
20
19
21
use databend_common_ast:: Span ;
@@ -24,9 +26,11 @@ use databend_common_expression::eval_function;
24
26
use databend_common_expression:: expr:: * ;
25
27
use databend_common_expression:: types:: boolean:: BooleanDomain ;
26
28
use databend_common_expression:: types:: nullable:: NullableDomain ;
29
+ use databend_common_expression:: types:: AnyType ;
27
30
use databend_common_expression:: types:: Bitmap ;
28
31
use databend_common_expression:: types:: Buffer ;
29
32
use databend_common_expression:: types:: DataType ;
33
+ use databend_common_expression:: types:: MapType ;
30
34
use databend_common_expression:: types:: NullableType ;
31
35
use databend_common_expression:: types:: Number ;
32
36
use databend_common_expression:: types:: NumberDataType ;
@@ -35,16 +39,20 @@ use databend_common_expression::types::ValueType;
35
39
use databend_common_expression:: visit_expr;
36
40
use databend_common_expression:: BlockEntry ;
37
41
use databend_common_expression:: Column ;
42
+ use databend_common_expression:: ColumnBuilder ;
38
43
use databend_common_expression:: ColumnId ;
39
44
use databend_common_expression:: ConstantFolder ;
40
45
use databend_common_expression:: DataBlock ;
41
46
use databend_common_expression:: Domain ;
42
47
use databend_common_expression:: Expr ;
43
48
use databend_common_expression:: ExprVisitor ;
49
+ use databend_common_expression:: FieldIndex ;
44
50
use databend_common_expression:: FunctionContext ;
45
51
use databend_common_expression:: Scalar ;
52
+ use databend_common_expression:: ScalarRef ;
46
53
use databend_common_expression:: TableDataType ;
47
54
use databend_common_expression:: TableField ;
55
+ use databend_common_expression:: TableSchema ;
48
56
use databend_common_expression:: TableSchemaRef ;
49
57
use databend_common_expression:: Value ;
50
58
use databend_common_functions:: BUILTIN_FUNCTIONS ;
@@ -59,8 +67,11 @@ use serde::Serialize;
59
67
use super :: eliminate_cast:: is_injective_cast;
60
68
use crate :: eliminate_cast:: cast_const;
61
69
use crate :: filters:: BlockBloomFilterIndexVersion ;
70
+ use crate :: filters:: BlockFilter ;
62
71
use crate :: filters:: Filter ;
72
+ use crate :: filters:: FilterBuilder ;
63
73
use crate :: filters:: V2BloomBlock ;
74
+ use crate :: filters:: Xor8Builder ;
64
75
use crate :: filters:: Xor8Filter ;
65
76
use crate :: statistics_to_domain;
66
77
use crate :: Index ;
@@ -439,6 +450,175 @@ impl BloomIndex {
439
450
}
440
451
}
441
452
453
+ pub struct BloomIndexBuilder {
454
+ func_ctx : FunctionContext ,
455
+ columns : Vec < ColumnXor8Builder > ,
456
+ }
457
+
458
+ struct ColumnXor8Builder {
459
+ index : FieldIndex ,
460
+ field : TableField ,
461
+ builder : Xor8Builder ,
462
+ }
463
+
464
+ impl BloomIndexBuilder {
465
+ pub fn create (
466
+ func_ctx : FunctionContext ,
467
+ bloom_columns_map : BTreeMap < FieldIndex , TableField > ,
468
+ ) -> Self {
469
+ let columns = bloom_columns_map
470
+ . iter ( )
471
+ . map ( |( & index, field) | ColumnXor8Builder {
472
+ index,
473
+ field : field. clone ( ) ,
474
+ builder : Xor8Builder :: create ( ) ,
475
+ } )
476
+ . collect ( ) ;
477
+ Self { func_ctx, columns }
478
+ }
479
+
480
+ pub fn add_block ( & mut self , block : & DataBlock ) -> Result < ( ) > {
481
+ if block. is_empty ( ) {
482
+ return Err ( ErrorCode :: BadArguments ( "block is empty" ) ) ;
483
+ }
484
+ if block. num_columns ( ) == 0 {
485
+ return Ok ( ( ) ) ;
486
+ }
487
+
488
+ let mut keys_to_remove = Vec :: with_capacity ( self . columns . len ( ) ) ;
489
+ for ( index, bloom_index_column) in self . columns . iter_mut ( ) . enumerate ( ) {
490
+ let field_type = & block. get_by_offset ( bloom_index_column. index ) . data_type ;
491
+ if !Xor8Filter :: supported_type ( field_type) {
492
+ keys_to_remove. push ( index) ;
493
+ continue ;
494
+ }
495
+
496
+ let column = match & block. get_by_offset ( bloom_index_column. index ) . value {
497
+ Value :: Scalar ( s) => {
498
+ let builder = ColumnBuilder :: repeat ( & s. as_ref ( ) , 1 , field_type) ;
499
+ builder. build ( )
500
+ }
501
+ Value :: Column ( c) => c. clone ( ) ,
502
+ } ;
503
+
504
+ let ( column, data_type) = match field_type. remove_nullable ( ) {
505
+ DataType :: Map ( box inner_ty) => {
506
+ // Add bloom filter for the value of map type
507
+ let map_column = if field_type. is_nullable ( ) {
508
+ let nullable_column =
509
+ NullableType :: < MapType < AnyType , AnyType > > :: try_downcast_column ( & column)
510
+ . unwrap ( ) ;
511
+ nullable_column. column
512
+ } else {
513
+ MapType :: < AnyType , AnyType > :: try_downcast_column ( & column) . unwrap ( )
514
+ } ;
515
+ let column = map_column. underlying_column ( ) . values ;
516
+
517
+ let DataType :: Tuple ( kv_tys) = inner_ty else {
518
+ unreachable ! ( ) ;
519
+ } ;
520
+ let val_type = kv_tys[ 1 ] . clone ( ) ;
521
+ // Extract JSON value of string type to create bloom index,
522
+ // other types of JSON value will be ignored.
523
+ if val_type. remove_nullable ( ) == DataType :: Variant {
524
+ let mut builder = ColumnBuilder :: with_capacity (
525
+ & DataType :: Nullable ( Box :: new ( DataType :: String ) ) ,
526
+ column. len ( ) ,
527
+ ) ;
528
+ for val in column. iter ( ) {
529
+ if let ScalarRef :: Variant ( v) = val {
530
+ let raw_jsonb = RawJsonb :: new ( v) ;
531
+ if let Ok ( Some ( str_val) ) = raw_jsonb. as_str ( ) {
532
+ builder. push ( ScalarRef :: String ( & str_val) ) ;
533
+ continue ;
534
+ }
535
+ }
536
+ builder. push_default ( ) ;
537
+ }
538
+ let str_column = builder. build ( ) ;
539
+ if BloomIndex :: check_large_string ( & str_column) {
540
+ keys_to_remove. push ( index) ;
541
+ continue ;
542
+ }
543
+ let str_type = DataType :: Nullable ( Box :: new ( DataType :: String ) ) ;
544
+ ( str_column, str_type)
545
+ } else {
546
+ if BloomIndex :: check_large_string ( & column) {
547
+ keys_to_remove. push ( index) ;
548
+ continue ;
549
+ }
550
+ ( column, val_type)
551
+ }
552
+ }
553
+ _ => {
554
+ if BloomIndex :: check_large_string ( & column) {
555
+ keys_to_remove. push ( index) ;
556
+ continue ;
557
+ }
558
+ ( column, field_type. clone ( ) )
559
+ }
560
+ } ;
561
+
562
+ let ( column, validity) =
563
+ BloomIndex :: calculate_nullable_column_digest ( & self . func_ctx , & column, & data_type) ?;
564
+
565
+ // create filter per column
566
+ if validity. as_ref ( ) . map ( |v| v. null_count ( ) ) . unwrap_or ( 0 ) > 0 {
567
+ let validity = validity. unwrap ( ) ;
568
+ let it = column. deref ( ) . iter ( ) . zip ( validity. iter ( ) ) . map (
569
+ |( v, b) | {
570
+ if !b {
571
+ & 0
572
+ } else {
573
+ v
574
+ }
575
+ } ,
576
+ ) ;
577
+ bloom_index_column. builder . add_digests ( it) ;
578
+ } else {
579
+ bloom_index_column. builder . add_digests ( column. deref ( ) ) ;
580
+ }
581
+ }
582
+ for k in keys_to_remove {
583
+ self . columns . remove ( k) ;
584
+ }
585
+ Ok ( ( ) )
586
+ }
587
+
588
+ pub fn finalize ( mut self ) -> Result < Option < BloomIndex > > {
589
+ let mut column_distinct_count = HashMap :: with_capacity ( self . columns . len ( ) ) ;
590
+ let mut filters = Vec :: with_capacity ( self . columns . len ( ) ) ;
591
+ let mut filter_fields = Vec :: with_capacity ( self . columns . len ( ) ) ;
592
+ for column in self . columns . iter_mut ( ) {
593
+ let filter = column. builder . build ( ) ?;
594
+ if let Some ( len) = filter. len ( ) {
595
+ if !matches ! (
596
+ column. field. data_type( ) . remove_nullable( ) ,
597
+ TableDataType :: Map ( _) | TableDataType :: Variant
598
+ ) {
599
+ column_distinct_count. insert ( column. field . column_id , len) ;
600
+ }
601
+ }
602
+ let filter_name =
603
+ BloomIndex :: build_filter_column_name ( BlockFilter :: VERSION , & column. field ) ?;
604
+ filter_fields. push ( TableField :: new ( & filter_name, TableDataType :: Binary ) ) ;
605
+ filters. push ( Arc :: new ( filter) ) ;
606
+ }
607
+
608
+ if filter_fields. is_empty ( ) {
609
+ return Ok ( None ) ;
610
+ }
611
+ let filter_schema = Arc :: new ( TableSchema :: new ( filter_fields) ) ;
612
+ Ok ( Some ( BloomIndex {
613
+ func_ctx : self . func_ctx ,
614
+ version : BlockFilter :: VERSION ,
615
+ filter_schema,
616
+ filters,
617
+ column_distinct_count,
618
+ } ) )
619
+ }
620
+ }
621
+
442
622
/// Newtype wrapper around an `EqVisitor` implementation; `ExprVisitor<String>`
/// is implemented for this wrapper right after its definition.
struct Visitor < T : EqVisitor > ( T ) ;
443
623
444
624
impl < T > ExprVisitor < String > for Visitor < T >
0 commit comments