Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/query/catalog/src/sbbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
use core::simd::cmp::SimdPartialEq;
use core::simd::Simd;

use databend_common_base::hints::assume;

/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
0x47b6137b_u32,
Expand Down Expand Up @@ -244,15 +246,15 @@ impl Sbbf {
self.0[block_index].check(hash as u32)
}

/// Check a batch of hashes. The callback is triggered for each matching hash index.
pub fn check_hash_batch<F>(&self, hashes: &[u64], mut on_match: F)
where F: FnMut(usize) {
for (idx, &hash) in hashes.iter().enumerate() {
/// Check a batch of hashes and return a boolean vector indicating which hashes match.
pub fn check_hash_batch(&self, hashes: &[u64]) -> Vec<bool> {
let mut matches = Vec::with_capacity(hashes.len());
for &hash in hashes {
let block_index = self.hash_to_block_index(hash);
if self.0[block_index].check(hash as u32) {
on_match(idx);
}
assume(block_index < self.0.len());
matches.push(self.0[block_index].check(hash as u32));
}
matches
}

/// Merge another bloom filter into this one (bitwise OR operation)
Expand Down Expand Up @@ -312,9 +314,9 @@ mod tests {
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
let hashes: Vec<u64> = (0..10_000).collect();
sbbf.insert_hash_batch(&hashes);
let mut matched = 0;
sbbf.check_hash_batch(&hashes, |_| matched += 1);
assert_eq!(matched, hashes.len());
let matches = sbbf.check_hash_batch(&hashes);
assert!(matches.iter().all(|m| *m));
assert_eq!(matches.len(), hashes.len());
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,9 @@ impl NativeDeserializeDataTransform {
};

let probe_block = self.block_reader.build_block(&[column], None)?;
let mut bitmap = MutableBitmap::from_len_zeroed(probe_block.num_rows());
let probe_column = probe_block.get_last_column().clone();
// Apply the filter to the probe column.
ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column, &mut bitmap)?;
let bitmap = ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column)?;

let unset_bits = bitmap.null_count();
let elapsed = start.elapsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::types::Bitmap;
use databend_common_expression::types::DataType;
use databend_common_expression::types::MutableBitmap;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
Expand Down Expand Up @@ -175,13 +174,12 @@ impl DeserializeDataTransform {

let mut bitmaps = vec![];
for runtime_filter in self.cached_runtime_filter.as_ref().unwrap().iter() {
let mut bitmap = MutableBitmap::from_len_zeroed(data_block.num_rows());
let probe_block_entry = data_block.get_by_offset(runtime_filter.column_index);
let probe_column = probe_block_entry.to_column();

// Apply bloom filter
let start = Instant::now();
ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column, &mut bitmap)?;
let bitmap = ExprBloomFilter::new(&runtime_filter.filter).apply(probe_column)?;
let elapsed = start.elapsed();
let unset_bits = bitmap.null_count();
runtime_filter
Expand Down
14 changes: 5 additions & 9 deletions src/query/storages/fuse/src/pruning/expr_bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ impl<'a> ExprBloomFilter<'a> {
Self { filter }
}

/// Apply the bloom filter to a column, updating the bitmap.
pub fn apply(&self, column: Column, bitmap: &mut MutableBitmap) -> Result<()> {
/// Apply the bloom filter to a column and return the resulting bitmap.
pub fn apply(&self, column: Column) -> Result<MutableBitmap> {
let data_type = column.data_type();
let num_rows = column.len();
let method = DataBlock::choose_hash_method_with_types(&[data_type.clone()])?;
Expand All @@ -38,12 +38,8 @@ impl<'a> ExprBloomFilter<'a> {
let mut hashes = Vec::with_capacity(num_rows);
hash_by_method_for_bloom(&method, group_columns, num_rows, &mut hashes)?;
debug_assert_eq!(hashes.len(), num_rows);
let bitmap_len = bitmap.len();
self.filter.check_hash_batch(&hashes, |index| {
debug_assert!(index < bitmap_len);
unsafe { bitmap.set_unchecked(index, true) };
});

Ok(())
let results = self.filter.check_hash_batch(&hashes);
debug_assert_eq!(results.len(), num_rows);
Ok(MutableBitmap::from_trusted_len_iter(results.into_iter()))
}
}
Loading